文档首页/ 表格存储服务 CloudTable/ 最佳实践/ 数据导入/ 使用DLI Flink作业实时同步MRS Kafka数据至CloudTable HBase集群
更新时间:2025-07-31 GMT+08:00
分享

使用DLI Flink作业实时同步MRS Kafka数据至CloudTable HBase集群

此章节为您介绍数据实时同步的最佳实践,通过数据湖探索服务DLI Flink作业将MRS kafka数据实时同步给HBase,实现Kafka实时入库到HBase的过程。

图1 数据同步流程图

使用限制

  • MRS集群未开启Kerberos认证。
  • 为了确保网络连通,MRS集群必须与CloudTable集群的安全组、区域、VPC、子网保持一致。
  • MRS与CloudTable安全组入方向添加DLI队列弹性资源网段,建立跨源连接,请参见创建增强型跨源连接
  • 必须打通DLI上下游的网络连通性,请参考测试地址连通性

准备工作

步骤一:创建CloudTable HBase集群

  1. 登录表格存储服务控制台,创建CloudTable HBase集群
  2. 创建ECS
  3. 安装客户端
  4. 启动Shell访问集群。执行“bin/hbase shell”,启动Shell访问集群。
  5. 创建order表。
    create 'order', {NAME => 'detail'}

步骤二:MRS集群中创建Flink作业制造数据

  1. 创建MRS集群
  2. 登录Manager,选择“集群 > Flink > 概览”,进入概览页面。
  3. 单击“Flink WebUI”右侧的链接,访问Flink WebUI。
  4. 在MRS Flink WebUI中创建Flink任务产生数据。
    1. 单击作业管理中的“新建作业”,弹出新建作业页面。
    2. 填写参数,单击“确定”,建立Flink SQL作业。如果修改SQL,单击操作列的“开发”,进入SQL页面添加以下命令。

      ip:port获取ip地址和端口。

      • ip地址获取:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取IP地址。
      • port获取:单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
      • 建议properties.bootstrap.servers参数添加多个ip:port,防止kafka实例网络不稳定或其他原因宕机,导致作业运行失败。

      SQL语句示例:

      CREATE TABLE IF NOT EXISTS `lineorder_hbase` (
      `order_id` string,
      `order_channel` string,
      `order_time` string,
      `pay_amount` double,
      `real_pay` double,
      `pay_time` string,
      `user_id` string,
      `user_name` string,
      `area_id` string
      ) WITH (
      'connector' = 'kafka',
      'topic' = 'test_flink',
      'properties.bootstrap.servers' = 'ip:port',
      'value.format' = 'json',
      'properties.sasl.kerberos.service.name' = 'kafka'
      );
      CREATE TABLE lineorder_datagen (
      `order_id` string,
      `order_channel` string,
      `order_time` string,
      `pay_amount` double,
      `real_pay` double,
      `pay_time` string,
      `user_id` string,
      `user_name` string,
      `area_id` string
      ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1000'
      );
      INSERT INTO
      lineorder_hbase
      SELECT
      *
      FROM
      lineorder_datagen;
    3. 回到作业管理界面,单击操作列的“启动”。作业状态为“运行中”表示作业运行成功。

步骤三:创建DLI Flink作业进行数据同步

  1. 创建弹性资源池和队列,请参见“创建弹性资源池并添加队列”章节。
  2. 创建跨源连接,请参见创建增强型跨源连接
  3. 分别测试DLI与上游MRS Kafka和下游CloudTable HBase的连通性。
    1. 弹性资源池和队列创建后,单击“资源管理 > 队列管理”,进入队列管理界面测试地址连通性,请参见测试地址连通性
    2. 获取上游IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取IP地址。单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
    3. 获取下游ip地址和端口。
      1. 获取ip:进入集群“详情页 > 集群信息 > ZK链接地址(内网)”获取域名,执行以下命令解析ip地址。
        ping 访问域名
      2. 获取端口:进入集群“详情页 > 集群信息 > ZK链接地址(内网)”获取端口。
  4. 创建Flink作业,请参见使用DLI提交作业Flink作业
  5. 选择4中创建的Flink作业,单击操作列的“编辑”,添加SQL进行数据同步。
    CREATE TABLE orders (
    order_id string,
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'test_flink',
    'properties.bootstrap.servers' = 'ip:port',
    'properties.group.id' = 'testGroup_1',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
    );
    create table hbaseSink(
    order_id string,
    detail Row(
    order_channel string,
    order_time string,
    pay_amount double,
    real_pay double,
    pay_time string,
    user_id string,
    user_name string,
    area_id string)
    ) with (
    'connector' = 'hbase-2.2',
    'table-name' = 'order',
    'zookeeper.quorum' = 'ip:port',
    'sink.buffer-flush.max-rows' = '1'
    );
    insert into hbaseSink select order_id, Row(order_channel,order_time,pay_amount,real_pay,pay_time,user_id,user_name,area_id) from orders;
  6. 单击“格式化”,再单击“保存”。

    请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。

  7. 回到DLI控制台首页,单击左侧“作业管理 > Flink作业”。
  8. 启动4中创建的作业,单击操作列的“启动 > 立即启动”。作业状态为“运行中”表示作业运行成功。

步骤四:结果验证

  1. 待MRS Flink任务和DLI任务运行成功后,回到HBase集群运行命令的窗口,启动下游HBase shell客户端。
    scan 'order'
  2. 可以看到数据源持续更新。

相关文档