更新时间:2024-10-31 GMT+08:00

Spark从HBase读取数据再写入HBase样例程序开发思路

场景说明

假定HBase的table1表存储用户当天消费的金额信息,table2表存储用户历史消费的金额信息。

现table1表有记录key=1,cf:cid=100,表示用户1在当天消费金额为100元。

table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。

基于某些业务要求,要求开发Spark应用程序实现如下功能:

根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。

上例所示,运行结果table2表用户key=1的总消费金额为cf:cid=1100元。

数据规划

使用Spark-Beeline工具创建Spark和HBase表table1、table2,并通过HBase插入数据。

  1. 确保JDBCServer已启动。登录Spark2x客户端节点。
  2. 使用Spark-Beeline工具创建Spark表table1。

    create table table1

    (

    key string,

    cid string

    )

    using org.apache.spark.sql.hbase.HBaseSource

    options(

    hbaseTableName "table1",

    keyCols "key",

    colsMapping "cid=cf.cid");

  3. 通过HBase插入数据,命令如下:

    put 'table1', '1', 'cf:cid', '100'

  4. 使用Spark-Beeline工具创建Spark表table2。

    create table table2

    (

    key string,

    cid string

    )

    using org.apache.spark.sql.hbase.HBaseSource

    options(

    hbaseTableName "table2",

    keyCols "key",

    colsMapping "cid=cf.cid");

  5. 通过HBase插入数据,命令如下:

    put 'table2', '1', 'cf:cid', '1000'

开发思路

  1. 查询table1表的数据。
  2. 根据table1表数据的key值去table2表做查询。
  3. 把前两步相应的数据记录做相加操作。
  4. 把上一步骤的结果写到table2表。

打包项目

  1. 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用
  2. 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。

运行任务

进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例):

  • 运行Java或Scala样例代码

    bin/spark-submit --jars --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn --deploy-mode client /opt/female/SparkHbasetoHbase-1.0.jar

  • 运行Python样例程序
    • 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。

    bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbase-1.0.jar,/opt/female/protobuf-java-2.5.0.jar /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbasePythonExample.py