创建FlinkServer作业写入数据至Kafka消息队列
本章节适用于MRS 3.1.2及之后的版本。
操作场景
本章节介绍Kafka作为source表或者sink表的DDL定义,以及创建表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。
本示例以安全模式Kafka为例。
前提条件
- 集群中已安装HDFS、Yarn、Kafka和Flink服务。
- 包含Kafka服务的客户端已安装,例如安装路径为:/opt/client
- 参考创建FlinkServer权限角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
创建作业步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考如何创建FlinkServer作业,新建Flink SQL流作业,在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE KafkaSource ( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); CREATE TABLE KafkaSink( `user_id` VARCHAR, `user_name` VARCHAR, `age` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'scan.startup.mode' = 'latest-offset', 'value.format' = 'csv', 'properties.sasl.kerberos.service.name' = 'kafka',--普通模式集群不需要该参数,同时删除上一行的逗号 'properties.security.protocol' = 'SASL_PLAINTEXT',--普通模式集群不需要该参数 'properties.kerberos.domain.name' = 'hadoop.系统域名'--普通模式集群不需要该参数 ); Insert into KafkaSink select * from KafkaSource;
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka Topic中的消息,执行以下命令查看Sink表中是否接收到数据,即5执行完成后查看Kafka topic是否正常写入数据。
sh kafka-console-consumer.sh --topic test_sink --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --consumer.config /opt/client/Kafka/kafka/config/consumer.properties
- 参考管理Kafka Topic中的消息,查看Topic并向Kafka中写入数据,输入完成后可在4中的窗口查看执行结果。
./kafka-topics.sh --list --bootstrap-server Kafka的Broker实例业务IP:Kafka端口号 --command-config 客户端目录/Kafka/kafka/config/client.properties
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为test_source:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic test_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:1,clw,33
输入完成后按回车发送消息。
WITH主要参数说明
配置项 |
是否必选 |
类型 |
描述 |
---|---|---|---|
connector |
必选 |
String |
指定要使用的连接器,Kafka使用“kafka” |
topic |
|
String |
主题名称
|
topic-pattern |
kafka作为source时可选 |
String |
主题模式 当表用作source时可设置该参数,主题名称需使用正则表达式
说明:
不能同时设置“topic-pattern” 和“topic” 。 |
properties.bootstrap.servers |
必选 |
String |
Kafka broker列表,以逗号分隔 |
properties.group.id |
kafka作为source时必选 |
String |
Kafka的使用者组ID |
format |
必选 |
String |
用于反序列化和序列化Kafka消息的值部分的格式 |
properties.* |
可选 |
String |
安全模式下需增加认证相关的参数 |
scan.topic-partition-discovery.interval |
可选 |
Duration |
消费者定时动态发现创建的Partition的时间间隔。默认值:5min。 |