- 最新动态
- 产品介绍
- 计费说明
- 快速入门
-
用户指南
- HBase用户指南
- Doris用户指南
- ClickHouse用户指南
- 权限管理
- 审计日志
- 集群日志管理
- 最佳实践
- 开发指南
- API参考
- SDK参考
-
常见问题
-
通用类
- CloudTable集群能够提供什么服务?
- 为什么要选择CloudTable服务?
- 创建CloudTable HBase集群要准备什么?
- 使用CloudTable服务时需要关注什么?
- CloudTable HBase集群支持哪些压缩算法?
- 能停止CloudTable服务吗?
- CloudTable中的HBase外部接口支持哪些编程语言?
- 故障RegionServer个数怎么判断?
- CloudTable HBase支持的特殊符号?
- CloudTable数据进行删除,导致索引表和数据表不对应查询异常处理办法?
- python通过thrift访问cloudtable,多个任务并行缓慢
- 如何查看HBase shell的TTL属性?
- 服务器资源为什么会被释放?
- 资源停止服务或逾期释放说明
- 哪些场景会影响数据均衡?
- 如何调整数据均衡的灵敏度,调整后有哪些影响?
- Doris集群回收站数据处理
- 连接访问类
- 数据读写类
- 数据导入
- 网络配置
- 计费类
-
通用类
- 文档下载
- 通用参考
本文导读
链接复制成功!
使用DLI Flink作业实时同步MRS Kafka数据至CloudTable ClickHouse集群
此章节为您介绍数据实时同步的最佳实践,通过数据湖探索服务DLI Flink作业将MRS kafka任务制造数据实时同步给ClickHouse,实现Kafka实时入库到ClickHouse的过程。
使用限制
准备工作
- 已注册华为账号并开通华为云,具体请参见注册华为账号并开通华为云,且在使用CloudTable前检查账号状态,账号不能处于欠费或冻结状态。
- 已创建虚拟私有云和子网,参见创建虚拟私有云和子网。
步骤一:创建CloudTable ClickHouse集群
- 登录表格存储服务控制台,创建非安全ClickHouse集群。
- 下载客户端和客户端校验文件。
- 准备弹性云服务。
- 安装客户端并校验客户端。
- 建立flink数据库。
create database flink;
使用flink数据库。
use flink;
- 创建flink.order表。
create table flink.order(order_id String,order_channel String,order_time String,pay_amount Float64,real_pay Float64,pay_time String,user_id String,user_name String,area_id String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/flink/order', '{replica}')ORDER BY order_id;
- 查看表是否创建成功。
select * from flink.order;
步骤二:MRS集群中创建Flink作业制造数据
- 创建MRS集群。
- 登录Manager,选择“集群 > Flink > 概览”,进入概览页面。
- 单击“Flink WebUI”右侧的链接,访问Flink WebUI。
- 在MRS Flink WebUI中创建Flink任务产生数据。
- 单击作业管理中的“新建作业”,弹出新建作业页面。
- 填写参数,单击“确定”,建立Flink SQL作业。如果修改SQL,单击操作列的“开发”,进入SQL页面添加以下命令。
说明:
ip:port获取ip地址和端口:
- ip地址获取:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取ip地址。
- port获取:单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 建议properties.bootstrap.servers参数添加多个ip:port,防止kafka实例网络不稳定或其他原因宕机,导致作业运行失败。
SQL语句示例:
CREATE TABLE IF NOT EXISTS `lineorder_ck` ( `order_id` string, `order_channel` string, `order_time` string, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'value.format' = 'json', 'properties.sasl.kerberos.service.name' = 'kafka' ); CREATE TABLE lineorder_datagen ( `order_id` string, `order_channel` string, `order_time` string, `pay_amount` double, `real_pay` double, `pay_time` string, `user_id` string, `user_name` string, `area_id` string ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1000' ); INSERT INTO lineorder_ck SELECT * FROM lineorder_datagen;
- 回到作业管理界面,单击操作列的“启动”。作业状态为“运行中”表示作业运行成功。
步骤三:创建DLI Flink任务进行数据同步
- 创建弹性资源和队列,请参见“创建弹性资源池并添加队列”章节。
- 创建跨源连接,请参见创建增强型跨源连接。
- 分别测试DLI与上游MRS Kafka和下游CloudTable HBase的连通性。
- 弹性资源和队列创建后,单击“资源管理 > 队列管理”,进入队列管理界面测试地址连通性,请参见测试地址连通性。
- 获取上游IP地址和端口:进入集群的Manager页面,单击“集群 > Kafka > 实例 > 管理IP(Broker)”,可获取IP地址。单击配置,进入配置页面,搜索“port”,获取端口(该port是Broker服务监听的PLAINTEXT协议端口号)。
- 获取下游ip地址和端口:进入集群详情页可查看节点ip和端口。
- 创建Flink作业,请参见使用DLI提交作业Flink作业。
- 选择1中创建的Flink作业,单击操作列的“编辑”,添加SQL进行数据同步。
create table orders ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'kafka', 'topic' = 'test_flink', 'properties.bootstrap.servers' = 'ip:port', 'properties.group.id' = 'testGroup_1', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create table clickhouseSink( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( 'connector' = 'clickhouse', 'url' = 'jdbc:clickhouse://ip:port/flink', 'username' = 'admin', 'password' = '****', 'table-name' = 'order', 'sink.buffer-flush.max-rows' = '10', 'sink.buffer-flush.interval' = '3s' ); insert into clickhouseSink select * from orders;
- 单击“格式化”,再单击“保存”。
须知:
请务必先单击“格式化”将SQL代码进行格式化处理,否则可能会因为代码复制和粘贴操作过程中引入新的空字符,而导致作业执行失败。
- 回到DLI控制台首页,单击左侧“作业管理 > Flink作业”。
- 启动1中创建的作业,单击操作列的“启动 > 立即启动”。作业状态为“运行中”表示作业运行成功。
父主题: 数据导入