更新时间:2024-08-03 GMT+08:00

场景说明

场景说明

在Spark应用中,通过使用StructuredStreaming调用kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。

数据规划

StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有kafka权限用户)。

  1. 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
  2. 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。
  3. 创建Topic。

    {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic}

  4. 启动Kafka的Producer,向Kafka发送数据。

{ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行Spark应用

java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{JAR_PATH} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}

  • JAR_PATH为程序jar包所在路径; BrokerList格式为brokerIp:9092;
  • 若用户需要对接安全Kafka,则还需要在spark客户端的conf目录下的“jaas.conf”文件中增加“KafkaClient”的配置信息,示例如下:
    KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab = "./user.keytab"
    principal="leoB@HADOOP.COM"
    useTicketCache=false
    storeKey=true
    debug=true;
    };

    在Spark on YARN模式下,jaas.conf和user.keytab通过YARN分发到Spark on YARN的container目录下,因此KafkaClient中对于“keyTab”的配置路径必须为相对jaas.conf的所在路径,例如“./user.keytab”。principal修改为自己创建的用户名及集群域名。

开发思路

  1. 接收Kafka中数据,生成相应DataStreamReader。
  2. 对单词记录进行分类统计。
  3. 计算结果,并进行打印。