场景说明
场景说明
在Spark应用中,通过使用StructuredStreaming调用kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。
数据规划
StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有kafka权限用户)。
- 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
- 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。
- 创建Topic。
{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic}
- 启动Kafka的Producer,向Kafka发送数据。
{ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行Spark应用。
java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{JAR_PATH} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}
- JAR_PATH为程序jar包所在路径; BrokerList格式为brokerIp:9092;
- 若用户需要对接安全Kafka,则还需要在spark客户端的conf目录下的“jaas.conf”文件中增加“KafkaClient”的配置信息,示例如下:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab = "./user.keytab" principal="leoB@HADOOP.COM" useTicketCache=false storeKey=true debug=true; };
在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名及集群域名。
开发思路
- 接收Kafka中数据,生成相应DataStreamReader。
- 对单词记录进行分类统计。
- 计算结果,并进行打印。