Spark Structured Streaming样例程序开发思路
场景说明
在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
数据规划
- 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
- 创建Topic。
{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic}
- 启动Kafka的Producer,向Kafka发送数据。
{ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中编包并运行Spark程序章节中导出jar包的操作步骤。
java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}
开发思路
- 接收Kafka中数据,生成相应DataStreamReader。
- 对单词记录进行分类统计。
- 计算结果,并进行打印。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
运行任务
- <brokers>指获取元数据的Kafka地址。
- <subscribe-type>指Kafka订阅类型(如subscribe)。
- <topic>指读取Kafka上的topic名称。
- <checkpointDir>指checkpoint文件存放路径,本地或者HDFS路径下。
由于Spark Structured Streaming Kafka的依赖包在客户端的存放路径与其他依赖包不同,如其他依赖包路径为“$SPARK_HOME/jars”,而Spark Structured Streaming Kafka依赖包路径为“$SPARK_HOME/jars/streamingClient010”。所以在运行应用程序时,需要在spark-submit命令中添加配置项,指定Spark Streaming Kafka的依赖包路径,如--jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}")
用户提交结构流任务时,通常需要通过--jars命令指定kafka相关jar包的路径,当前版本用户除了这一步外还需要将$SPARK_HOME/jars/streamingClient010目录中的kafka-clients jar包复制到$SPARK_HOME/jars目录下,否则会报class not found异常。
- 运行Java或Scala样例代码:
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.KafkaWordCount /opt/SparkStructuredStreamingScalaExample-1.0.jar <brokers> <subscribe-type> <topic> <checkpointDir>
其中配置示例如下:
如果报没有权限读写本地目录的错误,需要指定“spark.sql.streaming.checkpointLocation”参数,且用户必须具有该参数指定的目录的读、写权限。
- 运行Python样例代码:
运行Python样例代码时需要将打包后的Java项目的jar包添加到streamingClient010/目录下。
bin/spark-submit --master yarn --deploy-mode client --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") /opt/female/SparkStructuredStreamingPythonExample/KafkaWordCount.py <brokers> <subscribe-type> <topic> <checkpointDir>