文档首页/ 表格存储服务 CloudTable/ 最佳实践/ 数据导入/ 使用DLI Flink作业实时同步MRS Kafka数据至CloudTable Doris集群
更新时间:2025-07-31 GMT+08:00
分享

使用DLI Flink作业实时同步MRS Kafka数据至CloudTable Doris集群

此章节为您介绍数据实时同步的最佳实践,通过数据湖探索服务DLI Flink作业将MRS kafka数据实时同步给Doris,实现Kafka实时入库到Doris的过程。

图1 数据同步流程图

使用限制

  • MRS集群未开启Kerberos认证。
  • 为了确保网络连通,MRS集群必须与CloudTable集群的安全组、区域、VPC、子网保持一致。
  • MRS与CloudTable安全组入方向添加DLI队列弹性资源网段,建立跨源连接,请参见创建增强型跨源连接
  • 必须打通DLI上下游的网络连通性,请参考测试地址连通性

准备工作

步骤一:创建CloudTable Doris集群

  1. 登录表格存储服务控制台,创建Doris集群
  2. 准备弹性云服务
  3. 安装客户端并连接集群。
  4. 创建数据库。
    create database flink;
  5. 使用数据库。
    use flink;
  6. 创建表。
    create table flink.order_doris(
    order_id varchar,
    order_channel varchar,
    order_time varchar,
    pay_amount decimal,
    real_pay decimal,
    pay_time varchar,
    user_id varchar,
    user_name varchar,
    area_id varchar
    ) ENGINE =OLAP
    COMMENT 'OLAP'
    DISTRIBUTED BY HASH(`order_id`) BUCKETS AUTO
    PROPERTIES (
    'replication_num'='3'
    );
  7. 查看FE节点状态。
    show frontends;

步骤二:MRS集群中创建Flink作业制造数据

  1. 创建MRS集群
  2. 登录Manager,选择“集群 > Flink > 概览”,进入概览页面。
  3. 单击“Flink WebUI”右侧的链接,访问Flink WebUI。
  4. 在MRS Flink WebUI中创建Flink任务产生数据。
    1. 单击作业管理中的“新建作业”,弹出新建作业页面。
    2. 填写参数,单击“确定”,建立Flink SQL作业。如果修改SQL,单击操作列的“开发”,进入SQL页面修改命令。

      获取IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 配置 > 全部配置”进入全部配置页面,在页面右上角搜索框搜索“default.bootstrap.severs”参数可获取IP和端口。

      SQL语句示例:

      CREATE TABLE IF NOT EXISTS `lineorder_doris` (
        `order_id` string,
        `order_channel` string,
        `order_time` string,
        `intercept_flag` int,
        `pay_amount` double,
        `real_pay` double,
        `pay_time` string,
        `user_id` string,
        `user_name` string,
        `area_id` string
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'test_flink',
        'properties.bootstrap.servers' = 'ip:port',
        'value.format' = 'json',
        'properties.sasl.kerberos.service.name' = 'kafka'
      );
      CREATE TABLE lineorder_datagen (
        `order_id` string,
        `order_channel` string,
        `order_time` string,
        `intercept_flag` int,
        `pay_amount` double,
        `real_pay` double,
        `pay_time` string,
        `user_id` string,
        `user_name` string,
        `area_id` string
      ) WITH (
        'connector' = 'datagen',
        'fields.intercept_flag.kind' = 'sequence',
        'fields.intercept_flag.start' = '1',
        'fields.intercept_flag.end' = '2000000',
        'rows-per-second' = '1000'
      );
      INSERT INTO
        lineorder_doris
      SELECT
        *
      from
        lineorder_datagen;
    3. 回到作业管理界面,单击操作列的“启动”。作业状态为“运行中”表示作业运行成功。

步骤三:创建DLI Flink作业进行数据同步

  1. 创建弹性资源池和队列,请参见“创建弹性资源池并添加队列”章节。
  2. 创建跨源连接,请参见创建增强型跨源连接
  3. 分别测试DLI与上游MRS Kafka和下游CloudTable Doris的连通性。
    1. 弹性资源和队列创建后,单击“资源管理 > 队列管理”,进入队列管理界面测试地址连通性,请参见测试地址连通性
    2. 获取上游IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 配置 > 全部配置”进入全部配置页面,在页面右上角搜索框搜索“default.bootstrap.severs”参数可获取IP和端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
    3. 获取下游IP地址和端口:进入目标集群详情页可查看节点IP和端口。
  4. 创建Flink作业,请参见使用DLI提交作业Flink作业
  5. 选择4中创建的Flink作业,单击操作列的“编辑”,添加SQL进行数据同步。
    create table orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      intercept_flag int,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_flink',
      'properties.bootstrap.servers' = 'ip:port',
      'properties.group.id' = 'testGroup',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',  --这个参数表示在json数据中如果某个字段不存在时不会报错,而是会返回null值。
      'json.ignore-parse-errors' = 'true',    --这个参数表示在解析json数据时如果出现错误(比如格式不正确)时会忽略这条数据,而不会报错。
      'scan.topic-partition-discovery.interval' = '300000',   --这个参数表示flink会每隔300秒(5分钟)自动检查一次Kafka中的topic和partition信息,以便及时发现新增或删除的topic和partition
      'scan.startup.mode' = 'latest-offset'   ---这个参数表示flink会从Kafka中最早的offset开始读取数据,即从最开始的数据开始处理。
    );
    
    create table dorisSink(
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) with (
      'connector' = 'doris',
      'fenodes' = 'ip:port',
      'password' = '***',
      'username' = '***',
      'table.identifier' = 'flink.order_doris',  --Doris 数据库中的哪个表
      --'sink.properties.strip_outer_array' = 'true',   --这个配置项表示将输出的 JSON 数据中的外层数组去掉,只输出 JSON 对象
      --'sink.properties.format' = 'json',
      --'doris.enable.https' = 'true',
      --'doris.ignore.https.ca' = 'true',
      --'sink.batch.interval' = '1s',   ---这个配置项表示每隔150秒批量输出一次数据flink1.12有这个参数
      'sink.label-prefix' = 'doris_label_118_061',
      'doris.batch.size' = '10', --flink1.12 为sink.batch.size
      'sink.max-retries'='10'   --这个配置项表示在数据输出失败时最多重试10次
    );
    insert into 
    dorisSink 
    select 
    order_id,
    order_channel,
    order_time,
    pay_amount,
    real_pay,
    pay_time,
    user_id,
    user_name,
    area_id 
    from orders;
  6. 单击“格式化”,再单击“保存”。

    请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。

  7. 回到DLI控制台首页,单击左侧“作业管理 > Flink作业”。
  8. 启动4中创建的作业,单击操作列的“启动 > 立即启动”。作业状态为“运行中”表示作业运行成功。

步骤四:结果验证

  1. 待MRS Flink任务和DLI任务运行成功后,返回Doris集群运行命令的窗口,进入集群客户端。
  2. 查看Doris表中的数据是否和source_temp_table表中的一致。
    select * from order_doris;

相关文档