更新时间:2024-10-28 GMT+08:00

自定义SparkStreaming作业

获取DIS Spark Streaming Demo

  1. 这里获取“dis-spark-streaming-X.X.X.zip”压缩包。解压“dis-spark-streaming-X.X.X.zip”压缩包,解压之后获得以下目录:

    “dis-spark-streaming-demo”目录包含一个Maven工程样例。

编写SparkStreaming作业

以IntelliJ IDEA社区版为例,说明如何编写SparkStreaming作业。请先确保在IDEA上已经正确配置好

  • JDK 1.8+
  • Scala-sdk-2.11
  • Maven 3.3.*

  1. 打开IntelliJIDEA,选择File > Open。选择解压至本地的dis-spark-streaming-demo目录,双击pom.xml。

  2. 当弹出如下对话框,请选择“Open as Project”,作为工程打开。

  3. 单击“New Window”,在新窗口打开此工程。

  4. 在新打开的IDEA窗口中,单击File > Settings

  5. 在搜索框搜索maven,找到maven的配置,请确保Maven home directory(Maven安装路径),User settings file (settings.xml文件位置)和Local repository(本地仓库地址)配置正确。

    若不正确,请修改,否则步骤 2中安装的sdk无法找到。

  6. 打开DISSparkStreamingExample文件,如果IDEA提示“No Scala SDK in module”,请单击旁边的“Setup Scala SDK”,会显示Scala SDK列表(如果没有可以创建一个并关联scala路径),选择2.11版本的即可。

  7. 在pom.xml上单击右键,选择Maven > Reimport,重新引入maven依赖库。

  8. 此时IDEA打开的DISSparkStreamingExample文件内没有错误即表示开发环境配置成功,此文件的逻辑是读取DIS通道中的数据并统计每个单词出现次数。

    1. DISSparkStreamingExample是一个使用Assign模式的样例,不具备停止再启动时从上一次停止位置开始的能力。使用到的SDK构造方法如下:
    ConsumerStrategies.Assign[String, String](streamName, params, startingOffsets)
    • streamName为DIS通道名称。
    • params为参数Map集合,至少包括endpoint(DIS网关地址),region(区域),ak(用户ak),sk(用户sk),projectId(用户项目ID)。
    • startingOffsets为读取DIS数据的起始位置,LATEST表示从最新的数据开始读取;EARLIEST表示从最旧的数据开始读取;如果要指定每个分区的精确起始位置,则可以写为json字符串,例如{"0":23,"1":-1,"2":-2}表示第0分区起始位置是23,第1分区从最新数据的位置开始,第2分区从最老数据的位置开始,如果有分区没有指定位置,则默认从最新数据位置开始。
    1. DISSparkStreamingSubscribeExample是一个使用Subscribe模式的样例,具备停止再启动时从上一次停止位置开始的能力。使用到的SDK构造方法如下:
    ConsumerStrategies.Subscribe[String, String](Array(streamName), params)
    • streamName为DIS通道名称
    • params为参数Map集合,至少包括endpoint(DIS网关地址),region(区域),ak(用户ak),sk(用户sk),projectId(用户项目ID),group.id(app名称,表示某一个消费组);还可以包含auto.offset.reset,参数含义同Assign模式下的startingOffsets;另外一个参数enable.auto.commit,设置为true会自动每隔5000ms(可通过设置auto.commit.interval.ms修改)提交一次offset,设置为false则不自动提交,用户可以手动调用commitAsync提交,参见示例代码中如下部分
    stream.foreachRDD { rdd =>
          val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // commit offset to DIS async.
          stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

验证sparkStreaming作业

实际场景中,SparkStreming作业需要提交在Spark集群上运行,但本次验证只介绍在本地IDE上测试,目的是了解sdk基本使用方法。测试完成后用户可自行创建集群(如MRS集群)并提交作业验证。

  1. 使用注册账户登录DIS控制台
  2. 单击管理控制台左上角的,选择区域和项目。
  3. 参考步骤1:开通DIS通道申请开通DIS通道,并持续上传数据到新创建的DIS通道。本次范例上传的内容为hello world。
  4. 打开pom.xml文件,选择<scope>provided</scope>这一行,并按Ctrl+/注释掉此行并保存。

  5. 右键单击pom.xml,选择Maven > Reimport,重新引入依赖包。

  6. 在DISSparkStreamingExample文件内任意地方,右键选择“Create 'DISSparkStreamingExample'”

  7. 在打开的配置页面中,“VM options”中输入-Dspark.master=local[*],表示用local模式运行spark作业;“Program arguments”中输入运行参数,格式为 :

    DIS网关地址Region名称AK SK ProjectID 通道名称起始位置Streaming批次时间
     
    https://dis.${region}.cloud.com ${region} YOU_AK YOU_SK YOU_PROJECTID YOU_STREAM_NAME latest 10

    参数顺序与含义在示例代码中有,可以参考。

    最终IDEA的配置如下图所示,确认无误后单击“OK”关闭此窗口。

  8. 在DISSparkStreamingExample文件内任意地方,右键选择“Run 'DISSparkStreamingExample'”,即可启动作业。

  9. 启动过程中会报一个hadoop binary path的错误,这个可以忽略。

    18/08/28 10:26:10 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
    java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

  10. 如果没有其他错误,则作业会每隔duration运行一个批次,从DIS读取此批次内的数据并输出结果,示例如下:

    -------------------------------------------
    Time: 1535423650000 ms
    -------------------------------------------
    (hello,30)
    (world.,30)

  11. 在本地运行作业验证无误之后,请把pom.xml中的<scope>provided</scope>解除注释(防止以后打包会把spark依赖也打进来),然后停止数据上传程序。