从Kafka读取数据写入到Elasticsearch
本指导仅适用于Flink 1.12版本。
场景描述
本示例场景对用户购买商品的数据信息进行分析,将满足特定条件的数据结果进行汇总输出。购买商品数据信息为数据源发送到Kafka中,再将Kafka数据的分析结果输出到Elasticsearch中。
例如,输入如下样例数据:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}
DLI从Kafka读取数据写入Elasticsearch,在Elasticsearch集群的Kibana中查看相应结果。
前提条件
- 已创建DMS Kafka实例。
具体步骤可参考:DMS Kafka入门指引。
创建DMS Kafka实例时,不能开启Kafka SASL_SSL。
- 已创建Elasticsearch类型的CSS集群。
具体创建CSS集群的操作可以参考创建CSS集群。
本示例创建的CSS集群版本为:7.6.2,集群为非安全集群。
整体作业开发流程
步骤1:创建队列:创建DLI作业运行的队列。
步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。
步骤3:创建Elasticsearch搜索索引:创建Elasticsearch搜索索引用于接收结果数据。
步骤4:创建增强型跨源连接:DLI上创建连接Kafka和CSS的跨源连接,打通网络。
步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。
步骤6:发送数据和查询结果:Kafka上发送流数据,在CSS上查看运行结果。
步骤1:创建队列
- 登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。
- 在队列管理界面,单击界面右上角的“购买队列”。
- 在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。
- 计费模式:选择“包年/包月”或“按需计费”。本示例选择“按需计费”。
- 区域和项目:保持默认值即可。
- 名称:填写具体的队列名称。
新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。
队列名称不区分大小写,系统会自动转换为小写。
- 类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”。
- AZ策略、CPU架构、规格:保持默认即可。
- 企业项目:当前选择为“default”。
- 高级选项:选择“自定义”。
- 网段:配置队列网段。例如,当前配置为10.0.0.0/16。
队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。
- 其他参数根据需要选择和配置。
- 参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。
步骤2:创建Kafka的Topic
- 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。
- 单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:
- Topic名称。本示例输入为:testkafkatopic。
- 分区数:1。
- 副本数:1。
其他参数保持默认即可。
步骤3:创建Elasticsearch搜索索引
- 登录CSS管理控制台,选择“集群管理 > Elasticsearch”。
- 在集群管理界面,在已创建的CSS集群的“操作”列,单击“Kibana”访问集群。
- 在Kibana的左侧导航中选择“Dev Tools”,进入到Console界面。
- 在Console界面,执行如下命令创建索引“shoporders”。
PUT /shoporders { "settings": { "number_of_shards": 1 }, "mappings": { "properties": { "order_id": { "type": "text" }, "order_channel": { "type": "text" }, "order_time": { "type": "text" }, "pay_amount": { "type": "double" }, "real_pay": { "type": "double" }, "pay_time": { "type": "text" }, "user_id": { "type": "text" }, "user_name": { "type": "text" }, "area_id": { "type": "text" } } } }
步骤4:创建增强型跨源连接
- 创建DLI连接Kafka的增强型跨源连接
- 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。
- 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
- 单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
- 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
- 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
- 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。
- 弹性资源池:选择步骤1:创建队列中已经创建的队列。
- 虚拟私有云:选择Kafka的虚拟私有云。
- 子网:选择Kafka的子网。
- 其他参数可以根据需要选择配置。
参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。
- 单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。
- 在“测试连通性”界面,根据中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。
- 创建DLI连接CSS的增强型跨源连接
- 在CSS管理控制台,选择“集群管理”,单击已创建的CSS集群名称,进入到CSS的基本信息页面。
- 在“基本信息”中获取CSS的“内网访问地址”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
- 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
- Kafka和CSS实例属于同一VPC和子网下?
- 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
- 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
- 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_css。
- 弹性资源池:选择步骤1:创建队列中已经创建的队列。
- 虚拟私有云:选择CSS的虚拟私有云。
- 子网:选择CSS的子网。
- 其他参数可以根据需要选择配置。
参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。
- 单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中添加的队列,在操作列,单击“更多 > 测试地址连通性”。
- 在“测试连通性”界面,根据2获取的CSS连接信息,地址栏输入“CSS内网地址:CSS内网端口”,单击“测试”测试DLI到CSS网络是否可达。
步骤5:运行作业
- 在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。
- 在创建队列界面,类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaES。单击“确定”,跳转到Flink作业编辑界面。
- 在Flink OpenSource SQL作业编辑界面,配置如下参数,其他参数默认即可。
- 所属队列:选择步骤1:创建队列中创建的队列。
- Flink版本:选择1.12。
- 保存作业日志:勾选。
- OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。
- 开启Checkpoint:勾选。
- Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。
本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到Elasticsearch。
请参考Flink OpenSource SQL 1.12创建Kafka源表和Flink OpenSource SQL 1.12创建Elasticsearch结果表。
- 创建Kafka源表,将DLI和Kafka数据源进行链接。
CREATE TABLE kafkaSource ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) with ( "connector" = "kafka", "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口 "properties.group.id" = "click", "topic" = "testkafkatopic", --创建的Kafka Topic "format" = "json", "scan.startup.mode" = "latest-offset" );
- 创建Elasticsearch结果表,将DLI分析后的数据的结果展示在Elasticsearch结果表上。
CREATE TABLE elasticsearchSink ( order_id string, order_channel string, order_time string, pay_amount double, real_pay double, pay_time string, user_id string, user_name string, area_id string ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = '192.168.168.125:9200', --替换为CSS集群的内网地址和端口 'index' = 'shoporders' --创建的Elasticsearch搜索引擎 ); --将Kafka数据写入到Elasticsearch索引中 insert into elasticsearchSink select * from kafkaSource;
- 单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。
步骤6:发送数据和查询结果
- Kafaka端发送数据。
使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。
Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息。
发送样例数据如下:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}
- 查看Elasticsearch端数据处理后的相应结果。
查询结果返回如下:
{ "took" : 0, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 2, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "shoporders", "_type" : "_doc", "_id" : "6fswzIAByVjqg3_qAyM1", "_score" : 1.0, "_source" : { "order_id" : "202103241000000001", "order_channel" : "webShop", "order_time" : "2021-03-24 10:00:00", "pay_amount" : 100.0, "real_pay" : 100.0, "pay_time" : "2021-03-24 10:02:03", "user_id" : "0001", "user_name" : "Alice", "area_id" : "330106" } }, { "_index" : "shoporders", "_type" : "_doc", "_id" : "6vs1zIAByVjqg3_qyyPp", "_score" : 1.0, "_source" : { "order_id" : "202103241606060001", "order_channel" : "appShop", "order_time" : "2021-03-24 16:06:06", "pay_amount" : 200.0, "real_pay" : 180.0, "pay_time" : "2021-03-24 16:10:06", "user_id" : "0002", "user_name" : "Jason", "area_id" : "330106" } } ] } }