创建FlinkServer作业写入数据至Doris表
本章节适用于MRS 3.5.0及之后的版本。
操作场景
本章节提供了如何使用FlinkServer将Kafka数据写入到Doris中,和Doris数据和Kafka数据的Lookup Join操作指导。
前提条件
- 集群中已安装Doris、HDFS、Yarn、Flink和Kafka等服务。
- 待连接Doris数据库的节点与MRS集群网络互通。
- 创建具有Doris管理权限的用户。
- 集群已启用Kerberos认证(安全模式)
在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。
使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。
- 集群未启用Kerberos认证(普通模式)
使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。
- 集群已启用Kerberos认证(安全模式)
- 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris。
- 已安装Flink客户端。
使用FlinkServer将Kafka数据写入到Doris中
Doris侧操作。
- 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。
若集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:
export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1
mysql -u数据库登录用户 -p -PFE查询连接端口 -hDoris FE实例IP地址
执行命令后输入数据库登录用户密码。
- Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- Doris DBalancer的TCP访问端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“balancer_tcp_port”参数获取。
- Doris FE或DBalancer实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE或DBalancer实例的业务IP地址。
- 用户也可以使用MySQL连接软件或者在Doris WebUI界面连接数据库。
- 执行以下命令创建数据库和数据写入的目标表usertable2:
create database sink;
use sink;
create table usertable2(
`user_id` VARCHAR(10),
`user_name` VARCHAR(10),
`age` INT
)
DISTRIBUTED BY HASH(user_id) BUCKETS 32;
Flink侧操作。
- 使用具有FlinkServer管理员权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Flink”。
对于开启了Kerberos认证的MRS集群,访问Flink WebUI,需提前创建具有FlinkServer管理员权限或应用查看、应用编辑权限的角色,并为用户绑定该角色,角色创建可参考创建FlinkServer权限角色。
- 在“Flink WebUI”右侧,单击链接,访问FlinkServer。
- 进入FlinkServer界面后,选择 “作业管理 > 新建作业”,在新建作业界面配置以下参数填,并单击“确定”,进入作业管理界:
- 类型:选择“Flink SQL”。
- 名称:填写作业名称,例如:FlinkSQL1。
- 在Flink作业管理界面创建流或批的Flink SQL作业,例如:
- 创建Kafka数据源表:
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'kafka',
'topic' = 'Topic名称',
'properties.bootstrap.servers' = 'Kafka Broker实例的业务IP地址:Broker实例的端口号',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- properties.bootstrap.servers:参数值若有多个,可使用逗号分隔。
其中,Kafka Broker实例的业务IP地址可在FusionInsight Manager界面,选择“集群 > 服务 > Kafka > 实例”查看;Broker实例的端口号可单击“配置”,集群已启用Kerberos认证(安全模式)搜索“sasl.port”参数查看,集群未启用Kerberos认证(普通模式)搜索“port”参数查看。
- “properties.sasl.kerberos.service.name”、“properties.security.protocol”和“properties.kerberos.domain.name”的参数值可在安装了Kafka客户端的节点的“客户端安装目录/Kafka/kafka/config”目录下的“server.properties”文件中,搜索“sasl.kerberos.service.name”、“security.protocol”或“kerberos.domain.name”查看。
- properties.bootstrap.servers:参数值若有多个,可使用逗号分隔。
- 创建Doris Sink表:
`user_id` VARCHAR,
`user_name` VARCHAR,
`age` INT
) WITH (
'connector' = 'doris',
'fenodes' = '任一FE实例IP地址:HTTPS端口号或HTTP端口',
'table.identifier' = 'sink.usertable2',
'username' = 'user',
'password' = 'password',
'doris.enable.https' = 'true',
'doris.ignore.https.ca' = 'true',
'sink.label-prefix' = 'doris_label1'
);
- FE实例的IP地址可在FusionInsight Manager,选择“集群 > 服务 > Doris > 实例”查看。
- HTTPS端口号可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看;HTTP端口号可搜索“http_port”查看。
- table.identifier:参数值为2Doris侧创建的数据库和表。
- “username”和“password”为连接Doris的用户名和密码。
- 集群未启用Kerberos认证(普通模式)和集群已启用Kerberos认证(安全模式)关闭HTTPS后,需要去掉Doris Sink表的with子句中的如下配置参数:
'doris.enable.https' = 'true'
'doris.ignore.https.ca' = 'true'
- 创建Doris Sink表时还可设置表1中的相关参数。
- 执行以下命令将Kafka数据写入Doris:
- 创建Kafka数据源表:
- 在作业管理界面右边的基础参数中勾选开启CheckPoint,“时间间隔(ms)”根据实际需求填写合适的值,推荐配置的时间间隔取值范围为30000~60000。
- 单击“语义校验”对输入内容进行语义校验,单击“保存”,单击“提交”提交作业。
Kafka侧操作。
- 登录安装了Kafka客户端的节点,执行以下操作创建Kafka Topic:
cd 客户端安装目录/Kafka/kafka/bin
- 执行以下命令创建Kafka Topic,且Topic名称需与6.a配置的Topic名称保持一致。:
sh kafka-topics.sh --create --topic Topic名称 --partitions 1 --replication-factor 1 --bootstrap-server Kafka Broker实例的Controller所在主机的IP地址:Broker实例的端口号 --command-config ../config/client.properties
- 执行以下命令查看Topic列表:
sh kafka-topics.sh --list --bootstrap-server Kafka Broker实例的Controller所在主机的IP地址:Broker实例的端口号 --command-config ../config/client.properties
- 执行以下命令连接Kafka客户端:
sh kafka-console-producer.sh --broker-list Kafka Broker实例的Controller所在主机的IP地址:Broker实例的端口号 --topic TopicTest --producer.config ../config/producer.properties
Broker实例的Controller所在主机的IP地址可登录FusionInsight Manager界面,选择“集群 > 服务 > Kafka”,在“概览”界面的基本信息区域查看“Controller所在的主机”获取。
- 执行以下命令创建Kafka Topic,且Topic名称需与6.a配置的Topic名称保持一致。:
- 在安装了MySQL客户端的节点上连接Doris,执行以下命令查看Doris表中的数据是否和9.c插入的数据一致:
select * from sink.usertable2;
Lookup Join
- 使用具有FlinkServer管理员权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Flink”。
对于开启了Kerberos认证的MRS集群,访问Flink WebUI,需提前创建具有FlinkServer管理员权限或应用查看、应用编辑权限的角色,并为用户绑定该角色,角色创建可参考创建FlinkServer权限角色。
- 在“Flink WebUI”右侧,单击链接,访问FlinkServer。
- 进入Flink WebUI界面后,选择 “作业管理 > 新建作业”,在新建作业界面配置以下参数填,并单击“确定”,进入作业管理界:
- 类型:选择“Flink SQL”。
- 名称:填写作业名称,例如:FlinkSQL2。
- 在Flink作业管理界面创建流或批的Flink SQL作业,例如:
- 创建Kafka Source表:
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'Topic名称',
'properties.bootstrap.servers' = 'Kafka Broker实例的IP地址:21007',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'properties.sasl.kerberos.service.name' = 'kafka',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.hadoop.com'
);
- properties.bootstrap.servers:参数值若有多个,可使用逗号分隔。
其中,Kafka Broker实例的业务IP地址可在FusionInsight Manager界面,选择“集群 > 服务 > Kafka > 实例”查看;Broker实例的端口号可单击“配置”,集群已启用Kerberos认证(安全模式)搜索“sasl.port”参数查看,集群未启用Kerberos认证(普通模式)搜索“port”参数查看。
- “properties.sasl.kerberos.service.name”、“properties.security.protocol”和“properties.kerberos.domain.name”的参数值可在安装了Kafka客户端的节点的“客户端安装目录/Kafka/kafka/config”目录下的“server.properties”文件中,搜索“sasl.kerberos.service.name”、“security.protocol”或“kerberos.domain.name”查看。
- properties.bootstrap.servers:参数值若有多个,可使用逗号分隔。
- 创建Flink表:
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'FE实例IP地址:HTTPS端口或者HTTP端口',
'jdbc-url' = 'jdbc:mysql://FE实例IP地址:FE查询连接端口',
'table.identifier' = 'dim.dim_city',
'username' = 'user',
'password' = 'password'
);
- FE实例的IP地址可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 实例”查看。
- HTTPS端口可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看;HTTP端口可搜索“http_port”查看。
- Doris FE的查询连接端口,可以通过登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
- 创建Flink表时还可设置表2中的相关参数。
- 执行以下命令进行JOIN查询:
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city
- 创建Kafka Source表:
- 在作业管理界面右边的基础参数中勾选开启CheckPoint,“时间间隔(ms)”根据实际需求填写合适的值,推荐配置的时间间隔取值范围为30000~60000。
- 单击“语义校验”对输入内容进行语义校验,单击“保存”,单击“提交”提交作业。
相关表参数配置说明
参数名称 |
参数默认值 |
是否必须配置 |
参数描述 |
---|---|---|---|
doris.request.retries |
3 |
否 |
向Doris发送请求的重试次数。 |
doris.request.connect.timeout.ms |
30000 |
否 |
向Doris发送请求的连接超时时间。 |
doris.request.read.timeout.ms |
30000 |
否 |
向Doris发送请求的读取超时时间。 |
sink.max-retries |
3 |
否 |
Commit失败后的最大重试次数,默认为3次。 |
sink.enable.batch-mode |
false |
否 |
是否使用攒批模式写入Doris,开启后写入时机不依赖CheckPoint,通过“sink.buffer-flush.max-rows”、“sink.buffer-flush.max-bytes”或“sink.buffer-flush.interval”参数来控制写入时机。 |
sink.buffer-flush.max-rows |
50000 |
否 |
攒批模式下,单个批次最多写入的数据行数。 |
sink.buffer-flush.max-bytes |
10MB |
否 |
攒批模式下,单个批次最多写入的字节数。 |
sink.buffer-flush.interval |
10s |
否 |
攒批模式下,异步刷新缓存的间隔。 |
参数名称 |
参数默认值 |
是否必须配置 |
参数描述 |
---|---|---|---|
lookup.cache.max-rows |
-1 |
否 |
Lookup缓存的最大行数,默认值为:-1,表示不开启缓存。 |
lookup.cache.ttl |
10s |
否 |
Lookup缓存的最大时间,默认值为10s。 |
lookup.max-retries |
1 |
否 |
Lookup查询失败后的重试次数。 |
lookup.jdbc.async |
false |
否 |
是否开启异步Lookup,默认值为:false。 |
lookup.jdbc.read.batch.size |
128 |
否 |
启用异步Lookup时,每次查询的最大批次大小。 |
lookup.jdbc.read.batch.queue-size |
256 |
否 |
启用异步Lookup时,中间缓冲队列的大小。 |
lookup.jdbc.read.thread-size |
3 |
否 |
每个Task中Lookup的JDBC线程数。 |