Spark Structured Streaming状态操作样例程序开发思路
场景说明
假设需要跨批次统计每个session期间发生了多少次event以及本session的开始和结束timestamp;
同时输出本批次被更新状态的session。
数据规划
- 在kafka中生成模拟数据(需要有Kafka权限用户)。
- 确保集群安装完成,包括安装HDFS、Yarn、Spark2x和Kafka服务。
- 将Kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”。
- 创建Topic。
{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 1 --topic {Topic}
- 启动Kafka的Producer,向Kafka发送数据。
{ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考在Linux环境中编包并运行Spark程序章节中导出jar包的操作步骤。
java -cp $SPARK_HOME/conf:$SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient010/*:{ClassPath} com.huawei.bigdata.spark.examples.KafkaProducer {brokerlist} {topic} {number of events produce every 0.02s}
示例:
java -cp /opt/client/Spark2x/spark/conf:/opt/StructuredStreamingState-1.0.jar:/opt/client/Spark2x/spark/jars/*:/opt/client/Spark2x/spark/jars/streamingClient010/* com.huawei.bigdata.spark.examples.KafkaProducer xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005,xxx.xxx.xxx.xxx:21005 mytopic 10
开发思路
- 接收Kafka中数据,生成相应DataStreamReader。
- 进行分类统计。
- 计算结果,并进行打印。
运行前置操作
安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:sparkuser,需要修改为准备好的开发用户。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中编包并运行Spark程序。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt” )下。
- 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上(文件上传的路径需要和生成的jar包路径一致)。
运行任务
在运行样例程序时需要指定 <brokers> <subscribe-type><kafkaProtocol> <kafkaService> <kafkaDomain> <topic> <checkpointLocation>,其中<brokers>指获取元数据的Kafka地址(需使用21007端口),<subscribe-type> 指定kakfa的消费方式,<kafkaProtocol>指安全访问协议(如SASL_PLAINTEXT),<kafkaService>指kerberos服务名称(如kafka),<kafkaDomain>指kerberos域名(如hadoop.<系统域名>),<topic>指要消费的kafka topic,<checkpointLocation> 指spark任务的checkpoint保存地址。
由于Spark Structured Streaming Kafka的依赖包在客户端的存放路径与其他依赖包不同,如其他依赖包路径为“$SPARK_HOME/jars”,而Spark Streaming Structured Kafka依赖包路径为“$SPARK_HOME/jars/streamingClient010”。所以在运行应用程序时,需要在spark-submit命令中添加配置项,指定Spark Streaming Kafka的依赖包路径,如--jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}")
- $SPARK_HOME/conf/jaas.conf添加新配置:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=false useTicketCache=true debug=false; };
- $SPARK_HOME/conf/jaas-zk.conf添加新配置:
KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="./user.keytab" principal="sparkuser@<系统域名>" useTicketCache=false storeKey=true debug=true; };
- 使用--files和相对路径提交keytab文件,这样才能保证keytab文件被加载到executor的container中
进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令如下(类名与文件名等请与实际代码保持一致,此处仅为示例):
- bin/spark-submit --master yarn --deploy-mode client --files <local Path>/jaas.conf,<local path>/user.keytab --jars $(files=($SPARK_HOME/jars/streamingClient010/*.jar); IFS=,; echo "${files[*]}") --class com.huawei.bigdata.spark.examples.kafkaSessionization /opt/StructuredStreamingState-1.0.jar <brokers> <subscribe-type> <kafkaProtocol> <kafkaService> <kafkaDomain> <topic> <checkpointLocation>
其中配置示例如下:
--files ./jaas.conf,./user.keytab //使用--files指定jaas.conf和keytab文件。
用户提交结构流任务时,通常需要通过--jars命令指定kafka相关jar包的路径,当前版本用户除了这一步外还需要将$SPARK_HOME/jars/streamingClient010目录中的kafka-clients jar包复制到$SPARK_HOME/jars目录下,否则会报class not found异常。