文档首页/ MapReduce服务 MRS/ 最佳实践/ 数据分析/ 实时OLAP数据分析/ 通过FlinkSQL将订单表Kafka数据实时同步到Doris
更新时间:2025-08-09 GMT+08:00
分享

通过FlinkSQL将订单表Kafka数据实时同步到Doris

应用场景

创建MRS FlinkServer作业,将订单表Kafka数据通过FlinkSQL实时同步到Doris。

方案架构

对实时性要求较高的场景,可直接将实时流式数据通过FlinkSQL传入Doris中,在Doris中进行实时查询。

如果数据在加载到Doris之前需要处理,可直接在FlinkSQL中进行数据加工,将加工后的数据实时写入Doris中。

图1 实时数据加载
  • 场景特点:
    • 业务特点为实时流式数据,需要针对实时流式数据进行快速的统计查询。
    • 前端业务库系统实时同步到后端分析系统,在后端分析系统进行实时OLAP分析。
  • 数据加载方式:
    • 前端数据统一接入Kafka中,Kafka作为实时数据加工的唯一数据源。
    • 针对仅追加的数据,按正常的数据插入逻辑,进行数据插入。
    • 针对实时数据同步方式,数据格式指定为Debezium方式,利用Flink的数据同步能力将数据同步到Flink。
  • 注意事项:
    • 实时流式数据写入Doris时,需要使用微批模式,一批数据写入一次,提高写入效率。
    • 使用DataArts Studio DLF进行流任务调度开发时,需要使用2.10.1及以上的版本。

步骤1:创建MRS Flink集群并配置

  1. 创建MRS集群。

    详细操作请参考自定义购买MRS集群,例如相关参数如下:

    • 集群名称:自定义,例如“mrs_test”
    • 版本类型:LTS版
    • 集群版本:MRS 3.3.0-LTS
    • 集群组件包含:Kafka、Flink等
    • Kerberos认证:开启

    其他参数根据实际需要进行配置。

  2. 为集群添加拥有OBS权限的委托。

    1. 创建具有访问OBS权限的ECS委托,如果已存在可用的OBS权限委托,则跳过该步骤。

      详细操作请参考配置MRS集群通过IAM委托对接OBS中“创建具有访问OBS权限的ECS委托”部分,例如委托名称为“mrs_ecs_obs”。

    2. 在MRS管理控制台“现有集群”中单击已创建完成的MRS集群名称。

      在集群“概览”页签单击“委托”后的“选择委托”,选择已创建的委托名称(例如“mrs_ecs_obs”),单击“确定”。

  3. 创建具有FlinkServer管理权限的用户。

    1. 登录集群Manager页面,详细操作请参考访问MRS集群Manager
    2. 选择“系统 > 权限 > 角色 > 添加角色”。

      填写角色名称,如“flinkrole”,在“配置资源权限”的表格中选择“待操作集群的名称 > Flink”,勾选“FlinkServer管理操作权限”,单击“确定”。

    3. 选择“用户 > 添加用户”,填写用户名称,如“flinkuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“flinkrole”、“System_administrator”角色,单击“确定”。
    4. 使用新建的flinkuser用户重新登录FusionInsight Manager,修改该用户初始密码。

  4. 集群购买完成后安装集群全量客户端,详细操作请参考安装客户端(3.x版本)

    例如客户端安装路径为“/opt/client”。

步骤2:创建MRS Doris集群并配置

  1. 创建MRS Doris集群。

    详细操作请参考自定义购买MRS集群,例如相关参数如下:

    • 集群名称:自定义,例如“mrs_doris”
    • 集群版本:MRS 3.3.0-LTS
    • 组件选择:Doris集群。
    • Kerberos认证:开启

    其他参数根据实际需要进行配置。

  2. 为集群添加拥有OBS权限的委托。

    1. 创建具有访问OBS权限的ECS委托,如果已存在可用的OBS权限委托,则跳过该步骤。

      详细操作请参考配置MRS集群通过IAM委托对接OBS中“创建具有访问OBS权限的ECS委托”部分,例如委托名称为“mrs_ecs_obs”。

    2. 在MRS管理控制台“现有集群”中单击已创建完成的MRS集群名称。

      在集群“概览”页签单击“委托”后的“选择委托”,选择已创建的委托名称(例如“mrs_ecs_obs”),单击“确定”。

  3. 创建拥有Doris管理权限的人机用户(例如用户名为dorisuser),并修改初始密码。

    1. 登录Doris集群Manager页面。
    2. 选择“系统 > 权限 > 角色 > 添加角色”,填写角色名称,如“dorisrole”,在“配置资源权限”选择“待操作的集群 > Doris”,勾选“Doris管理员权限”,单击“确定”。
    3. 选择“用户 > 添加用户”,填写用户名称,如“dorisuser”,“用户类型”选择“人机”,“密码策略”默认,输入用户密码并确认密码,关联“dorisrole”角色,单击“确定”。
    4. 使用新建的dorisuser用户重新登录FusionInsight Manager,修改该用户初始密码。

  4. 集群购买完成后安装集群客户端。

    详细操作请参考安装客户端(3.x版本)

    例如客户端安装路径为“/opt/dorisclient”。

  5. 在Doris集群安装MySQL客户端。

    详细操作请参考使用MySQL客户端连接Doris

步骤3:准备数据

  1. 创建Doris的Unique表。

    1. 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。
      export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
      mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
      • 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
      • Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。
    2. 执行以下命令创建Doris的Unique表。
      create database test;
      use test;
      CREATE TABLE `lineorder_u` (
        `LO_ORDERKEY` bigint NOT NULL COMMENT "订单id",
        `LO_LINENUMBER` int NOT NULL COMMENT "订单码",
        `LO_CUSTKEY` bigint NOT NULL COMMENT "客户id",
        `LO_PARTKEY` bigint NOT NULL COMMENT "商品id",
        `LO_SUPPKEY` bigint NOT NULL COMMENT "供应商id",
        `LO_ORDERDATE` date DEFAULT NULL COMMENT "订单日期",
        `LO_ORDERPRIORITY` varchar(100) DEFAULT NULL COMMENT "订单优先级",
        `LO_SHIPPRIORITY` int DEFAULT NULL COMMENT "物流优先级",
        `LO_QUANTITY` int DEFAULT NULL COMMENT "数量",
        `LO_EXTENDEDPRICE` bigint DEFAULT NULL COMMENT "扩展价格",
        `LO_ORDTOTALPRICE` bigint DEFAULT NULL COMMENT "订单总价",
        `LO_DISCOUNT` int DEFAULT NULL COMMENT "折扣",
        `LO_REVENUE` bigint DEFAULT NULL COMMENT "收入",
        `LO_SUPPLYCOST` int DEFAULT NULL COMMENT "成本",
        `LO_TAX` int DEFAULT NULL COMMENT "税款",
        `LO_COMMITDATE` date DEFAULT NULL COMMENT "承诺日期",
        `LO_SHIPMODE` varchar(100) NOT NULL COMMENT "运输方式"
      ) UNIQUE KEY (
        `LO_ORDERKEY`,
        `LO_LINENUMBER`,
        `LO_CUSTKEY`,
        `LO_PARTKEY`,
        `LO_SUPPKEY`,
        `LO_ORDERDATE`
      ) PARTITION BY RANGE(`LO_ORDERDATE`) (
        PARTITION `p1992`
        VALUES
          LESS THAN ("1993-01-01"),
          PARTITION `p1993`
        VALUES
          LESS THAN ("1994-01-01"),
          PARTITION `p1994`
        VALUES
          LESS THAN ("1995-01-01"),
          PARTITION `p1995`
        VALUES
          LESS THAN ("1996-01-01"),
          PARTITION `p1996`
        VALUES
          LESS THAN ("1997-01-01"),
          PARTITION `p1997`
        VALUES
          LESS THAN ("1998-01-01"),
          PARTITION `p1998`
        VALUES
          LESS THAN ("1999-01-01"),
          PARTITION `p1999`
        VALUES
          LESS THAN ("2000-01-01"),
          PARTITION `p2000`
        VALUES
          LESS THAN ("2001-01-01"),
          PARTITION `p2001`
        VALUES
          LESS THAN ("2002-01-01")
      ) DISTRIBUTED BY HASH(`LO_ORDERDATE`) BUCKETS 1 PROPERTIES (
        "replication_allocation" = "tag.location.default: 3"
      );

  2. 创建Kafka多分区的Topic。

    1. 以客户端安装用户,登录MRS Flink集群中安装了客户端的节点。
    2. 执行以下命令登录客户端。
      source /opt/client/bigdata_env
      kinit 业务用户
    3. 执行以下命令创建Kafka多分区的Topic
      cd /opt/client/Kafka/kafka/bin
      sh kafka-topics.sh --create --topic 主题名称 --partitions 主题占用的分区数 --replication-factor 主题的备份个数 --zookeeper ZooKeeper角色实例所在节点IP地址:clientPort/kafka
      • ZooKeeper角色实例所在节点IP地址:登录FusionInsight Manager,选择“集群 > 服务 > ZooKeeper > 实例”,查看并记录ZooKeeper角色实例业务IP地址获取。
      • clientPort:可在ZooKeeper的全部配置参数中搜索“clientPort”查看。

      例如创建3分区的Topic,则执行以下命令:

      sh kafka-topics.sh --create --topic topic_lineorder --partitions 3 --replication-factor 3 --zookeeper 10.10.10.x:2181/kafka

步骤4:创建Flink SQL作业

  1. 使用步骤 3创建的具有FlinkServer管理员权限的用户(例如flinkuser),登录MRS Flink集群Manager页面。
  2. 选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
  3. 选择“作业管理 > 新建作业”,在新建作业页面配置相关信息后,单击“确定”。

    • 类型:Flink SQL
    • 名称:自定义
    • 作业类型:流作业
    • 描述:自定义

  4. 配置Flink SQL作业信息。

    图2 配置Flink SQL作业
    1. 在作业开发界面配置作业信息,例如,配置如下:
      CREATE TABLE IF NOT EXISTS flink_kafka_source (
        `LO_ORDERKEY` BIGINT,
        `LO_LINENUMBER` INT,
        `LO_CUSTKEY` BIGINT,
        `LO_PARTKEY` BIGINT,
        `LO_SUPPKEY` BIGINT,
        `LO_ORDERDATE` DATE,
        `LO_ORDERPRIORITY` STRING,
        `LO_SHIPPRIORITY` INT,
        `LO_QUANTITY` INT,
        `LO_EXTENDEDPRICE` BIGINT,
        `LO_ORDTOTALPRICE` BIGINT,
        `LO_DISCOUNT` INT,
        `LO_REVENUE` BIGINT,
        `LO_SUPPLYCOST` INT,
        `LO_TAX` INT,
        `LO_COMMITDATE` DATE,
        `LO_SHIPMODE` STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'topic名称',
        'properties.group.id' = 'group1',
        'scan.startup.mode' = 'earliest-offset',
        'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP1:Kafka端口号,Kafka的Broker实例业务IP2:Kafka端口号',
        'format' = 'json',
        'properties.sasl.kerberos.service.name' = 'kafka',
        'properties.security.protocol' = 'SASL_PLAINTEXT',
        'properties.kerberos.domain.name' = 'hadoop.系统域名'
      );
      CREATE TABLE IF NOT EXISTS flink_doris_sink (
        `LO_ORDERKEY` BIGINT,
        `LO_LINENUMBER` INT,
        `LO_CUSTKEY` BIGINT,
        `LO_PARTKEY` BIGINT,
        `LO_SUPPKEY` BIGINT,
        `LO_ORDERDATE` DATE,
        `LO_ORDERPRIORITY` STRING,
        `LO_SHIPPRIORITY` INT,
        `LO_QUANTITY` INT,
        `LO_EXTENDEDPRICE` BIGINT,
        `LO_ORDTOTALPRICE` BIGINT,
        `LO_DISCOUNT` INT,
        `LO_REVENUE` BIGINT,
        `LO_SUPPLYCOST` INT,
        `LO_TAX` INT,
        `LO_COMMITDATE` DATE,
        `LO_SHIPMODE` STRING
      ) WITH (
        'connector' = 'doris',
        'fenodes' = 'Doris的FE实例IP地址1:Doris端口号,Doris的FE实例IP地址2:Doris端口号,Doris的FE实例IP地址3:Doris端口号’,
      ' table.identifier ' = ' database_name.table_name ',
      ' username ' = ' user ',
      ' password ' = ' password ',
      ' sink.label - prefix ' = ' doris_label_xxx ',
      ' doris.enable.https ' = ' true ',
      ' doris.ignore.https.ca ' = ' true '
      );
      
      insert into
      flink_doris_sink
      select
      *
      from
      flink_kafka_source;

      Kafka相关配置参数说明:

      • 'topic' = 'topic名称':已创建kafka多分区的Topic。
      • 'properties.group.id' = 'group1':Kafka的group
      • Kafka的Broker实例业务IP:登录Manager,选择“集群 > 服务 > Kafka> 实例”,查看Broker实例业务IP。
      • Kafka端口号:登录Manager,选择“集群 > 服务 > Kafka > 配置”,搜索Kafka服务的“sasl.port”参数获取。
      • hadoop.系统域名:登录Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

      Doris相关配置参数说明:

      • Doris的FE实例IP地址:登录Manager,选择“集群 > 服务 > Doris > 实例”,查询FE实例的业务IP地址获取。
      • Doris端口号:登录Manager,选择“集群 > 服务 > Doris > 配置”,查询Doris服务的“https_port”参数获取。
      • database_name.table_name:数据库名,表名。
      • 'username' = 'user':为在两集群上分别创建的具有FlinkServer、Doris管理员权限的同名用户。
      • 'password' = 'password':创建的具有FlinkServer、Doris管理员权限的用户密码。
      • doris_label_xxx:sink.label-prefix的值,必须是唯一的,每次启任务必须保持其值的唯一性。
    2. 在界面左侧配置作业“基础参数”。
      • 并行度:3
      • JobManager内存(MB):4096
      • 提交队列:自定义,默认default
      • 勾选“taskManager”
        • slot数量:1
        • 内存(MB):4096
      • 勾选“开启CheckPoint”
        • 时间间隔(ms):20000(默认最小时间间隔20s)
        • 模式:EXACTLY_ONCE,其他默认参数即可
      • 故障恢复策略:fixed-delay
        • 重试次数:3
        • 失败重试间隔(s):60
      • 其他配置保持默认即可
    3. 配置作业完成后,可以单击上方“语义校验”对输入内容校验,单击“SQL格式化”对SQL语句进行格式化。确认无问题后,单击“保存”,单击“提交”提交作业。
    4. 在“作业管理”页面单击Flink SQL作业“操作”列的“启动”,运行作业。

步骤5:Kafka数据通过FlinkSQL实时同步到Doris

  1. 以客户端安装用户,登录MRS Flink集群中安装了客户端的节点。
  2. 执行以下命令登录客户端。

    source /opt/client/bigdata_env
    kinit 业务用户

  3. 执行以下命令向Kafka写入数据。

    cd /opt/client/Kafka/kafka/bin
    sh kafka-console-producer.sh --broker-list  Kafka角色实例所在节点的IP地址:Kafka端口号 --topic topic名称  --producer.config /opt/newclient/Kafka/kafka/config/producer.properties
    • Kafka角色实例所在节点的IP地址:登录Manager后,选择“集群 > 服务 > Kafka > 实例”,记录Broker角色实例其中任意一个的IP地址。
    • Kafka端口号:登录Manager后,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,集群已启用Kerberos认证(安全模式)搜索并获取“sasl.port”参数的值,集群未启用Kerberos认证(普通模式)搜索并获取“port”的值。

    例如,写入数据内容如下:

    {"LO_ORDERKEY":"1" ,"LO_LINENUMBER":"1" ,"LO_CUSTKEY":"1" ,"LO_PARTKEY":"1" ,"LO_SUPPKEY":"1" ,"LO_ORDERDATE": "1996-01-02","LO_ORDERPRIORITY":"5-LOW" ,"LO_SHIPPRIORITY":"0" ,"LO_QUANTITY":"17" ,"LO_EXTENDEDPRICE":"6511" ,"LO_ORDTOTALPRICE":"9991" ,"LO_DISCOUNT":"4" ,"LO_REVENUE":"99999" ,"LO_SUPPLYCOST":"95999" ,"LO_TAX":"1001" ,"LO_COMMITDATE":"1996-01-09" ,"LO_SHIPMODE": "AIR"}
    {"LO_ORDERKEY":"2" ,"LO_LINENUMBER":"1" ,"LO_CUSTKEY":"1" ,"LO_PARTKEY":"1" ,"LO_SUPPKEY":"1" ,"LO_ORDERDATE": "1996-01-02","LO_ORDERPRIORITY":"5-LOW" ,"LO_SHIPPRIORITY":"0" ,"LO_QUANTITY":"17" ,"LO_EXTENDEDPRICE":"6511" ,"LO_ORDTOTALPRICE":"9991" ,"LO_DISCOUNT":"4" ,"LO_REVENUE":"99999" ,"LO_SUPPLYCOST":"95999" ,"LO_TAX":"1001" ,"LO_COMMITDATE":"1996-01-09" ,"LO_SHIPMODE": "AIR"}
    {"LO_ORDERKEY":"3" ,"LO_LINENUMBER":"1" ,"LO_CUSTKEY":"1" ,"LO_PARTKEY":"1" ,"LO_SUPPKEY":"1" ,"LO_ORDERDATE": "1996-01-02","LO_ORDERPRIORITY":"5-LOW" ,"LO_SHIPPRIORITY":"0" ,"LO_QUANTITY":"17" ,"LO_EXTENDEDPRICE":"6511" ,"LO_ORDTOTALPRICE":"9991" ,"LO_DISCOUNT":"4" ,"LO_REVENUE":"99999" ,"LO_SUPPLYCOST":"95999" ,"LO_TAX":"1001" ,"LO_COMMITDATE":"1996-01-09" ,"LO_SHIPMODE": "AIR"}

  4. 登录Doris集群中已安装好MySQL客户端的节点,执行如下命令进入Doris数据库。

    export LIBMYSQL_ENABLE_CLEARTEXT_PLUGIN=1 //若集群已启用Kerberos认证(安全模式)需执行该命令。
    mysql -u数据库登录用户 -p数据库登录用户密码 -P数据库连接端口 -hDoris FE实例IP地址
    • 数据库连接端口为Doris FE的查询连接端口,可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 实例”,查看任一FE实例的业务IP地址获取。
    • Doris FE实例IP地址可通过登录MRS Doris集群的Manager界面,选择“集群 > 服务 > Doris > 配置 > 全部配置”,搜索并查看“query_port”参数值获取。

  5. 执行以下命令查询新增的数据。

    use 数据库名称;
    select * from lineorder_u;

    例如查看到如下信息,表示查询新增的数据成功。

相关文档