更新时间:2024-08-03 GMT+08:00

Spark Structured Streaming对接Kafka样例程序开发思路

场景说明

假定一个广告业务,存在广告请求事件、广告展示事件、广告点击事件,广告主需要实时统计有效的广告展示和广告点击数据。

已知:

  1. 终端用户每次请求一个广告后,会生成广告请求事件,保存到kafka的adRequest topic中。
  2. 请求一个广告后,可能用于多次展示,每次展示,会生成广告展示事件,保存到kafka的adShow topic中。
  3. 每个广告展示,可能会产生多次点击,每次点击,会生成广告点击事件,保存到kafka的adClick topic中。
  4. 广告有效展示的定义如下:
    1. 请求到展示的时长超过A分钟算无效展示。
    2. A分钟内多次展示,每次展示事件为有效展示。
  5. 广告有效点击的定义如下:
    1. 展示到点击时长超过B分钟算无效点击。
    2. B分钟内多次点击,仅首次点击事件为有效点击。

基于此业务场景,模拟简单的数据结构如下:

  • 广告请求事件

    数据结构:adID^reqTime

  • 广告展示事件

    数据结构:adID^showID^showTime

  • 广告点击事件

    数据结构:adID^showID^clickTime

数据关联关系如下:

  • 广告请求事件与广告展示事件通过adID关联。
  • 广告展示事件与广告点击事件通过adID+showID关联。

数据要求:

  • 数据从产生到到达流处理引擎的延迟时间不超过2小时
  • 广告请求事件、广告展示事件、广告点击事件到达流处理引擎的时间不能保证有序和时间对齐

数据规划

  1. 在kafka中生成模拟数据(需要有Kafka权限用户)。
    java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaADEventProducer {BrokerList} {timeOfProduceReqEvent} {eventTimeBeforeCurrentTime} {reqTopic} {reqEventCount} {showTopic} {showEventMaxDelay} {clickTopic} {clickEventMaxDelay}
    • 确保集群安装完成,包括HDFS、Yarn、Spark2x和Kafka。
    • 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。
    • 启动Kafka的Producer,向Kafka发送数据。
    • {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中调测Spark应用章节中导出jar包的操作步骤。

    命令举例:

    java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingADScalaExample-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaADEventProducer 10.132.190.170:21005,10.132.190.165:21005 2h 1h req 10000000 show 5m click 5m

    此命令将在kafka上创建3个topic:req、show、click,在2h内生成1千万条请求事件数据,请求事件的时间取值范围为{当前时间-1h 至 当前时间},并为每条请求事件随机生成0-5条展示事件,展示事件的时间取值范围为{请求事件时间 至请求事件时间+5m },为每条展示事件随机生成0-5条点击事件,点击事件的时间取值范围为{展示事件时间 至展示事件时间+5m }

开发思路

  1. 使用Structured Streaming接收Kafka中数据,生成请求流、展示流、点击流。
  2. 对请求流、展示流、点击流的数据进行关联查询。
  3. 统计结果写入kafka。
  4. 应用中监控流处理任务的状态。

运行前置操作

安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:sparkuser,需要修改为准备好的开发用户。

打包项目

  • 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。
  • 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用

    编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。

  • 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。

运行任务

在运行样例程序时需要指定 <kafkaBootstrapServers> <maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> <clickTopic> <maxClickDelay> <triggerInterver> <checkpointDir> <kafkaProtocol> <kafkaService> <kafkaDomain>,其中<kafkaBootstrapServers>指获取元数据的Kafka地址(需使用21007端口),<maxEventDelay>指数据从生成到被流处理引擎的最大延迟时间,<reqTopic>指请求事件的topic名称,<showTopic>指展示事件的topic名称,<maxShowDelay>指有效展示事件的最大延迟时间,<clickTopic>指点击事件的topic名称,<maxClickDelay>指有效点击事件的最大延迟时间,<triggerInterver>指流处理任务的触发间隔,<checkpointDir>指checkpoint文件存放路径,<kafkaProtocol>指安全访问协议(如SASL_PLAINTEXT),<kafkaService>指kerberos服务名称(如kafka),<kafkaDomain>指kerberos域名(如hadoop.<系统域名>)。

由于Spark Structured Streaming Kafka的依赖包在客户端的存放路径与其他依赖包不同,如其他依赖包路径为“$SPARK_HOME/jars”,而Spark Structured Streaming Kafka依赖包路径为“$SPARK_HOME/jars/streamingClient010”。所以在运行应用程序时,需要在spark-submit命令中添加配置项,指定Spark Streaming Kafka的依赖包路径,如--jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}")

由于运行模式为安全模式,需要添加新配置并修改命令参数:
  1. $SPARK_HOME/conf/jaas.conf添加新配置:
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=false
    useTicketCache=true
    debug=false;
    };
  2. $SPARK_HOME/conf/jaas-zk.conf添加新配置:
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="./user.keytab"
    principal="sparkuser@<系统域名>"
    useTicketCache=false
    storeKey=true
    debug=true;
    };
  3. 使用--files和相对路径提交keytab文件,这样才能保证keytab文件被加载到executor的container中

用户提交结构流任务时,通常需要通过--jars命令指定kafka相关jar包的路径,当前版本用户除了这一步外还需要将$SPARK_HOME/jars/streamingClient010目录中的kafka-clients jar包复制到$SPARK_HOME/jars目录下,否则会报class not found异常。

进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令如下(类名与文件名等请与实际代码保持一致,此处仅为示例):

bin/spark-submit --master yarn --deploy-mode client --files <local Path>/jaas.conf,<local path>/user.keytab --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --conf "spark.sql.streaming.statefulOperator.checkCorrectness.enabled=false" --class com.huawei.bigdata.spark.examples.KafkaADCount /opt/StructuredStreamingADScalaExample-1.0.jar <kafkaBootstrapServers> <maxEventDelay> <reqTopic> <showTopic> <maxShowDelay> <clickTopic> <maxClickDelay> <triggerInterver> <checkpointDir> <kafkaProtocol> <kafkaService> <kafkaDomain>

其中配置示例如下:

--files ./jaas.conf,./user.keytab //使用--files指定jaas.conf和keytab文件。