Kafka Connector介绍

GeaFlow 支持从 Kafka 中读取数据,并向 Kafka 写入数据。目前支持的 Kafka 版本为 2.4.1。

语法

CREATE TABLE kafka_table (
  id BIGINT,
  name VARCHAR,
  age INT
) WITH (
    type='kafka',
    geaflow.dsl.kafka.servers = 'localhost:9092',
    geaflow.dsl.kafka.topic = 'test-topic'
)

参数

参数名 是否必须 描述
geaflow.dsl.kafka.servers Kafka 的引导服务器(bootstrap)列表
geaflow.dsl.kafka.topic Kafka topic
geaflow.dsl.kafka.group.id Kafka组(group id),默认是'default-group-id'.

示例

CREATE TABLE kafka_source (
  id BIGINT,
  name VARCHAR,
  age INT
) WITH (
    type='kafka',
    geaflow.dsl.kafka.servers = 'localhost:9092',
    geaflow.dsl.kafka.topic = 'read-topic'
);

CREATE TABLE kafka_sink (
  id BIGINT,
  name VARCHAR,
  age INT
) WITH (
    type='kafka',
    geaflow.dsl.kafka.servers = 'localhost:9092',
    geaflow.dsl.kafka.topic = 'write-topic'
);

INSERT INTO kafka_sink
SELECT * FROM kafka_source;