场景说明
场景说明
假定某个业务Kafka每1秒就会收到1个单词记录。
基于某些业务要求,开发的Spark应用程序实现如下功能:
实时累加计算每个单词的记录总数。
“log1.txt”示例文件:
LiuYang YuanJing GuoYijun CaiXuyu Liyuan FangBo LiuYang YuanJing GuoYijun CaiXuyu FangBo
数据规划
Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有kafka权限用户)。
- 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
- 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。
- 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。
- 创建Topic。
{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}
- 启动Kafka的Producer,向Kafka发送数据。
java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}
- JAR_PATH为程序jar包所在路径,BrokerList格式为brokerIp:9092。
- 需要修改程序SecurityKafkaWordCount类中kerberos.domain.name的值为$KAFKA_HOME/config/consumer.properties文件中kerberos.domain.name配置项的值。
- 若用户需要对接安全Kafka,则还需要在spark客户端的conf目录下的“jaas.conf”文件中增加“KafkaClient”的配置信息,示例如下:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab = "./user.keytab" principal="leoB@HADOOP.COM" useTicketCache=false storeKey=true debug=true; };
在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名及集群域名。
开发思路
- 接收Kafka中数据,生成相应DStream。
- 对单词记录进行分类统计。
- 计算结果,并进行打印。