计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
弹性伸缩 AS
镜像服务 IMS
专属主机 DeH
函数工作流 FunctionGraph
云手机服务器 CPH
Huawei Cloud EulerOS
网络
虚拟私有云 VPC
弹性公网IP EIP
虚拟专用网络 VPN
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
VPC终端节点 VPCEP
云连接 CC
企业路由器 ER
企业交换机 ESW
全球加速 GA
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
边缘安全 EdgeSec
威胁检测服务 MTD
CDN与智能边缘
内容分发网络 CDN
CloudPond云服务
智能边缘云 IEC
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
云搜索服务 CSS
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
数据可视化 DLV
数据湖工厂 DLF
湖仓构建 LakeFormation
企业应用
云桌面 Workspace
应用与数据集成平台 ROMA Connect
云解析服务 DNS
专属云
专属计算集群 DCC
IoT物联网
IoT物联网
设备接入 IoTDA
智能边缘平台 IEF
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
国际站常见问题
ICP备案
我的凭证
支持计划
客户运营能力
合作伙伴支持计划
专业服务
区块链
区块链服务 BCS
Web3节点引擎服务 NES
解决方案
SAP
高性能计算 HPC
视频
视频直播 Live
视频点播 VOD
媒体处理 MPC
实时音视频 SparkRTC
数字内容生产线 MetaStudio
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
存储容灾服务 SDRS
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
专属分布式存储服务 DSS
容器
云容器引擎 CCE
容器镜像服务 SWR
应用服务网格 ASM
华为云UCS
云容器实例 CCI
管理与监管
云监控服务 CES
统一身份认证服务 IAM
资源编排服务 RFS
云审计服务 CTS
标签管理服务 TMS
云日志服务 LTS
配置审计 Config
资源访问管理 RAM
消息通知服务 SMN
应用运维管理 AOM
应用性能管理 APM
组织 Organizations
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
应用身份管理服务 OneAccess
数据库
云数据库 RDS
文档数据库服务 DDS
数据管理服务 DAS
数据复制服务 DRS
云数据库 GeminiDB
云数据库 GaussDB
分布式数据库中间件 DDM
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
人脸识别服务 FRS
图引擎服务 GES
图像识别 Image
内容审核 Moderation
文字识别 OCR
AI开发平台ModelArts
图像搜索 ImageSearch
对话机器人服务 CBS
华为HiLens
视频智能分析服务 VIAS
语音交互服务 SIS
应用中间件
分布式缓存服务 DCS
API网关 APIG
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
多活高可用服务 MAS
事件网格 EG
企业协同
华为云会议 Meeting
云通信
消息&短信 MSGSMS
云生态
合作伙伴中心
云商店
开发者工具
SDK开发指南
API签名指南
Terraform
华为云命令行工具服务 KooCLI
其他
产品价格详情
系统权限
管理控制台
客户关联华为云合作伙伴须知
消息中心
公共问题
开发与运维
应用管理与运维平台 ServiceStage
软件开发生产线 CodeArts
需求管理 CodeArts Req
部署 CodeArts Deploy
性能测试 CodeArts PerfTest
编译构建 CodeArts Build
流水线 CodeArts Pipeline
制品仓库 CodeArts Artifact
测试计划 CodeArts TestPlan
代码检查 CodeArts Check
代码托管 CodeArts Repo
云应用引擎 CAE
开天aPaaS
云消息服务 KooMessage
云手机服务 KooPhone
云空间服务 KooDrive

创建并提交Flink SQL作业

更新时间:2022-12-07 GMT+08:00

使用DLI提交Flink SQL作业进行实时计算。基本流程如下:

步骤1:登录云

步骤2:准备数据源和数据输出通道

步骤3:创建OBS桶保存输出数据

步骤4:登录DLI管理控制台

步骤5:创建队列

步骤6:创建增强型跨源连接

步骤7:创建跨源认证

步骤8:配置安全组规则和测试地址连通性

步骤9:创建Flink SQL作业

样例场景需要创建一个Flink SQL作业,并且该作业有一个输入流和一个输出流。输入流用于从DIS读取数据,输出流用于将数据写入到Kafka中。

步骤1:登录云

使用DLI服务,首先要登录云。

  1. 打开产品首页。
  2. 在登录页面输入“帐号名”“密码”,单击“登录”

步骤2:准备数据源和数据输出通道

DLI Flink作业支持其他服务作为数据源和数据输出通道,具体内容请参见 《数据湖探索用户指南》>《Flink作业管理》>《准备数据》。

本样例中,假设作业名称为“JobSample”,采用DIS服务作为数据源,开通数据接入服务(DIS),具体操作请参见《数据接入服务用户指南》中的开通DIS通道章节。采用分布式消息服务Kafka作为数据输出通道,创建Kafka专享版实例,具体操作请参见《分布式消息服务Kafka用户指南》中的购买实例章节。
  • 创建用于作业输入流的DIS通道:
    1. 登录DIS管理控制台。
    2. 在管理控制台左上角选择区域和项目。
    3. 单击“购买接入通道”配置相关参数。通道信息如下:
      • 区域:选择与DLI服务相同的区域
      • 通道名称:csinput
      • 通道类型:普通
      • 分区数量:1
      • 生命周期(小时):24
      • 源数据类型:BLOB
      • 自动扩缩容:关闭
      • 企业项目:default
      • 高级配置:暂不配置
    4. 单击“立即购买”,进入“规格确认”页面。
    5. 单击“提交”,完成通道接入。
  • 创建用于作业输出流的Kafka专享版实例:
    1. 在创建Kafka实例前您需要提前准备相关依赖资源,包括VPC、子网和安全组,并配置安全组。
      • 创建VPC和子网的操作指导请参考《虚拟私有云用户指南》>创建虚拟私有云和子网,若需要在已有VPC上创建和使用新的子网,请参考《虚拟私有云用户指南》>为虚拟私有云创建新的子网。
        说明:
        • 创建的VPC与使用的Kafka服务应在相同的区域。
        • 创建VPC和子网时,如无特殊需求,配置参数使用默认配置即可。
      • 创建安全组的操作指导请参考《虚拟私有云用户指南》>创建安全组,为安全组添加规则的操作指导请参考《虚拟私有云用户指南》>添加安全组规则。

      更多信息请参考《分布式消息服务Kafka用户指南》中的准备实例依赖资源章节。

    2. 登录分布式消息服务Kafka管理控制台。
    3. 在管理控制台左上角选择区域。
    4. “Kafka专享版”页面,单击右上角“购买Kafka实例”配置相关参数。实例信息如下:
      • 区域:选择与DLI服务相同的区域
      • 项目:默认
      • 可用区:默认
      • 实例名称:kafka-dliflink
      • 企业项目:default
      • 版本:默认
      • CPU架构:默认
      • 规格:选择对应的规格
      • 代理个数:默认
      • 存储空间:默认
      • 容量阈值策略:默认
      • 虚拟私有云,子网:选择1中创建的虚拟私有云和子网。
      • 安全组:选择1中创建的安全组。
      • Manager用户名:dliflink(用于登录实例管理页面)
      • 密码:****(请妥善管理密码,系统无法获取您设置的密码内容)
      • 确认密码:****
      • 更多配置:开启参数“Kafka SASL_SSL”,根据界面提示配置SSL认证的用户名和密码。其他参数可暂不配置。
    5. 单击“立即购买”,弹出“规格确认”页面。
    6. 单击“提交”,完成实例创建。
    7. 在分布式消息服务Kafka管理,单击“Kafka专享版”,单击已创建的Kafka实例名称,例如kafka-dliflink,进入实例详情页面。
    8. 在“基本信息 > 高级配置 > SSL 证书”所在行,单击下载按钮。下载压缩包到本地并解压,获取压缩包中的客户端证书文件:client.truststore.jks,给后续步骤做准备。

步骤3:创建OBS桶保存输出数据

在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为DLI Flink作业提供Checkpoint、保存作业日志和调试测试数据的存储功能。

具体操作请参见《对象存储服务控制台指南》中的创建桶章节。

  1. 在OBS管理控制台左侧导航栏选择“对象存储”。
  2. 在页面右上角单击“创建桶”,配置桶参数。
    • 区域:选择与DLI服务相同的区域
    • 桶名称:具体根据实际情况选择桶名,例如当前选择:smoke-test
    • 存储类别:标准存储
    • 桶策略:私有
    • 默认加密:关闭
    • 归档数据直读:关闭
    • 企业项目:default
    • 标签:不填写
  3. 单击“立即创建”。

步骤4:登录DLI管理控制台

  1. 在列表中,选择“数据湖探索 DLI”
  2. 进入DLI管理控制台页面。第一次进入数据湖探索管理控制台需要进行授权,以获取访问OBS的权限。

步骤5:创建队列

创建DLI Flink SQL作业,不能使用系统已有的default队列,需要您创建队列,例如创建名为“Flinktest”的队列。创建队列详细介绍请参考《数据湖探索用户指南》>《创建队列》。

  1. 在DLI管理控制台总览页,单击右上角“购买队列”进入购买队列页面。
  2. 配置参数。
    • 队列名称:Flinktest
    • 队列类型:通用队列。勾选“专属资源模式”。
    • 队列规格:16CUs
    • 企业项目:default
    • 描述:不填
    • 高级选项:自定义配置
    • 网段:配置的网段不能与Kafka的子网网段冲突
  3. 单击“立即购买”,确认配置。
  4. 配置确认无误,提交请求。

步骤6:创建增强型跨源连接

创建DLI Flink作业,还需要创建增强型跨源连接。具体操作请参考《数据湖探索用户指南》>《跨源连接》>《增强型跨源连接》。

说明:
  • 绑定跨源的DLI队列网段和数据源网段不能重合。
  • 系统default队列不支持创建跨源连接。
  • 访问跨源表需要使用已经创建跨源连接的队列。
  1. 在DLI管理控制台左侧导航栏中,选择“跨源连接”。
  2. 选择“增强型跨源”页签,单击左上角的“创建”按钮。配置参数:

    • 连接名称:diskafka
    • 绑定队列:Flinktest
    • 虚拟私有云:vpc-dli
    • 子网:dli-subnet
      说明:

      创建跨源连接的虚拟私有云和子网需要和Kafka实例保持一致。

  3. 单击“确定”,完成创建增强型跨源连接。
  4. 在“增强型跨源”页签,单击创建的连接名称:diskafka,查看对等连接ID及连接状态,连接状态为“已激活”表示连接成功。

步骤7:创建跨源认证

创建跨源认证的具体操作请参考《数据湖探索用户指南》>《跨源连接》>《跨源认证》。

  1. 步骤2:准备数据源和数据输出通道中获取的kafka认证文件“client.truststore.jks”上传到步骤3:创建OBS桶保存输出数据中的OBS桶“smoke-test”下。
  2. 在DLI管理控制台选择“跨源连接”。
  3. 在“跨源认证”页签,单击“创建”,创建认证信息。配置参数:
    • 认证信息名称:Flink
    • 类型:Kafka_SSL
    • Truststore路径:obs://smoke-test/client.truststore.jks
    • Truststore密码:dms@kafka

    其余参数可不用配置。

  4. 单击“确定”,完成创建跨源认证。

步骤8:配置安全组规则和测试地址连通性

  1. DLI管理控制台,单击“资源管理 > 队列管理”,选择绑定的队列,点开队列左边的箭头,查看队列详情,获取队列的网段信息。
  2. 登录分布式消息服务Kafka管理控制台,单击“Kafka专享版”,单击已创建的Kafka实例名称,例如kafka-dliflink,进入实例基本信息页面。
  3. 在实例基本信息页面,在“连接地址”配置下的获取Kafka的连接地址和端口。

  4. 在实例基本信息页面,在“网络”配置下的“安全组”,单击安全组名称,进入安全组配置页面。
  5. 在Kafka实例对应的安全组配置页面,单击“入方向规则 > 添加规则”,协议选择“TCP”,端口选择“9093”,源地址填写DLI队列的网段。单击“确定”完成配置。
  6. 登录DLI管理控制台,选择“资源管理 > 队列管理”,在所在Flink队列行,单击“更多 > 测试地址连通性”,在“地址”参数中按照“IP:端口”的格式输入Kafka的连接地址和端口,单击“测试”,返回地址可达后进行后续操作步骤。注意多个地址要分开单独测试。

步骤9:创建Flink SQL作业

准备好数据源和数据输出通道之后,就可以创建Flink SQL作业了。

  1. 在DLI管理控制台的左侧导航栏中,单击“作业管理”>“Flink作业”,进入“Flink作业”页面。
  2. 在“Flink作业”页面右上角单击“创建作业”,弹出“创建作业”对话框。配置参数:
    • 类型:Flink SQL
    • 名称:DIS-Flink-Kafka
    • 描述:不填
    • 模板名称:不选择
  3. 单击“确定”,进入作业“编辑”页面。
  4. 编辑SQL作业。

    在SQL语句编辑区域,输入详细的SQL语句。具体如下,注意以下加粗的参数值都需要根据注释提示修改。

    CREATE SOURCE STREAM car_info (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "xxx",//需要修改为当前DLI队列所在的region
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );
    
     CREATE SINK STREAM kafka_sink ( 
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    ) // 输出字段
    
    WITH (
      type="kafka",
      kafka_bootstrap_servers =  "192.x.x.x:9093, 192.x.x.x:9093, 192.x.x.x:9093",//需要修改为kafka实例的连接地址
      kafka_topic = "testflink", // 要写入kafka的topic,进入kafka控制台,单击已创建的Kafka实例名称,在Topic管理查看Topic名称
      encode = "csv", // 编码格式,支持json/csv
      kafka_certificate_name = "Flink",//kafka_certificate_name参数值为步骤7中创建的kafka跨源认证名称
      kafka_properties_delimiter = ",",
      //kafka_properties中的username和password的值xxx需要替换为步骤2中kafka创建SSL认证的用户名和密码
      kafka_properties = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxx\" password=\"xxx\";,sasl.mechanism=PLAIN,security.protocol=SASL_SSL"
    );
    
    INSERT INTO kafka_sink
    SELECT * FROM car_info;
    
    CREATE sink STREAM car_info1 (
      a1 string,
      a2 string,
      a3 string,
      a4 INT
    )
    WITH (
      type = "dis",
      region = "xxx",//需要修改为当前DLI队列所在的region
      channel = "csinput",
      encode = "csv",
      FIELD_DELIMITER = ";"
    );
    
    insert into car_info1 select 'id','owner','brand',1;
    insert into car_info1 select 'id','owner','brand',2;
    insert into car_info1 select 'id','owner','brand',3;
    insert into car_info1 select 'id','owner','brand',4;
    insert into car_info1 select 'id','owner','brand',5;
    insert into car_info1 select 'id','owner','brand',6;
    insert into car_info1 select 'id','owner','brand',7;
    insert into car_info1 select 'id','owner','brand',8;
    insert into car_info1 select 'id','owner','brand',9;
    insert into car_info1 select 'id','owner','brand',10;
  5. 单击“语义校验”,确保语义校验成功。
  6. 设置作业运行参数。配置必选参数:
    • 所属队列:Flinktest
    • CU数量:2
    • 管理单元:1
    • 并行数:1
    • 保存作业日志:勾选
    • OBS桶:选择作业日志保存的OBS桶,并进行授权。

    其余参数可不用配置。

  7. 单击“保存”,保存作业和相关参数。
  8. 单击“启动”,进入“启动Flink作业”页面,确认作业规格和费用后,单击“立即启动”,启动作业。

    启动作业后,系统将自动跳转到Flink作业管理页面,新创建的作业将显示在作业列表中,在“状态”列中可以查看作业状态。作业提交成功后,状态将由“提交中”变为“运行中”。

    如果作业状态为“提交失败”或“运行异常”,表示作业提交或运行失败。用户可以在作业列表中的“状态”列中,将鼠标移动到状态图标上查看错误信息,单击可以复制错误信息。根据错误信息解决故障后,重新提交。

  9. 作业运行完成后,可登录分布式消息服务Kafka管理控制台,查看对应的Kafka专享实例。单击实例名称,选择“消息查询”页签,选择Flink SQL作业中写入的kafka的Topic名称,单击“搜索”,在操作列单击“查看消息正文”查看写入的消息内容。

我们使用cookie来确保您的高速浏览体验。继续浏览本站,即表示您同意我们使用cookie。 详情

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容