表格存储服务 CloudTable
表格存储服务 CloudTable
- 最新动态
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- HBase用户指南
- Doris用户指南
- ClickHouse用户指南
- 权限管理
- 审计日志
- 集群日志管理
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
通用类
- CloudTable集群能够提供什么服务?
- 为什么要选择CloudTable服务?
- 创建CloudTable HBase集群要准备什么?
- 使用CloudTable服务时需要关注什么?
- CloudTable HBase集群支持哪些压缩算法?
- 能停止CloudTable服务吗?
- CloudTable中的HBase外部接口支持哪些编程语言?
- 故障RegionServer个数怎么判断?
- CloudTable HBase支持的特殊符号?
- CloudTable数据进行删除,导致索引表和数据表不对应查询异常处理办法?
- python通过thrift访问cloudtable,多个任务并行缓慢
- 如何查看HBase shell的TTL属性?
- 服务器资源为什么会被释放?
- 资源停止服务或逾期释放说明
- 哪些场景会影响数据均衡?
- 如何调整数据均衡的灵敏度,调整后有哪些影响?
- Doris集群回收站数据处理
- 连接访问类
- 数据读写类
- 数据导入
- 网络配置
- 计费类
-
通用类
- 文档下载
- 通用参考
更新时间:2025-01-15 GMT+08:00
链接复制成功!
使用DLI Flink作业实时同步MRS Kafka数据至CloudTable HBase集群
此章节为您介绍数据实时同步的最佳实践,通过数据湖探索服务DLI Flink作业将MRS kafka数据实时同步给HBase,实现Kafka实时入库到HBase的过程。
使用限制
准备工作
- 已注册华为账号并开通华为云,具体请参见注册华为账号并开通华为云,且在使用CloudTable前检查账号状态,账号不能处于欠费或冻结状态。
- 已创建虚拟私有云和子网,参见创建虚拟私有云和子网。
步骤一:创建CloudTable HBase集群
- 登录表格存储服务控制台,创建CloudTable HBase集群。
- 创建ECS,请参考准备弹性云服务。
- 安装客户端。
- 启动Shell访问集群。执行“bin/hbase shell”,启动Shell访问集群。
- 创建order表。
create 'order', {NAME => 'detail'}
步骤二:MRS集群中创建Flink作业制造数据
- 创建MRS集群。
- 登录Manager,选择“集群 > Flink > 概览”,进入概览页面。
- 单击“Flink WebUI”右侧的链接,访问Flink WebUI。
- 在MRS Flink WebUI中创建Flink任务产生数据。
- 单击作业管理中的“新建作业”,弹出新建作业页面。
- 填写参数,单击“确定”,建立Flink SQL作业。如果修改SQL,单击操作列的“开发”,进入SQL页面添加以下命令。
ip:port获取ip地址和端口。
- ip地址获取:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取ip地址。
- port获取:单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 建议properties.bootstrap.servers参数添加多个ip:port,防止kafka实例网络不稳定或其他原因宕机,导致作业运行失败。
SQL语句示例:
CREATE TABLE IF NOT EXISTS `lineorder_hbase` ( `order_id` string, `order_channel` string, `order_time` string, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'value.format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka' ); CREATE TABLE lineorder_datagen ( `order_id` string, `order_channel` string, `order_time` string, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1000' ); INSERT INTO lineorder_hbase SELECT * FROM lineorder_datagen;
- 回到作业管理界面,单击操作列的“启动”。作业状态为“运行中”表示作业运行成功。
步骤三:创建DLI Flink作业进行数据同步
- 创建弹性资源和队列,请参见“创建弹性资源池并添加队列”章节。
- 创建跨源连接,请参见创建增强型跨源连接。
- 分别测试DLI与上游MRS Kafka和下游CloudTable HBase的连通性。
- 弹性资源和队列创建后,单击“资源管理 > 队列管理”,进入队列管理界面测试地址连通性,请参见测试地址连通性。
- 获取上游IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取IP地址。单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 获取下游ip地址和端口。
- 获取ip:进入集群详情页 > 集群信息 > ZK链接地址(内网)获取域名,执行以下命令解析ip地址。
ping 访问域名
- 获取端口:进入集群详情页 > 集群信息 > ZK链接地址(内网)获取端口。
- 获取ip:进入集群详情页 > 集群信息 > ZK链接地址(内网)获取域名,执行以下命令解析ip地址。
- 创建Flink作业,请参见使用DLI提交作业Flink作业。
- 选择1中创建的Flink作业,单击操作列的“编辑”,添加SQL进行数据同步。
CREATE TABLE orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'properties.group.id' = 'testGroup_1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table hbaseSink( order_id string, detail Row( order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string) ) with ( 'connector' = 'hbase-2.2', 'table-name' = 'order', 'zookeeper.quorum' = 'ip:port', 'sink.buffer-flush.max-rows' = '1' ); insert into hbaseSink select order_id, Row(order_channel,order_time,pay_amount,real_pay,pay_time,user_id,user_name,area_id) from orders;
- 单击“格式化”,再单击“保存”。
请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。
- 回到DLI控制台首页,单击左侧“作业管理 > Flink作业”。
- 启动1中创建的作业,单击操作列的“启动 > 立即启动”。作业状态为“运行中”表示作业运行成功。
父主题: 数据导入