网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
云手机服务器 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器(旧版)
VR云渲游平台 CVR
Huawei Cloud EulerOS
云化数据中心 CloudDC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘平台 IEF
CloudPond云服务
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
湖仓构建 LakeFormation
智能数据洞察 DataArts Insight
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
开天aPaaS
应用平台 AppStage
开天企业工作台 MSSE
开天集成工作台 MSSI
API中心 API Hub
云消息服务 KooMessage
交换数据空间 EDS
云地图服务 KooMap
云手机服务 KooPhone
组织成员账号 OrgID
云空间服务 KooDrive
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
区块链
区块链服务 BCS
数字资产链 DAC
华为云区块链引擎服务 HBS
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
价格
成本优化最佳实践
专属云商业逻辑
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
其他
管理控制台
消息中心
产品价格详情
系统权限
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
云服务信任体系能力说明
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
云存储网关 CSG
专属分布式存储服务 DSS
数据工坊 DWR
地图数据 MapDS
键值存储服务 KVS
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
云原生服务中心 OSC
应用服务网格 ASM
华为云UCS
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB
云数据库 GeminiDB
数据管理服务 DAS
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
语音交互服务 SIS
人证核身服务 IVS
视频智能分析服务 VIAS
城市智能体
自动驾驶云服务 Octopus
盘古大模型 PanguLargeModels
IoT物联网
设备接入 IoTDA
全球SIM联接 GSL
IoT数据分析 IoTA
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
企业应用
域名注册服务 Domains
云解析服务 DNS
企业门户 EWP
ICP备案
商标注册
华为云WeLink
华为云会议 Meeting
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMA Exchange
API全生命周期管理 ROMA API
政企自服务管理 ESM
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
数字内容生产线 MetaStudio
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
专属云
专属计算集群 DCC
开发者工具
SDK开发指南
API签名指南
DevStar
华为云命令行工具服务 KooCLI
Huawei Cloud Toolkit
CodeArts API
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
我的凭证
华为云公共事业服务云平台
工业软件
工业数字模型驱动引擎
硬件开发工具链平台云服务
工业数据转换引擎云服务
文档首页/ 数据湖探索 DLI/ 快速入门/ 使用DLI提交Flink OpenSource SQL作业查询RDS MySQL数据

使用DLI提交Flink OpenSource SQL作业查询RDS MySQL数据

更新时间:2024-12-26 GMT+08:00
分享

操作场景

DLI Flink作业支持使用其他服务作为数据源和数据输出通道进行数据实时计算操作。

本例采用Kafka服务作为数据源通道,以RDS作为数据输出通道,介绍创建并提交Flink OpenSource SQL作业进行实时计算的操作步骤。

操作流程

样例场景需要创建一个Flink OpenSource SQL作业,且该作业包含一个输入流和一个输出流。输入流用于从Kafka读取数据,输出流用于将数据写入到RDS中。操作流程如操作流程所示。

开始进行如下操作前,请务必参考准备工作完成必要操作。

表1 使用DLI提交SQL作业查询RDS MySQL数据的操作流程

操作步骤

说明

步骤1:准备数据源通道

本样例场景需要创建Kafka实例作为数据源通道。

步骤2:准备数据输出通道

本样例场景需要创建RDS实例作为数据输出通道。

步骤3:创建OBS桶保存输出数据

创建OBS桶,为DLI Flink作业提供Checkpoint、保存作业日志和调试测试数据的存储功能。

步骤4:创建弹性资源池并添加队列

创建提交Flink作业所需的计算资源。

步骤5:创建DLI连接Kafka的增强型跨源连接

通过增强型跨源连接建立DLI弹性资源池与Kafka实例的网络连通。

步骤6:创建DLI连接RDS的增强型跨源连接

通过增强型跨源连接建立DLI弹性资源池与RDS实例的网络连通。

步骤7:使用DEW管理访问凭据,并配置允许DLI访问DEW的委托

跨源分析场景中,使用DEW管理数据源的访问凭证,并创建允许DLI访问DEW的委托。

步骤8:创建Flink OpenSource SQL作业

准备好数据源和数据输出通道后创建Flink OpenSource SQL作业分析数据。

准备工作

  • 已注册华为账号并开通华为云,且在使用DLI前检查账号状态,账号不能处于欠费或冻结状态。
  • 配置DLI委托访问授权
    DLI使用过程中涉及到OBS、VPC、SMN等服务交互,首次使用DLI需要用户配置委托授权,允许访问这些依赖服务。
    1. 使用华为云账号登录DLI管理控制台,在左侧导航栏单击“全局配置 > 服务授权”。
    2. 在委托设置页面,勾选基础使用、跨源场景、运维场景的委托权限后,单击“更新委托权限”。
    3. 查看并了解更新委托的提示信息,单击“确定”。完成DLI委托权限的更新。
      图1 配置DLI委托访问授权
    4. 完成配置后,在IAM控制台的委托列表中,可查看到dli_management_agency的委托信息。

步骤1:准备数据源通道

本例以Kafka数据作为数据源通道。

开通Kafka数据接入服务,具体操作请参见创建Kafka实例

  1. 创建Kafka相关依赖资源
    在创建Kafka实例前您需要提前准备相关依赖资源,包括VPC、子网和安全组,并配置安全组。

    更多信息请参考《分布式消息服务Kafka用户指南》中的“准备实例依赖资源”章节。

  2. 创建用于作业输入流的Kafka专享版实例
    1. 登录分布式消息服务Kafka管理控制台。
    2. 在管理控制台左上角选择区域。
    3. “Kafka专享版”页面,单击右上角“购买Kafka实例”配置相关参数。实例信息如下:
      • 计费模式:按需付费
      • 区域:选择与DLI服务相同的区域
      • 项目:默认
      • 可用区:默认
      • 实例名称:kafka-dliflink
      • 规格类型:默认
      • 企业项目:default
      • 版本:默认
      • CPU架构:默认
      • 代理规格:选择对应的规格
      • 代理数量:默认
      • 存储空间:默认
      • 容量阈值策略:默认
      • 虚拟私有云,子网:选择1中创建的虚拟私有云和子网。
      • 安全组:选择1中创建的安全组。
      • Manager用户名:dliflink(用于登录实例管理页面)
      • 密码:****(请妥善管理密码,系统无法获取您设置的密码内容)
      • 确认密码:****
      • 更多配置:暂不配置。
    4. 单击“立即购买”,弹出规格确认页面。
    5. 确认实例信息无误且阅读并同意《华为云用户协议》后,单击“提交”,完成实例创建,创建实例大约需要10~15分钟请耐心等待。
  3. 创建Kafka的topic。
    1. 单击购买的Kafka实例名称,进入到Kafka实例的基本信息页面。
    2. 单击“Topic管理 > 创建Topic”,创建一个Topic。Topic配置参数如下:
      • Topic名称。本示例输入为:testkafkatopic。
      • 分区数:1。
      • 副本数:1。

      其他参数保持默认即可。

步骤2:准备数据输出通道

采用RDS作为数据输出通道,创建RDS MySQL实例。

具体操作请参见购买RDS for MySQL实例

  1. 登录RDS管理控制台。
  2. 在管理控制台左上角选择区域。
  3. 单击“购买数据库实例”配置相关参数。主要参数的填下说明如下,其他参数保持默认值即可。
    • 计费模式:按需付费
    • 区域:选择与DLI服务相同的区域
    • 实例名称:rds-dliflink
    • 数据库引擎:MySQL
    • 数据库版本:8.0
    • 实例类型:主备
    • 存储类型:SSD云盘
    • 主可用区:自定义
    • 备可用区:自定义
    • 时区:默认
    • 性能规格:2 vCPUs | 8 GB
    • 存储空间:40GB
    • 虚拟私有云、子网:选择1中创建的虚拟私有云和子网。
    • 数据库端口:3306
    • 安全组:选择1中创建的安全组。
    • 管理员密码:****(请妥善管理密码,系统无法获取您设置的密码内容)
    • 确认密码:****
    • 参数模板:Default-MySQL-8.0
    • 只读实例:暂不购买
  4. 单击“立即购买”,确认规格信息。
  5. 单击“提交”,完成RDS实例的创建。
  6. 登录MySQL,并使用下述命令在flink库下创建orders表。
    登录MySQL,单击“SQL窗口”,在SQL查询页面输入以下创建表语句,创建RDS MySQL表。
    CREATE TABLE `flink`.`orders` (
    	`order_id` VARCHAR(32) NOT NULL,
    	`order_channel` VARCHAR(32) NULL,
    	`order_time` VARCHAR(32) NULL,
    	`pay_amount` DOUBLE UNSIGNED NOT NULL,
    	`real_pay` DOUBLE UNSIGNED NULL,
    	`pay_time` VARCHAR(32) NULL,
    	`user_id` VARCHAR(32) NULL,
    	`user_name` VARCHAR(32) NULL,
    	`area_id` VARCHAR(32) NULL,
    	PRIMARY KEY (`order_id`)
    )	ENGINE = InnoDB
    	DEFAULT CHARACTER SET = utf8mb4
    	COLLATE = utf8mb4_general_ci;

步骤3:创建OBS桶保存输出数据

在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为DLI Flink作业提供Checkpoint、保存作业日志和调试测试数据的存储功能。

具体操作请参见《对象存储服务控制台指南》中的“创建桶”章节。

  1. 在OBS管理控制台左侧导航栏选择“对象存储”。
  2. 在页面右上角单击“创建桶”,配置桶参数。
    • 区域:选择与DLI服务相同的区域
    • 桶名称:具体根据实际情况选择桶名,例如当前选择:obstest
    • 默认存储类别:标准存储
    • 桶策略:私有
    • 默认加密:关闭
    • 归档数据直读:关闭
    • 企业项目:default
  3. 单击“立即创建”。

步骤4:创建弹性资源池并添加队列

创建DLI Flink OpenSource SQL作业,不能使用系统已有的default队列,需要您创建新队列,本例创建弹性资源池“dli_resource_pool”、队列“dli_queue_01”。

  1. 登录DLI管理控制台。
  2. 在左侧导航栏单击“资源管理 > 弹性资源池”,可进入弹性资源池管理页面。
  3. 在弹性资源池管理界面,单击界面右上角的“购买弹性资源池”。
  4. 在“购买弹性资源池”界面,填写具体的弹性资源池参数。
    本例在华东-上海二区域购买按需计费的弹性资源池。相关参数说明如表2所示。
    表2 参数说明

    参数名称

    参数说明

    配置样例

    计费模式

    选择弹性资源池计费模式。

    按需计费

    区域

    选择弹性资源池所在区域。

    华东-上海二

    项目

    每个区域默认对应一个项目,由系统预置。

    系统默认项目

    名称

    弹性资源池名称。

    dli_resource_pool

    规格

    选择弹性资源池规格。

    标准版

    CU范围

    弹性资源池最大最小CU范围。

    64-64

    网段

    规划弹性资源池所属的网段。如需使用DLI增强型跨源,弹性资源池网段与数据源网段不能重合。弹性资源池网段设置后不支持更改

    172.16.0.0/19

    企业项目

    选择对应的企业项目。

    default

  5. 参数填写完成后,单击“立即购买”,在界面上确认当前配置是否正确。
  6. 单击“提交”完成弹性资源池的创建。
  7. 在弹性资源池的列表页,选择要操作的弹性资源池,单击操作列的“添加队列”。
  8. 配置队列的基础配置,具体参数信息如下。
    表3 弹性资源池添加队列基础配置

    参数名称

    参数说明

    配置样例

    名称

    弹性资源池添加的队列名称。

    dli_queue_01

    类型

    选择创建的队列类型。

    • 执行SQL作业请选择SQL队列。
    • 执行Flink或Spark作业请选择通用队列。

    _

    执行引擎

    SQL队列可以选择队列引擎为Spark或者HetuEngine。

    _

    企业项目

    选择对应的企业项目。

    default

  9. 单击“下一步”,配置队列的扩缩容策略。

    单击“新增”,可以添加不同优先级、时间段、“最小CU”和“最大CU”扩缩容策略。

    本例配置的扩缩容策略如图2所示。
    图2 添加队列时配置扩缩容策略
    表4 扩缩容策略参数说明

    参数名称

    参数说明

    配置样例

    优先级

    当前弹性资源池中的优先级数字越大表示优先级越高。本例设置一条扩缩容策略,默认优先级为1。

    1

    时间段

    首条扩缩容策略是默认策略,不能删除和修改时间段配置。

    即设置00-24点的扩缩容策略。

    00-24

    最小CU

    设置扩缩容策略支持的最小CU数。

    16

    最大CU

    当前扩缩容策略支持的最大CU数。

    64

  10. 单击“确定”完成添加队列配置。

步骤5:创建DLI连接Kafka的增强型跨源连接

创建DLI Flink作业,还需要创建增强型跨源连接。具体操作请参考创建增强型跨源连接

说明:
  • 增强型跨源仅支持包年包月队列和按需专属队列。
  • 绑定跨源的DLI队列网段和数据源网段不能重合。
  • 系统default队列不支持创建跨源连接。
  • 访问跨源表需要使用已经创建跨源连接的队列。
  1. 在Kafka的安全组上放通DLI队列网段

    1. 在Kafka管理控制台,选择“Kafka专享版”,单击对应的Kafka名称,进入到Kafka的基本信息页面。
    2. 在“连接信息”中获取该Kafka的“内网连接地址”,在“基本信息”的“网络”中获取该实例的“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
    3. 单击“网络”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。

      例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。

  2. 创建DLI队列连接Kafka的增强型跨源连接。

    1. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
    2. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
      • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_kafka。
      • 弹性资源池:选择步骤4:创建弹性资源池并添加队列中已经创建的资源名称。
      • 虚拟私有云:选择Kafka的虚拟私有云。
      • 子网:选择Kafka的子网。
      • 其他参数可以根据需要选择配置。

      参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为“已激活”后可以进行后续步骤。

    3. 单击“资源管理 > 队列管理”,选择操作的队列,本示例为步骤4:创建弹性资源池并添加队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
    4. 在“测试连通性”界面,地址栏输入“Kafka内网地址:Kafka数据库端口”,单击“测试”测试DLI到Kafka网络是否可达。注意多个地址要分开单独测试。

步骤6:创建DLI连接RDS的增强型跨源连接

  1. 在RDS的安全组上放通DLI队列网段。

    如果RDS和Kafka在同一VPC下的同一安全组,则无需重复执行此步骤,在步骤1中已在该安全组放通DLI的队列网段。
    1. 在RDS管理控制台,选择“实例管理”,单击对应的RDS实例名称,进入到RDS的基本信息页面。
    2. 在“基本信息”的“连接信息”中获取该实例的“内网地址”、“数据库端口”、“虚拟私有云”和“子网”信息,方便后续操作步骤使用。
    3. 单击“连接信息”中的安全组名称,在“入方向规则”中添加放通队列网段的规则。例如,本示例队列网段为“10.0.0.0/16”,则规则添加为:优先级选择:1,策略选择:允许,协议选择:TCP,端口值不填,类型:IPv4,源地址为:10.0.0.0/16,单击“确定”完成安全组规则添加。

  2. 创建DLI队列连接RDS的增强型跨源连接。

    如果RDS和Kafka在同一VPC和子网,则无需重复执行此步骤,在步骤2中创建的增强型跨源连接已将网络打通。

    如果Kafka和RDS实例分别在两个VPC和子网下,则要执行以下步骤创建DLI队列连接RDS的增强型跨源连接。
    1. 登录DLI管理控制台,在左侧导航栏单击“跨源管理”,在跨源管理界面,单击“增强型跨源”,单击“创建”。
    2. 在增强型跨源创建界面,配置具体的跨源连接参数。具体参考如下。
      • 连接名称:设置具体的增强型跨源名称。本示例输入为:dli_rds。
      • 弹性资源池:选择步骤4:创建弹性资源池并添加队列中已经创建的队列名称。
      • 虚拟私有云:选择RDS的虚拟私有云。
      • 子网:选择RDS的子网。
      • 其他参数可以根据需要选择配置。

      参数配置完成后,单击“确定”完成增强型跨源配置。单击创建的跨源连接名称,查看跨源连接的连接状态,等待连接状态为:“已激活”后可以进行后续步骤。

    3. 单击“资源管理 > 队列管理”,选择操作的队列,本示例为步骤4:创建弹性资源池并添加队列中创建的队列,在操作列,单击“更多 > 测试地址连通性”。
    4. 在“测试连通性”界面,地址栏输入“RDS内网地址:RDS数据库端口”,单击“测试”测试DLI到RDS网络是否可达。

步骤7:使用DEW管理访问凭据,并配置允许DLI访问DEW的委托

跨源分析场景中,需要在connector中设置账号、密码等属性。但是账号密码等信息属于高度敏感数据,需要做加密处理,以保障用户的数据隐私安全。Flink1.15版本支持使用DEW管理凭据,在执行作业前需创建自定义委托并在作业中配置委托信息。

数据加密服务(Data Encryption Workshop,DEW)、云凭据管理服务(Cloud Secret Management Service,CSMS),提供一种安全、可靠、简单易用隐私数据加解密方案。本例介绍Flink Opensource SQL使用DEW管理RDS访问凭据的配置方法。

  1. 创建DLI访问DEW的委托并完成委托授权。
  2. 在DEW创建通用凭证。
  3. 登录DEW管理控制台
  4. 选择“凭据管理”,进入“凭据管理”页面。
  5. 单击“创建凭据”,配置凭据基本信息
    • 凭据名称:待创建凭据的名称。本例名称为secretInfo。
    • 凭据值:配置RDS实例的用户名和密码。
      • 第一行凭据值的键为MySQLUsername,值为RDS实例的用户名。
      • 第二行凭据值的键为MySQLPassword,值为RDS实例的密码。
      图3 设置凭据值
  6. 按需完成其他参数的配置后,单击“确定”保存凭据。

    了解更多请参考创建通用凭据

步骤8:创建Flink OpenSource SQL作业

准备好数据源和数据输出通道之后,就可以创建Flink OpenSource SQL作业了。

  1. 在DLI管理控制台的左侧导航栏中,单击“作业管理 > Flink作业”,进入“Flink作业”页面。
  2. 在“Flink作业”页面右上角单击“创建作业”,弹出“创建作业”对话框。配置参数:
    • 类型:Flink OpenSource SQL
    • 名称:JobSample
    • 描述:不填
    • 模板名称:不选择
    • 标签:不填
  3. 单击“确定”,进入作业“编辑”页面。
  4. 设置作业运行参数。配置必选参数:
    • 所属队列:dli_queue_01
    • Flink版本:Flink 1.12。
    • 保存作业日志:勾选。
    • OBS桶:选择保存作业日志的OBS桶,根据提示进行OBS桶权限授权。
    • 开启Checkpoint:勾选。

    其余参数可不用配置。

  5. 单击“保存”,保存作业和相关参数。
  6. 编辑Flink OpenSource SQL作业。
    在Flink OpenSource SQL语句编辑区域,输入详细的Flink OpenSource SQL语句。具体如下,注意以下加粗的参数值都需要根据注释提示修改。
    CREATE TABLE kafkaSource (
      order_id string,
      order_channel string,
      order_time string, 
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'testkafkatopic',// 要写入kafka的topic,进入kafka控制台,单击已创建的Kafka实例名称,在Topic管理查看Topic名称
      'properties.bootstrap.servers' = "192.168.0.237:9092,192.168.0.252:9092,192.168.0.137:9092", // 替换为kafka的内网连接地址和端口,
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    CREATE TABLE jdbcSink (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'jdbc',
      'url' = "jdbc:mysql://172.16.0.116:3306/rds-dliflink",// testrdsdb为创建的RDS的数据库名,IP和端口替换为RDS MySQL的实例IP和端口
      'table-name' = 'orders',
      'pwd_auth_name'="xxxxx", // DLI侧创建的Password类型的跨源认证名称,使用跨源认证则无需在作业中配置账号和密码
      'sink.buffer-flush.max-rows' = '1'
    );
    
    insert into jdbcSink select * from kafkaSource;
  7. 单击“语义校验”,确保语义校验成功。
  8. 单击“启动”,进入“启动Flink作业”页面,确认作业规格和费用后,单击“立即启动”,启动作业。

    启动作业后,系统将自动跳转到Flink作业管理页面,新创建的作业将显示在作业列表中,在“状态”列中可以查看作业状态。作业提交成功后,状态将由“提交中”变为“运行中”。

    如果作业状态为“提交失败”或“运行异常”,表示作业提交或运行失败。用户可以在作业列表中的“状态”列中,将鼠标移动到状态图标上查看错误信息,单击可以复制错误信息。根据错误信息解决故障后,重新提交。

    如果以上错误信息不足以定位问题,还可以参考Flink作业运行异常,如何定位,从OBS桶中下载作业日志对问题进一步定位。

  9. 连接Kafka集群,向Kafka相应的topic中发送如下测试数据:

    Kafka生产和发送数据的方法请参考:连接实例生产消费信息

    {"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"} 
    
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
  10. 查看表中数据,在MySQL中执行sql查询语句。
    select * from orders;
    其结果参考如下(以下数据为从MySQL中复制的结果)。
    202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106
    202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106

后续指引

完成Flink OpenSource SQL作业快速入门操作后,如果您想了解更多关于Flink OpenSource SQL作业相关操作,建议您参考以下指引阅读。

分类

文档

说明

界面操作

Flink作业管理

提供Flink作业管理界面功能介绍。

Flink模板管理

提供Flink作业样例模板和自定义模板功能介绍。您可以根据习惯和业务需要自定义作业模板,方便后续创建提交作业。

开发指南

Flink SQL语法参考

提供Flink OpenSource SQL创建源表、结果表和维表的语法说明和样例指导。

Flink作业样例

提供Flink作业程序开发的样例指导。

使用Flink Jar写入数据到OBS

提供Flink如果将数据处理后写入到OBS的样例代码。

Flink作业相关API

提供Flink相关API的使用说明。

提示

您即将访问非华为云网站,请注意账号财产安全

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容