更新时间:2024-08-03 GMT+08:00
场景说明
场景说明
假定某个业务Kafka每30秒就会收到5个用户的消费记录。Hbase的table1表存储用户历史消费的金额信息。
现table1表有10条记录,表示有用户名分别为1-10的用户,用户的历史消费金额初始化都是0元。
基于某些业务要求,开发的Spark应用程序实现如下功能:
实时累加计算用户的消费金额信息:即用户总消费金额=用户的消费金额(kafka数据) + 用户历史消费金额(table1表的值),更新到table1表。
数据规划
- 创建HBase表,并插入数据。
- 通过HBase创建名为table1的表,命令如下。
- 通过HBase执行如下命令,将数据插入table1表中。
put 'table1', '1', 'cf:cid', '0' put 'table1', '2', 'cf:cid', '0' put 'table1', '3', 'cf:cid', '0' put 'table1', '4', 'cf:cid', '0' put 'table1', '5', 'cf:cid', '0' put 'table1', '6', 'cf:cid', '0' put 'table1', '7', 'cf:cid', '0' put 'table1', '8', 'cf:cid', '0' put 'table1', '9', 'cf:cid', '0' put 'table1', '10', 'cf:cid', '0'
- Spark Streaming样例工程的数据存储在Kafka中。
- 确保集群安装完成,包括HDFS、Yarn、Spark。
- 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。
- 创建Topic。
{zkQuorum}表示ZooKeeper集群信息,格式为IP:port。
$KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}
- 启动样例代码的Producer,向Kafka发送数据。
{ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行Spark应用章节中导出jar包的操作步骤。
java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient/*:{JAR_PATH} com.huawei.bigdata.spark.examples.streaming.StreamingExampleProducer {BrokerList} {Topic}
- 如果开启了kerberos认证,需要将客户端的配置文件“spark-defaults.conf”和sparkJDBC服务端中的配置项spark.yarn.security.credentials.hbase.enabled置为true。
- {zkQuorum}格式为zkIp:2181。
- JAR_PATH为程序jar包所在路径。
- brokerlist格式为brokerIp:9092。
开发思路
- 接收Kafka中数据,生成相应DStream。
- 筛选数据信息并分析。
- 找到对应的HBase表记录。
- 计算结果,写到HBase表。