更新时间:2022-07-19 GMT+08:00
分享

场景说明

场景说明

假定某个业务Kafka每1秒就会收到1个单词记录。

基于某些业务要求,开发的Spark应用程序实现如下功能:

实时累加计算每个单词的记录总数。

“log1.txt”示例文件:

LiuYang
YuanJing
GuoYijun
CaiXuyu
Liyuan
FangBo
LiuYang
YuanJing
GuoYijun
CaiXuyu
FangBo

数据规划

Spark Streaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有kafka权限用户)。

  1. 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
  2. 本地新建文件“input_data1.txt”,将“log1.txt”的内容复制保存到“input_data1.txt”。

    在客户端安装节点下创建文件目录:“/home/data”。将上述文件上传到此“/home/data”目录下。

  3. 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。
  4. 创建Topic。

    {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

  5. 启动Kafka的Producer,向Kafka发送数据。

    java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:$KAFKA_HOME/libs/*:{JAR_PATH} com.huawei.bigdata.spark.examples.StreamingExampleProducer {BrokerList} {Topic}

    • JAR_PATH为程序jar包所在路径,BrokerList格式为brokerIp:9092。
    • 需要修改程序SecurityKafkaWordCount类中kerberos.domain.name的值为$KAFKA_HOME/config/consumer.properties文件中kerberos.domain.name配置项的值。
    • 若用户需要对接安全Kafka,则还需要在spark客户端的conf目录下的“jaas.conf”文件中增加“KafkaClient”的配置信息,示例如下:
      KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab = "./user.keytab"
      principal="leoB@HADOOP.COM"
      useTicketCache=false
      storeKey=true
      debug=true;
      };

      在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名及集群域名。

开发思路

  1. 接收Kafka中数据,生成相应DStream。
  2. 对单词记录进行分类统计。
  3. 计算结果,并进行打印。

相关文档