更新时间:2024-08-03 GMT+08:00

Spark Streaming对接Kafka0-10样例程序开发思路

场景说明

假定某个业务Kafka每1秒就会收到1个单词记录。

基于某些业务要求,开发的Spark应用程序实现如下功能:

实时累加计算每个单词的记录总数。

“log1.txt”示例文件:

LiuYang
YuanJing
GuoYijun
CaiXuyu
Liyuan
FangBo
LiuYang
YuanJing
GuoYijun
CaiXuyu
FangBo

数据规划

Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。
  1. 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
  2. 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”

    在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。

  3. 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”
  4. 创建Topic。

    {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

  5. 启动Kafka的Producer,向Kafka发送数据。

    java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}

    其中,ClassPath应包含Spark客户端Kafka jar包的绝对路径,如/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*

开发思路

  1. 接收Kafka中数据,生成相应DStream。
  2. 对单词记录进行分类统计。
  3. 计算结果,并进行打印。

运行前置操作

安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:sparkuser,需要修改为准备好的开发用户。

打包项目

  • 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。
  • 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用

    编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。

  • 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 准备依赖包,将下列jar包上传到Spark客户端所在服务器,“$SPARK_HOME/jars/streamingClient010”目录下。
    • spark-streaming-kafkaWriter-0-10_2.12-3.1.1-hw-ei-311001.jar
    • kafka-clients-xxx.jar
    • kafka_2.12-xxx.jar
    • spark-sql-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
    • spark-streaming-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
    • spark-token-provider-kafka-0-10_2.12-3.1.1-hw-ei-311001-SNAPSHOT.jar
    • 版本号中包含hw-ei的依赖包请从华为开源镜像站下载。
    • 版本号中不包含hw-ei的依赖包都来自开源仓库,请从Maven中心仓获取。

运行任务

在运行样例程序时需要指定<checkpointDir> <brokers> <topic> <batchTime>,其中<checkPointDir>指应用程序结果备份到HDFS的路径,<brokers>指获取元数据的Kafka地址,<topic>指读取Kafka上的topic名称,<batchTime>指Streaming分批的处理间隔。

由于Spark Streaming Kafka的依赖包在客户端的存放路径与其他依赖包不同,如其他依赖包路径为“$SPARK_HOME/jars”,而Spark Streaming Kafka依赖包路径为“$SPARK_HOME/jars/streamingClient010”。所以在运行应用程序时,需要在spark-submit命令中添加配置项,指定Spark Streaming Kafka的依赖包路径,如--jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}")

由于运行模式为安全模式,需要添加新配置并修改命令参数:
  1. $SPARK_HOME/conf/jaas.conf添加新配置:
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=false
    useTicketCache=true
    debug=false;
    };
  2. $SPARK_HOME/conf/jaas-zk.conf添加新配置:
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="./user.keytab"
    principal="sparkuser@<系统域名>"
    useTicketCache=false
    storeKey=true
    debug=true;
    };
  3. 使用--files和相对路径提交keytab文件,这样才能保证keytab文件被加载到executor的container中。
  4. <brokers>中的端口,Kafka 0-10 Write To Print样例请使用SASL_PLAINTEXT协议端口号,Write To Kafka 0-10样例请使用PLAINTEXT协议端口号。
进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例):
  • Spark Streaming读取Kafka 0-10 Write To Print代码样例

    bin/spark-submit --master yarn --deploy-mode client --files ./jaas.conf,./user.keytab --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.SecurityKafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>

    其中配置示例如下:

    --files ./jaas.conf,./user.keytab //使用--files指定jaas.conf和keytab文件。
  • Spark Streaming Write To Kafka 0-10代码样例:

    bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter /opt/SparkStreamingKafka010JavaExample-1.0.jar <groupId> <brokers> <topics>