文档首页> > 快速入门

快速入门

分享
更新时间: 2019/03/12 16:56

介绍快速使用CS服务。基本流程如下:

步骤1:准备数据源和数据输出通道

步骤2:创建OBS桶保存输出数据

步骤3:申请实时流计算服务

步骤4:创建作业并提交

步骤5:发送数据到DIS服务

步骤6:查看作业信息及运行结果

步骤7:查看其它资料

步骤8:删除作业

为了方便大家了解使用过程,我们将通过一个简单的Flink SQL作业样例,介绍如何快速使用CS服务。如果需要了解Flink SQL边缘作业、Flink自定义作业和Spark自定义作业,请参见《实时流计算服务用户指南》“作业管理”章节。

样例场景为实时录入车辆信息并输出价格低于30万的奥迪车辆信息。

针对该场景,我们需要创建一个作业,并且该作业有一个输入流和一个输出流。输入流用于实时录入车辆信息,输出流用于输出价格低于30万的奥迪车辆信息。

前提条件

用户已经注册了一个公有云账号。

步骤1:准备数据源和数据输出通道

CS服务支持其他服务作为数据源和数据输出通道,具体内容请参见《实时流计算服务用户指南》准备数据章节。

本样例中采用DIS服务作为数据源和数据输出通道,则我们需要为作业“JobSample”开通数据接入服务(DIS),为CS服务提供数据源和数据输出通道。

例如,为作业“JobSample”开通如下两个DIS通道,具体操作请参见 《数据接入服务用户指南》中的 开通DIS通道章节。
  • 用于作业输入流的DIS通道信息:

    通道名称:csinput

    通道类型:普通

    分区数量:1

    源数据类型:BLOB

    生命周期(天):1

    数据转储:关闭

  • 用于作业输出流的DIS通道信息:

    通道名称:csoutput

    通道类型:普通

    分区数量:1

    源数据类型:BLOB

    生命周期(天):1

    数据转储:关闭

说明:

参数“通道类型”“源数据类型”“生命周期(天)”,用户选择默认即可。

步骤2:创建OBS桶保存输出数据

在本样例中,需要为作业“JobSample”开通对象存储服务(OBS),为CS服务提供Checkpoint、保存作业日志和调试测试数据的存储功能。

例如,为作业“JobSample”创建如下OBS桶,具体操作请参见《对象存储服务控制台指南》中的创建桶章节。

创建的OBS桶信息:

区域:华北-北京一

桶名称:smoke-test

存储类别:标准存储

桶策略:私有

高级设置:不配置

说明:

参数“桶策略”“高级设置”选择默认设置即可。

步骤3:申请实时流计算服务

用户可通过浏览器登录CS管理控制台,并在管理控制台页面申请实时流计算服务。

  1. 登录CS管理控制台,地址为:https://console.huaweicloud.com/cs/

    如您尚未注册公有云,可单击“免费注册”,根据界面提示注册一个公有云账号。

  2. 进入申请实时流计算服务页面。

    图1 申请实时流计算服务

  1. 勾选“我已阅读并同意《华为云用户协议》”,然后单击“申请”
  2. 申请实时流计算服务成功后,系统将自动跳转到“总览”页面。
  3. 系统会自动弹出“CS服务权限委托”窗口,单击“前往授权”

    图2 CS服务权限委托

  4. “云资源访问授权”页面,单击“同意授权”

    图3 云资源访问授权

步骤4:创建作业并提交

使用实时流计算服务,首先要创建一个作业,如“JobSample”。

  1. 在CS管理控制台的左侧导航栏中,单击作业管理,进入“作业管理”页面。

    图4 作业管理

  2. 单击“新建”,弹出“新建作业”页面,配置作业信息。

    图5 新建作业

  3. 单击“确认”,进入“编辑”页面,在SQL语句编辑区域中会显示步骤 2选择的模板的SQL语句。

    图6 作业编辑

  4. 在SQL语句编辑区域中,根据作业的实际需要,编辑SQL语句。

    在本样例中,我们使用 步骤 2选择的模板的SQL语句即可,具体SQL语句如下所示。
    /**
      * 该示例为流分析场景通用模板。数据的输入源和输出通道均由DIS服务提供,需先开通DIS服务并创建相应的输入输出通道。
      * >>>>>>>>>请务必确保您的账户下已在数据接入服务(DIS)里创建了您配置的通道<<<<<<<<<<
      *
      * >>>>>样例输入<<<<<
      *  流名: car_infos(car_id,car_owner,car_brand,car_price):
      *  1,lilei,bmw320i,28
      *  2,hanmeimei,audia4,27
      * >>>>>样例输出<<<<<
      *  流名: audi_cheaper_than_30w(car_id,car_owner,car_brand,car_price):
      *  2,hanmeimei,audia4,27
      **/
    
    /** 创建输入流,从DIS的csinput通道获取数据。
      *
      * 根据实际情况修改以下选项:
      * channel:数据所在通道名
      * partition_count:该通道分区数
      * encode: 数据编码方式,可以是csv或json
      * field_delimiter:当编码格式为csv时,属性之间的分隔符
      **/
    CREATE SOURCE STREAM car_infos (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT
    )
    WITH (
      type = "dis",
      region = "cn-north-1",
      channel = "csinput",
      partition_count = "1",
      encode = "csv",
      field_delimiter = ","
    );
    
    /** 创建输出流,结果输出到DIS的csoutput通道。
      *
      * 根据实际情况修改以下选项:
      * channel:数据所在通道名
      * partition_key:当通道有多个分区时用来分发的主键
      * encode: 结果编码方式,可以为csv或者json
      * field_delimiter: 当编码格式为csv时,属性之间的分隔符
      **/
    CREATE SINK STREAM audi_cheaper_than_30w (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT
    )
    WITH (
      type = "dis",
      region = "cn-north-1",
      channel = "csoutput",
      partition_key = "car_owner",
      encode = "csv",
      field_delimiter = ","
    );
    
    /** 将低于30W的奥迪车辆信息输出 **/
    INSERT INTO audi_cheaper_than_30w
    SELECT *
    FROM car_infos
    WHERE car_brand like "audi%" and car_price < 30;
    
    /**************************往输入流csinput插入测试数据*************************/
    CREATE SINK STREAM car_info_data (
      car_id STRING,
      car_owner STRING,
      car_brand STRING,
      car_price INT
    )
    WITH (
      type ="dis",
      region = "cn-north-1",
      channel = "csinput",
      partition_key = "car_owner",
      encode = "csv",
      field_delimiter = ",\n"
    );
    
    INSERT INTO car_info_data
    SELECT "1", "lilei", "bmw320i", 28;
    INSERT INTO car_info_data
    SELECT "2", "hanmeimei", "audia4", 27;
    /************************往输入流插入测试数结束*********************************/
    
    说明:

    SQL语句主要包含3个部分:

    • 创建输入流:流定义的字段需要与入流的数据源格式一致。流定义的with中的参数需要与入流DIS通道的信息一致。
    • 创建输出流:流定义的with中的参数需要与出流的DIS通道的信息一致。
    • 业务逻辑:根据需求场景编写SQL语句,并将结果数据插入到出流。

  5. 单击“语义校验”,确保语义校验成功。

    • 只有语义校验成功后,才可以执行“调试”“提交”“启动”作业的操作。
    • 如果校验成功,提示“SQL语义校验成功”。
    • 如果校验失败,会在错误的SQL语句前面显示红色的“X”记号,鼠标移动到“X”号上可查看详细错误,请根据错误提示修改SQL语句。

  6. “编辑”页面的右侧“运行参数设置”区域,参数配置如下。

    • 配置SPUs并行数,使用默认值即可。
    • 勾选“保存作业日志”,并选择OBS桶“smoke-test”,用户新创建的OBS桶是未授权状态,需要单击“OBS授权”
    • “作业所属集群”,默认选择“共享集群”,用户也可以选择自定义的独享集群。如何创建自定义的独享集群,请参见《实时流计算服务用户指南》“新建集群”
    • “作业所属集群”,默认选择“共享集群”,用户也可以选择自定义的独享集群。如何创建自定义的独享集群,请参见《实时流计算服务用户指南》新建集群
    图7 运行参数设置
    说明:

    CS服务提供了调试功能,供用户使用测试数据对作业的业务逻辑进行验证。具体的使用方法请参考《实时流计算服务用户指南》调试作业章节。

  7. 单击“保存”,保存作业和相关参数。
  8. 单击“提交”,进入“作业配置清单”页面,单击“确认”,将作业提交并启动。

    提交作业后,系统将自动跳转到作业管理页面,新创建的作业将显示在作业列表中,在状态一列中可以查看作业状态。作业提交成功后,状态将由提交中变为运行中

    图8 作业状态

    如果作业状态为提交失败运行异常,表示作业提交或运行失败。请在作业列表中,单击作业名称“JobSample”,然后单击“运行日志”,查看作业运行日志。请根据日志解决故障后,再重新提交作业。

步骤5:发送数据到DIS服务

前面已经申请了DIS通道作为数据源,提交作业后,用户可以将数据通过DIS通道不断上传至DIS服务,实现向CS服务的作业提供实时流数据源。

在本样例中,我们将构造本地数据并通过入流的DIS通道“csinput”上传至DIS服务,具体操作请参见《数据接入服务用户指南》中的发送数据到DIS服务章节。

样例数据示例如下,录入车辆信息,每条记录包含4个字段,即车牌号、车主姓名、车辆品牌、车辆价格:

1,lilei,bmw320i,28
2,hanmeimei,audia4,27

步骤6:查看作业信息及运行结果

启动作业后,可以查看作业的运行情况。操作步骤如下:

  1. 在CS管理控制台的左侧导航栏中,单击作业管理,进入“作业管理”页面。
  2. 在作业列表中,单击作业名称“JobSample”,查看作业的详细信息。

    图9 作业监控

    详细内容请参见《实时流计算服务用户指南》监控作业章节。

    如果用户需要查看作业的输出结果,需要到DIS上查看用于作业输出流的DIS通道上的数据,详细操作请参见《数据接入服务用户指南》从DIS 获取数据章节。

步骤7:查看其它资料

完成如上入门操作步骤后,我们推荐您可以参考如下资料继续对实时流计算服务进行更详细深入的了解:

步骤8:删除作业

样例作业运行成功后,如果不再需要本样例作业,可以将它删除,以免浪费资源或占用配额。

说明:

作业删除后无法恢复,请谨慎操作。

  1. 在CS管理控制台的左侧导航栏中,单击作业管理,进入“作业管理”页面。
  2. 作业管理页面的“JobSample”作业所在行,单击更多 > 删除
  3. 在弹出的确认对话框中,单击“确认”完成操作。
如果您喜欢这篇文档,您还可以:

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区