更新时间:2024-08-03 GMT+08:00

Spark Structured Streaming样例程序开发思路

场景说明

在Spark应用中,通过使用StructuredStreaming调用Kafka接口来获取单词记录,然后把单词记录分类统计,得到每个单词记录数。

数据规划

StructuredStreaming样例工程的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户)。
  1. 确保集群安装完成,包括HDFS、Yarn、Spark和Kafka。
  2. 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”
  3. 创建Topic。

    {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。

    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic}

  4. 启动Kafka的Producer,向Kafka发送数据。

    {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中调测Spark应用章节中导出jar包的操作步骤。

    java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaWordCountProducer {BrokerList} {Topic} {messagesPerSec} {wordsPerMessage}

开发思路

  1. 接收Kafka中数据,生成相应DataStreamReader。
  2. 对单词记录进行分类统计。
  3. 计算结果,并进行打印。

运行前置操作

安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:sparkuser,需要修改为准备好的开发用户。

打包项目

  • 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。
  • 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用

    编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。

  • 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
  • 将commons-pool2-xxx.jar上传到“$SPARK_HOME/jars/streamingClient010/”目录下(jar包可从$SPARK_HOME/tool/carbonPrequery目录下获取)。

运行任务

  • 在运行样例程序时需要指定<brokers> <subscribe-type> <topic> <protocol> <service> <domain><checkpointDir>,其中<brokers>指获取元数据的Kafka地址(需使用21007端口),<subscribe-type>指Kafka订阅类型(如subscribe),<topic>指读取Kafka上的topic名称,<protocol>指安全访问协议(如SASL_PLAINTEXT),<service>指kerberos服务名称(如kafka),<domain>指kerberos域名(如hadoop.<系统域名>),<checkpointDir>指checkpoint文件存放路径。
    • 由于Spark Structured Streaming Kafka的依赖包在客户端的存放路径与其他依赖包不同,如其他依赖包路径为“$SPARK_HOME/jars”,而Spark Structured Streaming Kafka依赖包路径为“$SPARK_HOME/jars/streamingClient010”。所以在运行应用程序时,需要在spark-submit命令中添加配置项,指定Spark Streaming Kafka的依赖包路径,如--jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}")
      由于运行模式为安全模式,需要添加新配置并修改命令参数:
      1. $SPARK_HOME/conf/jaas.conf添加新配置:
        KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=false
        useTicketCache=true
        debug=false;
        };
      2. $SPARK_HOME/conf/jaas-zk.conf添加新配置:
        KafkaClient {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        keyTab="./user.keytab"
        principal="sparkuser@<系统域名>"
        useTicketCache=false
        storeKey=true
        debug=true;
        };
      3. 使用--files和相对路径提交keytab文件,这样才能保证keytab文件被加载到executor的container中。
    • 用户提交结构流任务时,通常需要通过--jars命令指定kafka相关jar包的路径,当前版本用户除了这一步外还需要将$SPARK_HOME/jars/streamingClient010目录中的kafka-clients jar包复制到$SPARK_HOME/jars目录下,否则会报class not found异常。
进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例):
  • 运行JavaScala样例代码:

    bin/spark-submit --master yarn --deploy-mode client --files <local Path>/jaas.conf,<local path>/user.keytab --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.SecurityKafkaWordCount /opt/SparkStructuredStreamingScalaExample-1.0.jar <brokers> <subscribe-type> <topic> <protocol> <service> <domain> <checkpointDir>

    其中配置示例如下:
    --files <local Path>/jaas.conf,<local Path>/user.keytab //使用--files指定jaas.conf和keytab文件。
  • 运行Python样例代码:

    运行Python样例代码时需要将打包后的Java项目的jar包添加到streamingClient010/目录下。

    bin/spark-submit --master yarn --deploy-mode client --files /opt/FIclient/user.keytab --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") /opt/female/SparkStructuredStreamingPythonExample/SecurityKafkaWordCount.py <brokers> <subscribe-type> <topic> <protocol> <service> <domain> <checkpointDir>