更新时间:2024-06-06 GMT+08:00

从Kafka读取数据写入到RDS

本指导仅适用于Flink 1.12版本。

场景描述

该场景为根据商品的实时点击量,获取每小时内点击量最高的3个商品及其相关信息。商品的实时点击量数据为输入源发送到Kafka中,再将Kafka数据的分析结果输出到RDS中。

例如,输入如下样例数据:

{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"}
{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"}
{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"}
{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"}
{"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"}
{"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"}
{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"}
{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} 
{"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"}
{"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"}
预期输出:
2021-03-24 08:00:00 - 2021-03-24 08:59:59,0002,name1,2
2021-03-24 08:00:00 - 2021-03-24 08:59:59,0004,name2,2
2021-03-24 08:00:00 - 2021-03-24 08:59:59,0005,name4,2
2021-03-24 09:00:00 - 2021-03-24 09:59:59,0006,name5,2
2021-03-24 09:00:00 - 2021-03-24 09:59:59,0005,name4,1

前提条件

  1. 已创建DMS Kafka实例。

    具体步骤可参考:DMS Kafka入门指引

    创建DMS Kafka实例时,不能开启Kafka SASL_SSL

  2. 已创建RDS MySQL实例。

    本示例创建的RDS MySQL数据库版本选择为:8.0。

    具体步骤可参考:RDS MySQL快速入门

整体作业开发流程

整体作业开发流程参考图1
图1 作业开发流程

步骤1:创建队列:创建DLI作业运行的队列。

步骤2:创建Kafka的Topic:创建Kafka生产消费数据的Topic。

步骤3:创建RDS数据库和表:创建RDS MySQL数据库和表信息。

步骤4:创建增强型跨源连接:DLI上创建连接Kafka和RDS的跨源连接,打通网络。

步骤5:运行作业:DLI上创建和运行Flink OpenSource作业。

步骤6:发送数据和查询结果:Kafka上发送流数据,在RDS上查看运行结果。

步骤1:创建队列

  1. 登录DLI管理控制台,在左侧导航栏单击“资源管理 > 队列管理”,可进入队列管理页面。
  2. 在队列管理界面,单击界面右上角的“购买队列”。
  3. 在“购买队列”界面,填写具体的队列配置参数,具体参数填写参考如下。
    • 计费模式:选择“按需计费”。
    • 区域和项目:保持默认值即可。
    • 名称:填写具体的队列名称。

      新建的队列名称,名称只能包含数字、英文字母和下划线,但不能是纯数字,且不能以下划线开头。长度限制:1~128个字符。

      队列名称不区分大小写,系统会自动转换为小写。

    • 类型:队列类型选择“通用队列”。“按需计费”时需要勾选“专属资源模式”
    • AZ策略、规格:保持默认即可。
    • 企业项目:当前选择为“default”。
    • 高级选项:选择“自定义”。
    • 网段:配置队列网段。例如,当前配置为10.0.0.0/16。

      队列的网段不能和DMS Kafka、RDS MySQL实例的子网网段有重合,否则后续创建跨源连接会失败。

    • 其他参数根据需要选择和配置。
  4. 参数配置完成后,单击“立即购买”,确认配置信息无误后,单击“提交”完成队列创建。

步骤2:创建Kafka的Topic

  1. 登录Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka实例名称,进入到Kafka实例的基本信息页面。
  2. 单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:
    • Topic名称。本示例输入为:testkafkatopic。
    • 分区数:1。
    • 副本数:1。

    其他参数保持默认即可。

步骤3:创建RDS数据库和表

  1. 登录RDS管理控制台,在“实例管理”界面,选择已创建的RDS MySQL实例,选择操作列的“更多 > 登录”,进入数据管理服务实例登录界面。
  2. 输入实例登录的用户名和密码。单击“登录”,即可进入RDS MySQL数据库并进行管理。
  3. 在数据库实例界面,单击“新建数据库”,数据库名定义为:testrdsdb,字符集保持默认即可。
  4. 在已创建的数据库的操作列,单击“SQL查询”,输入以下创建表语句,创建RDS MySQL表。
    CREATE TABLE clicktop (
    	`range_time` VARCHAR(64) NOT NULL,
    	`product_id` VARCHAR(32) NOT NULL,
    	`product_name` VARCHAR(32),
    	`event_count` VARCHAR(32),
    	PRIMARY KEY (`range_time`,`product_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4;

步骤4:创建增强型跨源连接

  • 创建DLI连接Kafka的增强型跨源连接
    1. 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。
    2. 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
    3. 单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
    4. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
    5. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
      • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。
      • 弹性资源池:选择步骤1:创建队列中已经创建的队列名称。(未添加至资源池的队列,请直接选择队列名称。)
      • 虚拟私有云:选择Kafka的虚拟私有云。
      • 子网:选择Kafka的子网。
      • 其他参数可以根据需要选择配置。

      参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为“已激活”后可以进行后续步骤。

    6. 单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
    7. 在“测试连通性”界面,根据2中获取的Kafka连接信息,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。
  • 创建DLI连接RDS的增强型跨源连接
    1. 在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。
    2. 在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
    3. 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选为:1,策略选为:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。
    4. Kafka和RDS实例属于同一VPC和子网下?
      1. 是,执行7。Kafka和RDS实例在同一VPC和子网,不用再重复创建增强型跨源连接。
      2. 否,执行5。Kafka和RDS实例分别在两个VPC和子网下,则要分别创建增强型跨源连接打通网络。
    5. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
    6. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
      • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。
      • 弹性资源池:选择步骤1:创建队列中已经创建的队列名称。(未添加至资源池的队列,请直接选择队列名称。)
      • 虚拟私有云:选择RDS的虚拟私有云。
      • 子网:选择RDS的子网。
      • 其他参数可以根据需要选择配置。

      参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。

    7. 单击“队列管理”,选择操作的队列,本示例为步骤1:创建队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
    8. 在“测试连通性”界面,根据2中获取的RDS连接信息,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。

步骤5:运行作业

  1. 在DLI管理控制台,单击“作业管理 > Flink作业”,在Flink作业管理界面,单击“创建作业”。
  2. 在创建作业界面,作业类型选择“Flink OpenSource SQL”,名称填写为:FlinkKafkaRds。单击“确定”,跳转到Flink作业编辑界面。
  3. 在Flink OpenSource SQL作业编辑界面,配置如下参数,其他参数默认即可。
    • 所属队列:选择步骤1:创建队列中创建的队列。
    • Flink版本:选择1.12。
    • 保存作业日志:勾选。
    • OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。
    • 开启Checkpoint:勾选。
    • Flink作业编辑框中输入具体的作业SQL,本示例作业参考如下。SQL中加粗的参数需要根据实际情况修改。

      本示例使用的Flink版本为1.12,故Flink OpenSource SQL语法也是1.12。本示例数据源是Kafka,写入结果数据到RDS。

      请参考Flink OpenSource SQL 1.12创建Kafka源表Flink OpenSource SQL 1.12创建JDBC结果表(RDS连接)。

      create table click_product(
          user_id string, --点击用户的id
          user_name string, --用户名称
          event_time string, --点击时间
          product_id string, --商品id
          product_name string --商品名称
      ) with (
          "connector" = "kafka",
          "properties.bootstrap.servers" = "10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092",--替换为kafka的内网连接地址和端口
          "properties.group.id" = "click",
          "topic" = "testkafkatopic",--创建的Kafka Topic名称
          "format" = "json",
          "scan.startup.mode" = "latest-offset"
      );
      
      --结果表
      create table top_product (
          range_time string, --计算的时间范围
          product_id string, --商品id
          product_name string, --商品名称
          event_count bigint, --点击次数
          primary key (range_time, product_id) not enforced
      ) with (
          "connector" = "jdbc",
          "url" = "jdbc:mysql://192.168.12.148:3306/testrdsdb",--testrdsdb为创建的RDS的数据库名,IP和端口替换为RDS MySQL的实例IP和端口
          "table-name" = "clicktop",
          "pwd_auth_name"="xxxxx", --DLI侧创建的Password类型的跨源认证名称。使用跨源认证则无需在作业中配置账号和密码。
          "sink.buffer-flush.max-rows" = "1000",
          "sink.buffer-flush.interval" = "1s"
      );
      
      create view current_event_view
      as
          select product_id, product_name, count(1) as click_count, concat(substring(event_time, 1, 13), ":00:00") as min_event_time, concat(substring(event_time, 1, 13), ":59:59") as max_event_time
          from click_product group by substring (event_time, 1, 13), product_id, product_name;
      
      insert into top_product
          select
              concat(min_event_time, " - ", max_event_time) as range_time,
              product_id,
              product_name,
              click_count
          from (
              select *,
              row_number() over (partition by min_event_time order by click_count desc) as row_num
              from current_event_view
          )
          where row_num <= 3
  4. 单击“语义校验”确保SQL语义校验成功。单击“保存”,保存作业。单击“启动”,启动作业,确认作业参数信息,单击“立即启动”开始执行作业。等待作业运行状态变为“运行中”。

步骤6:发送数据和查询结果

  1. 使用Kafka客户端向步骤2:创建Kafka的Topic中的Topic发送数据,模拟实时数据流。

    Kafka生产和发送数据的方法请参考:DMS - 连接实例生产消费信息

    发送样例数据如下:

    {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:01:00", "product_id":"0002", "product_name":"name1"}
    {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:02:00", "product_id":"0002", "product_name":"name1"}
    {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 08:06:00", "product_id":"0004", "product_name":"name2"}
    {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:10:00", "product_id":"0003", "product_name":"name3"}
    {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:15:00", "product_id":"0005", "product_name":"name4"}
    {"user_id":"0003", "user_name":"Cindy", "event_time":"2021-03-24 08:16:00", "product_id":"0005", "product_name":"name4"}
    {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 08:56:00", "product_id":"0004", "product_name":"name2"}
    {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:05:00", "product_id":"0005", "product_name":"name4"} 
    {"user_id":"0001", "user_name":"Alice", "event_time":"2021-03-24 09:10:00", "product_id":"0006", "product_name":"name5"}
    {"user_id":"0002", "user_name":"Bob", "event_time":"2021-03-24 09:13:00", "product_id":"0006", "product_name":"name5"}
  2. 登录RDS控制台,单击RDS数据库实例,单击创建的数据库名,如“testrdsdb”,在创建的表“clicktop”所在行的“操作”列,单击“SQL查询”,输入以下查询语句。
    select * from `clicktop`;
  3. 在“SQL查询”界面,单击“执行SQL”,查看RDS表数据已写入成功。
    图2 RDS表数据