Spark Streaming对接Kafka0-10样例程序开发思路
场景说明
假定某个业务Kafka每1秒就会收到1个单词记录。
基于某些业务要求,开发的Spark应用程序实现如下功能:
实时累加计算每个单词的记录总数。
“log1.txt”示例文件:
LiuYang YuanJing GuoYijun CaiXuyu Liyuan FangBo LiuYang YuanJing GuoYijun CaiXuyu FangBo
数据规划
- 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
- 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。
在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。
- 创建Topic。
{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}
- 启动Kafka的Producer,向Kafka发送数据。
java -cp {ClassPath} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}
其中,ClassPath应包含Spark客户端Kafka jar包的绝对路径,如/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/*
开发思路
- 接收Kafka中数据,生成相应DStream。
- 对单词记录进行分类统计。
- 计算结果,并进行打印。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
- 版本号中包含hw-ei的依赖包请从华为开源镜像站下载
- 版本号中不包含hw-ei的依赖包都来自开源仓库,请从Maven中心仓获取。
运行任务
在运行样例程序时需要指定<checkpointDir> <brokers> <topic> <batchTime>,其中<checkPointDir>指应用程序结果备份到HDFS的路径,<brokers>指获取元数据的Kafka地址,<topic>指读取Kafka上的topic名称,<batchTime>指Streaming分批的处理间隔。
由于Spark Streaming Kafka的依赖包在客户端的存放路径与其他依赖包不同,如其他依赖包路径为“$SPARK_HOME/jars”,而Spark Streaming Kafka依赖包路径为“$SPARK_HOME/jars/streamingClient010”。所以在运行应用程序时,需要在spark-submit命令中添加配置项,指定Spark Streaming Kafka的依赖包路径,如--jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}")
- Spark Streaming读取Kafka 0-10 Write To Print代码样例
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStreamingKafka010JavaExample-1.0.jar <checkpointDir> <brokers> <topic> <batchTime>
- Spark Streaming Write To Kafka 0-10代码样例:
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.JavaDstreamKafkaWriter /opt/SparkStreamingKafka010JavaExample-1.0.jar <groupId> <brokers> <topics>