快速入门
介绍快速使用CS服务。基本流程如下:
为了方便大家了解使用过程,我们将通过一个简单的Flink SQL作业样例,介绍如何快速使用CS服务。如果需要了解Flink SQL边缘作业、Flink自定义作业和Spark自定义作业,请参见《实时流计算服务用户指南》中“作业管理”章节。
样例场景为实时录入车辆信息并输出价格低于30万人民币的奥迪车辆信息。
针对该场景,我们需要创建一个作业,并且该作业有一个输入流和一个输出流。输入流用于实时录入车辆信息,输出流用于输出价格低于30万人民币的奥迪车辆信息。
前提条件
用户已经注册了一个公有云账号。
步骤1:准备数据源和数据输出通道
CS服务支持其他服务作为数据源和数据输出通道,具体内容请参见《实时流计算服务用户指南》中准备数据章节。
本样例中采用DIS服务作为数据源和数据输出通道,则我们需要为作业“JobSample”开通数据接入服务(DIS),为CS服务提供数据源和数据输出通道。
- 用于作业输入流的DIS通道信息:
通道名称:csinput
通道类型:普通
分区数量:1
源数据类型:BLOB
生命周期(天):1
数据转储:关闭
- 用于作业输出流的DIS通道信息:
通道名称:csoutput
通道类型:普通
分区数量:1
源数据类型:BLOB
生命周期(天):1
数据转储:关闭

参数“通道类型”、“源数据类型”和“生命周期(天)”,用户选择默认即可。
步骤2:创建OBS桶保存输出数据
在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为CS服务提供Checkpoint、保存作业日志和调试测试数据的存储功能。
例如,为作业“JobSample”创建如下OBS桶,具体操作请参见《对象存储服务控制台指南》中的 章节。
创建的OBS桶信息:
区域:华北-北京一
桶名称:smoke-test
存储类别:标准存储
桶策略:私有

其他参数选择默认设置即可。
步骤3:申请实时流计算服务
用户可通过浏览器登录CS管理控制台,并在管理控制台页面申请实时流计算服务。
- 登录CS管理控制台,地址为:https://console.huaweicloud.com/cs/。
如您尚未注册公有云,可单击“免费注册”,根据界面提示注册一个公有云账号。
- 进入 页面。图1 申请实时流计算服务
- 勾选“我已阅读并同意《华为云用户协议》”,然后单击“申请”。
- 申请实时流计算服务成功后,系统将自动跳转到“总览”页面。
- 系统会自动弹出“CS服务权限委托”窗口,单击“前往授权”。图2 CS服务权限委托
- 在“云资源访问授权”页面,单击“同意授权”。图3 云资源访问授权
步骤4:创建作业并提交
使用实时流计算服务,首先要创建一个作业,如“JobSample”。
- 在CS管理控制台的左侧导航栏中,单击“作业管理”页面。 ,进入图4 作业管理
- 单击“新建”,弹出“新建作业”页面,配置作业信息。图5 新建作业
- 单击“确认”,进入“编辑”页面,在SQL语句编辑区域中会显示步骤 2选择的模板的SQL语句。图6 作业编辑
- 在SQL语句编辑区域中,根据作业的实际需要,编辑SQL语句。在本样例中,我们使用步骤 2选择的模板的SQL语句即可,具体SQL语句如下所示。
/** * 该示例为流分析场景通用模板。数据的输入源和输出通道均由DIS服务提供,需先开通DIS服务并创建相应的输入输出通道。 * >>>>>>>>>请务必确保您的账户下已在数据接入服务(DIS)里创建了您配置的通道<<<<<<<<<< * * >>>>>样例输入<<<<< * 流名: car_infos(car_id,car_owner,car_brand,car_price): * 1,lilei,bmw320i,28 * 2,hanmeimei,audia4,27 * >>>>>样例输出<<<<< * 流名: audi_cheaper_than_30w(car_id,car_owner,car_brand,car_price): * 2,hanmeimei,audia4,27 **/ /** 创建输入流,从DIS的csinput通道获取数据。 * * 根据实际情况修改以下选项: * channel:数据所在通道名 * partition_count:该通道分区数 * encode: 数据编码方式,可以是csv或json * field_delimiter:当编码格式为csv时,属性之间的分隔符 **/ CREATE SOURCE STREAM car_infos ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", region = "cn-north-1" , channel = "csinput", partition_count = "1", encode = "csv", field_delimiter = "," ); /** 创建输出流,结果输出到DIS的csoutput通道。 * * 根据实际情况修改以下选项: * channel:数据所在通道名 * partition_key:当通道有多个分区时用来分发的主键 * encode: 结果编码方式,可以为csv或者json * field_delimiter: 当编码格式为csv时,属性之间的分隔符 **/ CREATE SINK STREAM audi_cheaper_than_30w ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type = "dis", channel = "csoutput", region = "cn-north-1" , partition_key = "car_owner", encode = "csv", field_delimiter = "," ); /** 将低于30W的奥迪车辆信息输出 **/ INSERT INTO audi_cheaper_than_30w SELECT * FROM car_infos WHERE car_brand like "audi%" and car_price < 30; /**************************往输入流csinput插入测试数据*************************/ CREATE SINK STREAM car_info_data ( car_id STRING, car_owner STRING, car_brand STRING, car_price INT ) WITH ( type ="dis", region = "cn-north-1" , channel = "csinput", partition_key = "car_owner", encode = "csv", field_delimiter = ",\n" ); INSERT INTO car_info_data SELECT "1", "lilei", "bmw320i", 28; INSERT INTO car_info_data SELECT "2", "hanmeimei", "audia4", 27; /************************往输入流插入测试数结束*********************************/
说明:
SQL语句主要包含3个部分:
- 创建输入流:流定义的字段需要与入流的数据源格式一致。流定义的with中的参数需要与入流DIS通道的信息一致。
- 创建输出流:流定义的with中的参数需要与出流的DIS通道的信息一致。
- 业务逻辑:根据需求场景编写SQL语句,并将结果数据插入到出流。
- 单击“语义校验”,确保语义校验成功。
- 只有语义校验成功后,才可以执行“调试”、“提交”或“启动”作业的操作。
- 如果校验成功,提示“SQL语义校验成功”。
- 如果校验失败,会在错误的SQL语句前面显示红色的“X”记号,鼠标移动到“X”号上可查看详细错误,请根据错误提示修改SQL语句。
- 在“编辑”页面的右侧“运行参数设置”区域,参数配置如下。
- 配置 和 ,使用默认值即可。
- 勾选“保存作业日志”,并选择OBS桶“smoke-test”,用户新创建的OBS桶是未授权状态,需要单击“OBS授权”。
- “作业所属集群”,默认选择“共享集群”,用户也可以选择自定义的独享集群。如何创建自定义的独享集群,请参见《实时流计算服务用户指南》中新建集群。
图7 运行参数设置 - 单击“保存”,保存作业和相关参数。
- 单击“提交”,进入“作业配置清单”页面,单击“确认”,将作业提交并启动。
提交作业后,系统将自动跳转到
页面,新创建的作业将显示在作业列表中,在 一列中可以查看作业状态。作业提交成功后,状态将由 变为 。图8 作业状态如果作业状态为
或 ,表示作业提交或运行失败。请在作业列表中,单击作业名称“JobSample”,然后单击“运行日志”,查看作业运行日志。请根据日志解决故障后,再重新提交作业。
步骤5:发送数据到DIS服务
前面已经申请了DIS通道作为数据源,提交作业后,用户可以将数据通过DIS通道不断上传至DIS服务,实现向CS服务的作业提供实时流数据源。
在本样例中,我们将构造本地数据并通过入流的DIS通道“csinput”上传至DIS服务,具体操作请参见《数据接入服务用户指南》中的 章节。
样例数据示例如下,录入车辆信息,每条记录包含4个字段,即车牌号、车主姓名、车辆品牌、车辆价格:
1,lilei,bmw320i,28 2,hanmeimei,audia4,27
步骤6:查看作业信息及运行结果
启动作业后,可以查看作业的运行情况。操作步骤如下:
- 在CS管理控制台的左侧导航栏中,单击“作业管理”页面。 ,进入
- 在作业列表中,单击作业名称“JobSample”,查看作业的详细信息。图9 作业监控
详细内容请参见《实时流计算服务用户指南》中监控作业章节。
如果用户需要查看作业的输出结果,需要到DIS上查看用于作业输出流的DIS通道上的数据,详细操作请参见《数据接入服务用户指南》中 章节。
步骤7:查看其它资料
完成如上入门操作步骤后,我们推荐您可以参考如下资料继续对实时流计算服务进行更详细深入的了解:
- 《实时流计算服务用户指南》:本指南在此入门的基础上,对作业、模板和集群的概念和相关操作提供全面详细的信息。
- 《实时流计算服务API参考》:本API参考主要是帮助您了解实时流计算服务接口使用的方法,让您可以使用API接口对作业进行相关操作。
- 《实时流计算服务流生态开发指南》:本指南介绍了实时流计算服务的云服务生态和开源生态,指导您如何利用流生态进行开发。
- 《实时流计算服务SDK参考》:本SDK参考指导您如何安装和配置开发环境,如何通过调用CS SDK提供的接口函数进行二次开发。
- 《实时流计算服务SQL语法参考》:本SQL语法详细介绍实时流计算服务的常用SQL语法。
