计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
威胁检测服务 MTD
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive
文档首页/ 数据湖探索 DLI/ 快速入门/ 使用DLI提交Flink OpenSource SQL作业查询RDS MySQL数据

使用DLI提交Flink OpenSource SQL作业查询RDS MySQL数据

更新时间:2025-01-09 GMT+08:00

操作场景

DLI Flink作业支持使用其他服务作为数据源和数据输出通道进行数据实时计算操作。

本例采用Kafka服务作为数据源通道,以RDS作为数据输出通道,介绍创建并提交Flink OpenSource SQL作业进行实时计算的操作步骤。

操作流程

样例场景需要创建一个Flink OpenSource SQL作业,且该作业包含一个输入流和一个输出流。输入流用于从Kafka读取数据,输出流用于将数据写入到RDS中。操作流程如操作流程所示。

开始进行如下操作前,请务必参考准备工作完成必要操作。

表1 使用DLI提交SQL作业查询RDS MySQL数据的操作流程

操作步骤

说明

步骤1:准备数据源通道

本样例场景需要创建Kafka实例作为数据源通道。

步骤2:准备数据输出通道

本样例场景需要创建RDS实例作为数据输出通道。

步骤3:创建OBS桶保存输出数据

创建OBS桶,为DLI Flink作业提供Checkpoint、保存作业日志和调试测试数据的存储功能。

步骤4:创建弹性资源池并添加队列

创建提交Flink作业所需的计算资源。

步骤5:创建DLI连接Kafka的增强型跨源连接

通过增强型跨源连接建立DLI弹性资源池与Kafka实例的网络连通。

步骤6:创建DLI连接RDS的增强型跨源连接

通过增强型跨源连接建立DLI弹性资源池与RDS实例的网络连通。

步骤7:使用DEW管理访问凭据,并配置允许DLI访问DEW的委托

跨源分析场景中,使用DEW管理数据源的访问凭证,并创建允许DLI访问DEW的委托。

步骤8:创建Flink OpenSource SQL作业

准备好数据源和数据输出通道后创建Flink OpenSource SQL作业分析数据。

准备工作

  • 已注册华为账号并开通华为云,且在使用DLI前检查账号状态,账号不能处于欠费或冻结状态。
  • 配置DLI委托访问授权
    DLI使用过程中涉及到OBS、VPC、SMN等服务交互,首次使用DLI需要用户配置委托授权,允许访问这些依赖服务。
    1. 使用华为云账号登录DLI管理控制台,在左侧导航栏单击“全局配置 > 服务授权”。
    2. 在委托设置页面,勾选基础使用、跨源场景、运维场景的委托权限后,单击“更新委托权限”。
    3. 查看并了解更新委托的提示信息,单击“确定”。完成DLI委托权限的更新。
      图1 配置DLI委托访问授权
    4. 完成配置后,在IAM控制台的委托列表中,可查看到dli_management_agency的委托信息。

步骤1:准备数据源通道

本例以Kafka数据作为数据源通道。

开通Kafka数据接入服务,具体操作请参见创建Kafka实例

  1. 创建Kafka相关依赖资源
    在创建Kafka实例前您需要提前准备相关依赖资源,包括VPC、子网和安全组,并配置安全组。

    更多信息请参考《分布式消息服务Kafka用户指南》中的“准备实例依赖资源”章节。

  2. 创建用于作业输入流的Kafka专享版实例
    1. 登录分布式消息服务Kafka管理控制台。
    2. 在管理控制台左上角选择区域。
    3. “Kafka专享版”页面,单击右上角“购买Kafka实例”配置相关参数。实例信息如下:

      更多Kafka专享版实例参数配置说明请参考购买Kafka实例

      表2 购买Kafka专享版实例参数说明

      参数名称

      参数说明

      配置样例

      区域

      不同区域的云服务产品之间内网互不相通。请就近选择靠近您业务的区域,可减少网络时延,提高访问速度。

      华东-上海二

      可用区

      可用区指在同一区域下,电力、网络隔离的物理区域,可用区之间内网互通,不同可用区之间物理隔离。

      可用区1、可用区2、可用区3

      版本

      Kafka的版本号,实例创建后,版本号不支持修改。

      3.x

      部署架构

      根据需求选择“单机”或“集群”。

      集群

      代理规格

      请根据业务需求选择相应的代理规格。

      kafka.2u4g.cluster.small

      代理数量

      选择代理数量。

      3

      单个代理存储空间

      选择存储Kafka数据的磁盘类型和磁盘大小。Kafka实例创建后,磁盘类型不支持修改。

      高I/O 100GB

      虚拟私有云

      选择已经创建好的或共享的虚拟私有云。

      选择1中创建的虚拟私有云

      子网

      选择已经创建好的或共享的子网。

      子网在Kafka实例创建完成后,不支持修改。

      选择1中创建的子网

      安全组

      选择已经创建好的安全组。

      选择1中创建的安全组

      接入方式

      接入方式分为以下两种:

      • 明文接入:表示客户端连接Kafka实例时,无需进行SASL认证。
      • 密文接入:表示客户端连接Kafka实例时,需要进行SASL认证。

      明文接入

      实例名称

      按命名规则自定义实例名称。

      kafka-dliflink

      企业项目

      企业项目是一种云资源管理方式,企业项目管理服务提供统一的云资源按项目管理,以及项目内的资源管理、成员管理,默认项目为default。

      default

    4. 单击“立即购买”,弹出规格确认页面。
    5. 确认实例信息无误且阅读并同意《华为云用户协议》后,单击“提交”,完成实例创建,创建实例大约需要10~15分钟请耐心等待。
  3. 创建Kafka的topic。
    1. 单击购买的Kafka实例名称,进入到Kafka实例的基本信息页面。
    2. 单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:
      • Topic名称。本示例输入为:testkafkatopic。
      • 分区数:1。
      • 副本数:1。

      其他参数保持默认即可。

步骤2:准备数据输出通道

采用RDS作为数据输出通道,创建RDS MySQL实例。

  1. 登录RDS管理控制台。
  2. 在管理控制台左上角选择区域。
  3. 单击“购买数据库实例”配置相关参数。主要参数的填下说明如下,其他参数保持默认值即可。

    详细参数说明请参考购买RDS for MySQL实例

    表3 RDS MySQL实例参数配置信息

    参数名称

    参数说明

    取值样例

    计费模式

    选择RDS实例的计费模式。

    按需计费

    区域

    选择与DLI服务相同的区域。

    华东-上海二

    实例名称

    实例名称。

    rds-dliflink

    数据库引擎

    MySQL

    MySQL

    数据库版本

    选用RDS for MySQL数据库时,请根据实际业务需求选择合适的数据库引擎版本。建议您选择当前可用的最高版本数据库,因其性能更稳定,安全性更高,使用更可靠。

    8.0

    实例类型

    选择实例的主备类型。

    主备

    存储类型

    实例的存储类型决定实例的读写速度。最大吞吐量越高,读写速度越快。

    SSD云盘

    可用区

    对于单机实例,仅需选择单个可用区。

    自定义

    时区

    由于世界各国家与地区经度不同,地方时也有所不同,因此会划分为不同的时区。时区可在创建实例时选择,后期可修改。

    默认

    性能规格

    实例的CPU和内存。不同性能规格对应不同连接数和最大IOPS。

    2vCPUs | 4GB

    存储空间

    如果存储类型为SSD云盘或极速型SSD,可设置存储空间自动扩容,当存储空间可用率过小时,会自动扩容存储空间。

    40GB

    磁盘加密

    选择是否开启磁盘加密功能。

    不加密

    虚拟私有云、子网

    选择已有的虚拟私有云和子网。

    如需重新创建VPC和子网,请参考。

    说明:

    跨源场景数据源网段和弹性资源池的网段不能重合。

    选择1中创建的虚拟私有云和子网。

    数据库端口

    默认使用3306端口。

    3306

    安全组

    安全组限制实例的安全访问规则,加强云数据库RDS服务与其他服务间的安全访问。

    数据源的安全组需放通DLI弹性资源池的网段。

    选择1中创建的安全组。

    设置密码

    设置实例的登录密码。

    -

    管理员账号

    root

    root

    管理员密码

    设置管理员密码。

    -

    参数模板

    数据库参数模板就像是数据库引擎配置值的容器,参数模板中的参数可应用于一个或多个相同类型的数据库实例。

    Default-MySQL-8.0

    企业项目

    对于已成功关联企业项目的用户,仅需在“企业项目”下拉框中选择目标项目。

    default

    购买数量

    实例购买数量。

    1

  4. 单击“立即购买”,确认规格信息。
  5. 单击“提交”,完成RDS实例的创建。
  6. 登录MySQL,并使用下述命令在flink库下创建orders表。
    登录MySQL,单击“SQL窗口”,在SQL查询页面输入以下创建表语句,创建RDS MySQL表。
    CREATE TABLE `flink`.`orders` (
    	`order_id` VARCHAR(32) NOT NULL,
    	`order_channel` VARCHAR(32) NULL,
    	`order_time` VARCHAR(32) NULL,
    	`pay_amount` DOUBLE UNSIGNED NOT NULL,
    	`real_pay` DOUBLE UNSIGNED NULL,
    	`pay_time` VARCHAR(32) NULL,
    	`user_id` VARCHAR(32) NULL,
    	`user_name` VARCHAR(32) NULL,
    	`area_id` VARCHAR(32) NULL,
    	PRIMARY KEY (`order_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;

步骤3:创建OBS桶保存输出数据

在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为DLI Flink作业提供Checkpoint、保存作业日志和调试测试数据的存储功能。

具体操作请参见《对象存储服务控制台指南》中的“创建桶”章节。

  1. 在OBS管理控制台左侧导航栏选择“对象存储”。
  2. 在页面右上角单击“创建桶”,配置桶参数。
    表4 OBS桶参数说明

    参数名称

    参数说明

    取值样例

    区域

    桶所属区域。请选择与DLI服务相同的区域。

    华东-上海二

    桶名称

    桶的名称。需全局唯一,不能与已有的任何桶名称重复,包括其他用户创建的桶。桶创建成功后,不支持修改名称,创建时,请设置合适的桶名。

    obstest

    默认存储类别

    桶的存储类别。不同的存储类别可以满足客户业务对存储性能、成本的不同诉求。

    标准存储

    桶策略

    桶的读写权限控制。

    私有桶除桶ACL授权外的其他用户无桶的访问权限。

    私有

    企业项目

    将桶加入到企业项目中统一管理。

    default

  3. 单击“立即创建”。

步骤4:创建弹性资源池并添加队列

创建DLI Flink OpenSource SQL作业,不能使用系统已有的default队列,需要您创建新队列,本例创建弹性资源池“dli_resource_pool”、队列“dli_queue_01”。

  1. 登录DLI管理控制台。
  2. 在左侧导航栏单击“资源管理 > 弹性资源池”,可进入弹性资源池管理页面。
  3. 在弹性资源池管理界面,单击界面右上角的“购买弹性资源池”。
  4. 在“购买弹性资源池”界面,填写具体的弹性资源池参数。
    本例在华东-上海二区域购买按需计费的弹性资源池。相关参数说明如表5所示。
    表5 参数说明

    参数名称

    参数说明

    配置样例

    区域

    选择弹性资源池所在区域。

    华东-上海二

    项目

    每个区域默认对应一个项目,由系统预置。

    系统默认项目

    名称

    弹性资源池名称。

    dli_resource_pool

    规格

    选择弹性资源池规格。

    标准版

    CU范围

    弹性资源池最大最小CU范围。

    64-64

    网段

    规划弹性资源池所属的网段。如需使用DLI增强型跨源,弹性资源池网段与数据源网段不能重合。弹性资源池网段设置后不支持更改

    172.16.0.0/19

    企业项目

    选择对应的企业项目。

    default

  5. 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
  6. 单击“提交”完成弹性资源池的创建。
  7. 在弹性资源池的列表页,选择要操作的弹性资源池,单击操作列的“添加队列”。
  8. 配置队列的基础配置,具体参数信息如下。
    表6 弹性资源池添加队列基础配置

    参数名称

    参数说明

    配置样例

    名称

    弹性资源池添加的队列名称。

    dli_queue_01

    类型

    选择创建的队列类型。

    • 执行SQL作业请选择SQL队列。
    • 执行Flink或Spark作业请选择通用队列。

    _

    执行引擎

    SQL队列可以选择队列引擎为Spark或者HetuEngine。

    _

    企业项目

    选择对应的企业项目。

    default

  9. 单击“下一步”,配置队列的扩缩容策略。

    单击“新增”,可以添加不同优先级、时间段、“最小CU”和“最大CU”扩缩容策略。

    本例配置的扩缩容策略如图2所示。
    图2 添加队列时配置扩缩容策略
    表7 扩缩容策略参数说明

    参数名称

    参数说明

    配置样例

    优先级

    当前弹性资源池中的优先级数字越大表示优先级越高。本例设置一条扩缩容策略,默认优先级为1。

    1

    时间段

    首条扩缩容策略是默认策略,不能删除和修改时间段配置。

    即设置00-24点的扩缩容策略。

    00-24

    最小CU

    设置扩缩容策略支持的最小CU数。

    16

    最大CU

    当前扩缩容策略支持的最大CU数。

    64

  10. 单击“确定”完成添加队列配置。

步骤5:创建DLI连接Kafka的增强型跨源连接

创建DLI Flink作业,还需要创建增强型跨源连接。具体操作请参考《数据湖探索用户指南》>《跨源连接》>《增强型跨源连接》。

说明:
  • 增强型跨源仅支持按需专属队列。
  • 绑定跨源的DLI队列网段和数据源网段不能重合。
  • 系统default队列不支持创建跨源连接。
  • 访问跨源表需要使用已经创建跨源连接的队列。
  1. 在Kafka的安全组上放通DLI队列网段

    1. 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。
    2. 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
    3. 单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。

      例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。

  2. 创建DLI队列连接Kafka的增强型跨源连接。

    1. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
    2. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
      • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。
      • 弹性资源池:选择步骤4:创建弹性资源池并添加队列中已经创建的资源名称。
      • 虚拟私有云:选择Kafka的虚拟私有云。
      • 子网:选择Kafka的子网。
      • 其他参数可以根据需要选择配置。

      参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为“已激活”后可以进行后续步骤。

    3. 单击“资源管理 > 队列管理”,选择操作的队列,本示例为步骤4:创建弹性资源池并添加队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
    4. 在“测试连通性”界面,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。注意多个地址要分开单独测试。

步骤6:创建DLI连接RDS的增强型跨源连接

  1. 在RDS的安全组上放通DLI队列网段。

    如果RDS和Kafka在同一VPC下的同一安全组,则无需重复执行此步骤,在步骤1中已在该安全组放通DLI的队列网段。
    1. 在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。
    2. 在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
    3. 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。

  2. 创建DLI队列连接RDS的增强型跨源连接。

    如果RDS和Kafka在同一VPC和子网,则无需重复执行此步骤,在步骤2中创建的增强型跨源连接已将网络打通。

    如果Kafka和RDS实例分别在两个VPC和子网下,则要执行以下步骤创建DLI队列连接RDS的增强型跨源连接。
    1. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
    2. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
      • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。
      • 弹性资源池:选择步骤4:创建弹性资源池并添加队列中已经创建的队列名称。
      • 虚拟私有云:选择RDS的虚拟私有云。
      • 子网:选择RDS的子网。
      • 其他参数可以根据需要选择配置。

      参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。

    3. 单击“资源管理 > 队列管理”,选择操作的队列,本示例为步骤4:创建弹性资源池并添加队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
    4. 在“测试连通性”界面,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。

步骤7:使用DEW管理访问凭据,并配置允许DLI访问DEW的委托

跨源分析场景中,需要在connector中设置账号、密码等属性。但是账号密码等信息属于高度敏感数据,需要做加密处理,以保障用户的数据隐私安全。Flink1.15版本支持使用DEW管理凭据,在执行作业前需创建自定义委托并在作业中配置委托信息。

数据加密服务(Data Encryption Workshop,DEW)、云凭据管理服务(Cloud Secret Management Service,CSMS),提供一种安全、可靠、简单易用隐私数据加解密方案。本例介绍Flink Opensource SQL使用DEW管理RDS访问凭据的配置方法。

  1. 创建DLI访问DEW的委托并完成委托授权。
  2. 在DEW创建通用凭证。
  3. 登录DEW管理控制台
  4. 选择“凭据管理”,进入“凭据管理”页面。
  5. 单击“创建凭据”,配置凭据基本信息
    • 凭据名称:待创建凭据的名称。本例名称为secretInfo。
    • 凭据值:配置RDS实例的用户名和密码。
      • 第一行凭据值的键为MySQLUsername,值为RDS实例的用户名。
      • 第二行凭据值的键为MySQLPassword,值为RDS实例的密码。
      图3 设置凭据值
  6. 按需完成其他参数的配置后,单击“确定”保存凭据。

步骤8:创建Flink OpenSource SQL作业

准备好数据源和数据输出通道之后,就可以创建Flink OpenSource SQL作业了。

  1. 在DLI管理控制台的左侧导航栏中,单击“作业管理 > Flink作业”,进入“Flink作业”页面。
  2. 在“Flink作业”页面右上角单击“创建作业”,弹出“创建作业”对话框。配置参数:
    • 类型:Flink OpenSource SQL
    • 名称:JobSample
    • 描述:不填
    • 模板名称:不选择
    • 标签:不填
  3. 单击“确定”,进入作业“编辑”页面。
  4. 设置作业运行参数。配置必选参数:
    • 所属队列:dli_queue_01
    • Flink版本:Flink 1.12。
    • 保存作业日志:勾选。
    • OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。
    • 开启Checkpoint:勾选。

    其余参数可不用配置。

  5. 单击“保存”,保存作业和相关参数。
  6. 编辑Flink OpenSource SQL作业。
    在Flink OpenSource SQL语句编辑区域,输入详细的Flink OpenSource SQL语句。具体如下,注意以下加粗的参数值都需要根据注释提示修改。
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'testkafkatopic',// 要写入kafka的topic,进入kafka控制台,单击已创建的Kafka实例名称,在Topic管理查看Topic名称
      'properties.bootstrap.servers' = "192.168.0.237:9092,192.168.0.252:9092,192.168.0.137:9092", // 替换为kafka的内网连接地址和端口,
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE jdbcSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'jdbc',
      'url' = "jdbc:mysql://172.16.0.116:3306/rds-dliflink",// testrdsdb为创建的RDS的数据库名,IP和端口替换为RDS MySQL的实例IP和端口
      'table-name' = 'orders',
      'pwd_auth_name'="xxxxx", // DLI侧创建的Password类型的跨源认证名称,使用跨源认证则无需在作业中配置账号和密码
      'sink.buffer-flush.max-rows' = '1'
    );
    
    insert into jdbcSink select * from kafkaSource;
  7. 单击“语义校验”,确保语义校验成功。
  8. 单击“启动”,进入“启动Flink作业”页面,确认作业规格和费用后,单击“立即启动”,启动作业。

    启动作业后,系统将自动跳转到Flink作业管理页面,新创建的作业将显示在作业列表中,在“状态”列中可以查看作业状态。作业提交成功后,状态将由“提交中”变为“运行中”。

    如果作业状态为“提交失败”或“运行异常”,表示作业提交或运行失败。用户可以在作业列表中的“状态”列中,将鼠标移动到状态图标上查看错误信息,单击可以复制错误信息。根据错误信息解决故障后,重新提交。

  9. 连接Kafka集群,向Kafka相应的topic中发送如下测试数据:

    Kafka生产和发送数据的方法请参考:连接实例生产消费信息

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  10. 查看表中数据,在MySQL中执行sql查询语句。
    select * from orders;
    其结果参考如下(以下数据为从MySQL中复制的结果)。
    202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106
    202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容