使用DLI Flink作业实时同步MRS Kafka数据至CloudTable HBase集群
此章节为您介绍数据实时同步的最佳实践,通过数据湖探索服务DLI Flink作业将MRS kafka数据实时同步给HBase,实现Kafka实时入库到HBase的过程。

使用限制
准备工作
- 已注册华为账号并开通华为云,具体请参见注册华为账号并开通华为云,且在使用CloudTable前检查账号状态,账号不能处于欠费或冻结状态。
- 已创建虚拟私有云和子网,参见创建虚拟私有云和子网。
步骤一:创建CloudTable HBase集群
- 登录表格存储服务控制台,创建CloudTable HBase集群。
- 创建ECS。
- 安装客户端。
- 启动Shell访问集群。执行“bin/hbase shell”,启动Shell访问集群。
- 创建order表。
create 'order', {NAME => 'detail'}
步骤二:MRS集群中创建Flink作业制造数据
- 创建MRS集群。
- 登录Manager,选择“集群 > Flink > 概览”,进入概览页面。
- 单击“Flink WebUI”右侧的链接,访问Flink WebUI。
- 在MRS Flink WebUI中创建Flink任务产生数据。
- 单击作业管理中的“新建作业”,弹出新建作业页面。
- 填写参数,单击“确定”,建立Flink SQL作业。如果修改SQL,单击操作列的“开发”,进入SQL页面添加以下命令。
ip:port获取ip地址和端口。
- ip地址获取:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取IP地址。
- port获取:单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 建议properties.bootstrap.servers参数添加多个ip:port,防止kafka实例网络不稳定或其他原因宕机,导致作业运行失败。
SQL语句示例:
CREATE TABLE IF NOT EXISTS `lineorder_hbase` ( `order_id` string, `order_channel` string, `order_time` string, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'value.format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka' ); CREATE TABLE lineorder_datagen ( `order_id` string, `order_channel` string, `order_time` string, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1000' ); INSERT INTO lineorder_hbase SELECT * FROM lineorder_datagen;
- 回到作业管理界面,单击操作列的“启动”。作业状态为“运行中”表示作业运行成功。
步骤三:创建DLI Flink作业进行数据同步
- 创建弹性资源池和队列,请参见“创建弹性资源池并添加队列”章节。
- 创建跨源连接,请参见创建增强型跨源连接。
- 分别测试DLI与上游MRS Kafka和下游CloudTable HBase的连通性。
- 弹性资源池和队列创建后,单击“资源管理 > 队列管理”,进入队列管理界面测试地址连通性,请参见测试地址连通性。
- 获取上游IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取IP地址。单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 获取下游ip地址和端口。
- 获取ip:进入集群“详情页 > 集群信息 > ZK链接地址(内网)”获取域名,执行以下命令解析ip地址。
ping 访问域名
- 获取端口:进入集群“详情页 > 集群信息 > ZK链接地址(内网)”获取端口。
- 获取ip:进入集群“详情页 > 集群信息 > ZK链接地址(内网)”获取域名,执行以下命令解析ip地址。
- 创建Flink作业,请参见使用DLI提交作业Flink作业。
- 选择4中创建的Flink作业,单击操作列的“编辑”,添加SQL进行数据同步。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'properties.group.id' = 'testGroup_1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table hbaseSink( order_id string, detail Row( order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) ) with ( 'connector' = 'hbase-2.2', 'table-name' = 'order', 'zookeeper.quorum' = 'ip:port', 'sink.buffer-flush.max-rows' = '1' ); insert into hbaseSink select order_id, Row(order_channel,order_time,pay_amount,real_pay,pay_time,user_id,user_name,area_id) from orders;
- 单击“格式化”,再单击“保存”。
请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。
- 回到DLI控制台首页,单击左侧“作业管理 > Flink作业”。
- 启动4中创建的作业,单击操作列的“启动 > 立即启动”。作业状态为“运行中”表示作业运行成功。