更新时间:2024-12-11 GMT+08:00

创建FlinkServer作业写入数据至Doris

本章节适用于MRS 3.5.0及之后的版本。

操作场景

本章节提供了如何使用FlinkServer将Kafka数据写入到Doris中,和Doris数据和Kafka数据的Lookup Join操作指导。

前提条件

  • 集群中已安装Doris、HDFS、Yarn、Flink和Kafka等服务。
  • 待连接Doris数据库的节点与MRS集群网络互通。
  • 创建具有Doris管理权限的用户。
    • 集群已启用Kerberos认证(安全模式)

      在FusionInsight Manager中创建一个人机用户,例如“dorisuser”,创建一个拥有“Doris管理员权限”的角色绑定给该用户。

      使用新建的用户dorisuser重新登录FusionInsight Manager,修改该用户初始密码。

    • 集群未启用Kerberos认证(普通模式)

      使用admin用户连接Doris后,创建具有管理员权限的角色并绑定给用户。

  • 已安装MySQL客户端,相关操作可参考使用MySQL客户端连接Doris
  • 已安装Flink客户端。

使用FlinkServer将Kafka数据写入到Doris中

Doris侧操作

  1. 登录安装了MySQL的节点,执行以下命令,连接Doris数据库。

    集群已启用Kerberos认证(安全模式),需先执行以下命令再连接Doris数据库:

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1

    mysql -u数据库登录用户 -p -PFE查询连接端口 -hDoris FE实例IP地址

    执行命令后输入数据库登录用户密码。

    • Doris FE的查询连接端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
    • Doris DBalancer的TCP访问端口,可以通过登录Manager,单击“集群 > 服务 > Doris > 配置”,查询Doris服务的“balancer_tcp_port”参数获取。
    • Doris FE或DBalancer实例IP地址可通过登录MRS集群的Manager界面,单击“集群 > 服务 > Doris > 实例”,查看任一FE或DBalancer实例的业务IP地址。
    • 用户也可以使用MySQL连接软件或者在Doris WebUI界面连接数据库。

  2. 执行以下命令创建数据库和数据写入的目标表usertable2

    create database sink;

    use sink;

    create table usertable2(

    `user_id` VARCHAR(10),

    `user_name` VARCHAR(10),

    `age` INT

    )

    DISTRIBUTED BY HASH(user_id) BUCKETS 32;

Flink侧操作

  1. 使用具有FlinkServer管理员权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Flink”。

    对于开启了Kerberos认证的MRS集群,访问Flink WebUI,需提前创建具有FlinkServer管理员权限或应用查看、应用编辑权限的角色,并为用户绑定该角色,角色创建可参考创建FlinkServer权限角色

  2. 在“Flink WebUI”右侧,单击链接,访问FlinkServer。
  3. 进入FlinkServer界面后,选择 “作业管理 > 新建作业”,在新建作业界面配置以下参数填,并单击“确定”,进入作业管理界:

    • 类型:选择“Flink SQL”。
    • 名称:填写作业名称,例如:FlinkSQL1。

  4. 在Flink作业管理界面创建流或批的Flink SQL作业,例如:

    1. 创建Kafka数据源表:

      CREATE TABLE KafkaSource (

      `user_id` VARCHAR,

      `user_name` VARCHAR,

      `age` INT

      ) WITH (

      'connector' = 'kafka',

      'topic' = 'Topic名称',

      'properties.bootstrap.servers' = 'Kafka Broker实例的业务IP地址:Broker实例的端口',

      'properties.group.id' = 'testGroup',

      'scan.startup.mode' = 'latest-offset',

      'format' = 'csv',

      'properties.sasl.kerberos.service.name' = 'kafka',

      'properties.security.protocol' = 'SASL_PLAINTEXT',

      'properties.kerberos.domain.name' = 'hadoop.hadoop.com'

      );

      • properties.bootstrap.servers:参数值若有多个,可使用逗号分隔。

        其中,Kafka Broker实例的业务IP地址可在FusionInsight Manager界面,选择“集群 > 服务 > Kafka > 实例”查看;Broker实例的端口号可单击“配置”,集群已启用Kerberos认证(安全模式)搜索“sasl.port”参数查看,集群未启用Kerberos认证(普通模式)搜索“port”参数查看。

      • “properties.sasl.kerberos.service.name”、“properties.security.protocol”和“properties.kerberos.domain.name”的参数值可在安装了Kafka客户端的节点的“客户端安装目录/Kafka/kafka/config”目录下的“server.properties”文件中,搜索“sasl.kerberos.service.name”、“security.protocol”或“kerberos.domain.name”查看。
    2. 创建Doris Sink表:

      CREATE TABLE dorisSink (

      `user_id` VARCHAR,

      `user_name` VARCHAR,

      `age` INT

      ) WITH (

      'connector' = 'doris',

      'fenodes' = '任一FE实例IP地址:HTTPS端口号或HTTP端口',

      'table.identifier' = 'sink.usertable2',

      'username' = 'user',

      'password' = 'password',

      'doris.enable.https' = 'true',

      'doris.ignore.https.ca' = 'true',

      'sink.label-prefix' = 'doris_label1'

      );

      • FE实例的IP地址可在FusionInsight Manager,选择“集群 > 服务 > Doris > 实例”查看。
      • HTTPS端口号可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看;HTTP端口号可搜索“http_port”查看。
      • table.identifier:参数值为2Doris侧创建的数据库和表。
      • “username”和“password”为连接Doris的用户名和密码。
      • 集群未启用Kerberos认证(普通模式)集群已启用Kerberos认证(安全模式)关闭HTTPS后,需要去掉Doris Sink表的with子句中的如下配置参数:

        'doris.enable.https' = 'true'

        'doris.ignore.https.ca' = 'true'

      • 创建Doris Sink表时还可设置表1中的相关参数。
    3. 执行以下命令将Kafka数据写入Doris:

      insert into dorisSink select * from KafkaSource;

  5. 在作业管理界面右边的基础参数中勾选开启CheckPoint,“时间间隔(ms)”根据实际需求填写合适的值,推荐配置的时间间隔取值范围为30000~60000。
  6. 单击“语义校验”对输入内容进行语义校验,单击“保存”,单击“提交”提交作业。

Kafka侧操作

  1. 登录安装了Kafka客户端的节点,执行以下操作创建Kafka Topic:

    cd 客户端安装目录/Kafka/kafka/bin

    1. 执行以下命令创建Kafka Topic,且Topic名称需与6.a配置的Topic名称保持一致。:

      sh kafka-topics.sh --create --topic Topic名称 --partitions 1 --replication-factor 1 --bootstrap-server Kafka Broker实例的Controller所在主机的IP地址:Broker实例的端口 --command-config ../config/client.properties

    2. 执行以下命令查看Topic列表:

      sh kafka-topics.sh --list --bootstrap-server Kafka Broker实例的Controller所在主机的IP地址:Broker实例的端口 --command-config ../config/client.properties

    3. 执行以下命令连接Kafka客户端:

      sh kafka-console-producer.sh --broker-list Kafka Broker实例的Controller所在主机的IP地址:Broker实例的端口 --topic TopicTest --producer.config ../config/producer.properties

    Broker实例的Controller所在主机的IP地址可登录FusionInsight Manager界面,选择“集群 > 服务 > Kafka”,在“概览”界面的基本信息区域查看“Controller所在的主机”获取。

  2. 在安装了MySQL客户端的节点上连接Doris,执行以下命令查看Doris表中的数据是否和9.c插入的数据一致:

    select * from sink.usertable2

Lookup Join

  1. 使用具有FlinkServer管理员权限的用户登录FusionInsight Manager,选择“集群 > 服务 > Flink”。

    对于开启了Kerberos认证的MRS集群,访问Flink WebUI,需提前创建具有FlinkServer管理员权限或应用查看、应用编辑权限的角色,并为用户绑定该角色,角色创建可参考创建FlinkServer权限角色

  1. 在“Flink WebUI”右侧,单击链接,访问FlinkServer。
  2. 进入Flink WebUI界面后,选择 “作业管理 > 新建作业”,在新建作业界面配置以下参数填,并单击“确定”,进入作业管理界:

    • 类型:选择“Flink SQL”。
    • 名称:填写作业名称,例如:FlinkSQL2。

  3. 在Flink作业管理界面创建流或批的Flink SQL作业,例如:

    1. 创建Kafka Source表:

      CREATE TABLE fact_table (

      `id` BIGINT,

      `name` STRING,

      `city` STRING,

      `process_time` as proctime()

      ) WITH (

      'connector' = 'kafka',

      'topic' = 'Topic名称',

      'properties.bootstrap.servers' = 'Kafka Broker实例的IP地址:21007',

      'properties.group.id' = 'testGroup',

      'scan.startup.mode' = 'latest-offset',

      'format' = 'csv',

      'properties.sasl.kerberos.service.name' = 'kafka',

      'properties.security.protocol' = 'SASL_PLAINTEXT',

      'properties.kerberos.domain.name' = 'hadoop.hadoop.com'

      );

      • properties.bootstrap.servers:参数值若有多个,可使用逗号分隔。

        其中,Kafka Broker实例的业务IP地址可在FusionInsight Manager界面,选择“集群 > 服务 > Kafka > 实例”查看;Broker实例的端口号可单击“配置”,集群已启用Kerberos认证(安全模式)搜索“sasl.port”参数查看,集群未启用Kerberos认证(普通模式)搜索“port”参数查看。

      • “properties.sasl.kerberos.service.name”、“properties.security.protocol”和“properties.kerberos.domain.name”的参数值可在安装了Kafka客户端的节点的“客户端安装目录/Kafka/kafka/config”目录下的“server.properties”文件中,搜索“sasl.kerberos.service.name”、“security.protocol”或“kerberos.domain.name”查看。
    2. 创建Flink表:

      create table dim_city(

      `city` STRING,

      `level` INT ,

      `province` STRING,

      `country` STRING

      ) WITH (

      'connector' = 'doris',

      'fenodes' = 'FE实例IP地址:HTTPS端口或者HTTP端口',

      'jdbc-url' = 'jdbc:mysql://FE实例IP地址:FE查询连接端口',

      'table.identifier' = 'dim.dim_city',

      'username' = 'user',

      'password' = 'password'

      );

      • FE实例的IP地址可登录FusionInsight Manager,选择“集群 > 服务 > Doris > 实例”查看。
      • HTTPS端口可在Manager界面,选择“集群 > 服务 > Doris > 配置”,搜索“https_port”查看;HTTP端口可搜索“http_port”查看。
      • Doris FE的查询连接端口,可以通过登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“query_port”参数获取。
      • 创建Flink表时还可设置表2中的相关参数。
    3. 执行以下命令进行JOIN查询:

      SELECT a.id, a.name, a.city, c.province, c.country,c.level

      FROM fact_table a

      LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c

      ON a.city = c.city

  4. 在作业管理界面右边的基础参数中勾选开启CheckPoint,“时间间隔(ms)”根据实际需求填写合适的值,推荐配置的时间间隔取值范围为30000~60000。
  5. 单击“语义校验”对输入内容进行语义校验,单击“保存”,单击“提交”提交作业。

相关表参数配置说明

表1 创建Doris Sink表时可选配置参数

参数名称

参数默认值

是否必须配置

参数描述

doris.request.retries

3

向Doris发送请求的重试次数。

doris.request.connect.timeout.ms

30000

向Doris发送请求的连接超时时间。

doris.request.read.timeout.ms

30000

向Doris发送请求的读取超时时间。

sink.max-retries

3

Commit失败后的最大重试次数,默认为3次。

sink.enable.batch-mode

false

是否使用攒批模式写入Doris,开启后写入时机不依赖CheckPoint,通过“sink.buffer-flush.max-rows”、“sink.buffer-flush.max-bytes”或“sink.buffer-flush.interval”参数来控制写入时机。

sink.buffer-flush.max-rows

50000

攒批模式下,单个批次最多写入的数据行数。

sink.buffer-flush.max-bytes

10MB

攒批模式下,单个批次最多写入的字节数。

sink.buffer-flush.interval

10s

攒批模式下,异步刷新缓存的间隔。

表2 Lookup Join场景下创建Flink表时的可选配置参数

参数名称

参数默认值

是否必须配置

参数描述

lookup.cache.max-rows

-1

Lookup缓存的最大行数,默认值为:-1,表示不开启缓存。

lookup.cache.ttl

10s

Lookup缓存的最大时间,默认值为10s。

lookup.max-retries

1

Lookup查询失败后的重试次数。

lookup.jdbc.async

false

是否开启异步Lookup,默认值为:false。

lookup.jdbc.read.batch.size

128

启用异步Lookup时,每次查询的最大批次大小。

lookup.jdbc.read.batch.queue-size

256

启用异步Lookup时,中间缓冲队列的大小。

lookup.jdbc.read.thread-size

3

每个Task中Lookup的JDBC线程数。