更新时间:2024-08-03 GMT+08:00

场景说明

场景说明

假定某个业务Kafka每30秒就会收到5个用户的消费记录。Hbase的table1表存储用户历史消费的金额信息。

现table1表有10条记录,表示有用户名分别为1-10的用户,用户的历史消费金额初始化都是0元。

基于某些业务要求,开发的Spark应用程序实现如下功能:

实时累加计算用户的消费金额信息:即用户总消费金额=用户的消费金额(kafka数据) + 用户历史消费金额(table1表的值),更新到table1表。

数据规划

  1. 创建HBase表,并插入数据。

    1. 通过HBase创建名为table1的表,命令如下。

      create 'table1', 'cf'

    2. 通过HBase执行如下命令,将数据插入table1表中。
      put 'table1', '1', 'cf:cid', '0'
      put 'table1', '2', 'cf:cid', '0'
      put 'table1', '3', 'cf:cid', '0'
      put 'table1', '4', 'cf:cid', '0'
      put 'table1', '5', 'cf:cid', '0'
      put 'table1', '6', 'cf:cid', '0'
      put 'table1', '7', 'cf:cid', '0'
      put 'table1', '8', 'cf:cid', '0'
      put 'table1', '9', 'cf:cid', '0'
      put 'table1', '10', 'cf:cid', '0'

  2. Spark Streaming样例工程的数据存储在Kafka中。

    1. 确保集群安装完成,包括HDFS、Yarn、Spark。
    2. 将kafka的Broker配置参数“allow.everyone.if.no.acl.found”的值修改为“true”(普通集群不需配置)。
    3. 创建Topic。

      {zkQuorum}表示ZooKeeper集群信息,格式为IP:port。

      $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 3 --topic {Topic}

    4. 启动样例代码的Producer,向Kafka发送数据。

      {ClassPath}表示工程jar包的存放路径,详细路径由用户指定,可参考编包并运行Spark应用章节中导出jar包的操作步骤。

      java -cp $SPARK_HOME/jars/*:$SPARK_HOME/jars/streamingClient/*:{JAR_PATH} com.huawei.bigdata.spark.examples.streaming.StreamingExampleProducer {BrokerList} {Topic}

    • 如果开启了kerberos认证,需要将客户端的配置文件“spark-defaults.conf”和sparkJDBC服务端中的配置项spark.yarn.security.credentials.hbase.enabled置为true。
    • {zkQuorum}格式为zkIp:2181。
    • JAR_PATH为程序jar包所在路径。
    • brokerlist格式为brokerIp:9092。

开发思路

  1. 接收Kafka中数据,生成相应DStream。
  2. 筛选数据信息并分析。
  3. 找到对应的HBase表记录。
  4. 计算结果,写到HBase表。