更新时间:2024-10-31 GMT+08:00

Spark从HBase读取数据再写入HBase样例程序开发思路

场景说明

假定HBase的table1表存储用户当天消费的金额信息,table2表存储用户历史消费的金额信息。

现table1表有记录key=1,cf:cid=100,表示用户1在当天消费金额为100元。

table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。

基于某些业务要求,要求开发Spark应用程序实现如下功能:

根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。

上例所示,运行结果table2表用户key=1的总消费金额为cf:cid=1100元。

数据规划

使用Spark-Beeline工具创建Spark和HBase表table1、table2,并通过HBase插入数据。

  1. 确保JDBCServer已启动。登录Spark2x客户端节点。
  2. 使用Spark-beeline工具创建Spark表table1。

    create table table1

    (

    key string,

    cid string

    )

    using org.apache.spark.sql.hbase.HBaseSource

    options(

    hbaseTableName "table1",

    keyCols "key",

    colsMapping "cid=cf.cid");

  3. 通过HBase插入数据,命令如下:

    put 'table1', '1', 'cf:cid', '100'

  4. 使用Spark-Beeline工具创建Spark表table2。

    create table table2

    (

    key string,

    cid string

    )

    using org.apache.spark.sql.hbase.HBaseSource

    options(

    hbaseTableName "table2",

    keyCols "key",

    colsMapping "cid=cf.cid");

  5. 通过HBase插入数据,命令如下:

    put 'table2', '1', 'cf:cid', '1000'

开发思路

  1. 查询table1表的数据。
  2. 根据table1表数据的key值去table2表做查询。
  3. 把前两步相应的数据记录做相加操作。
  4. 把上一步骤的结果写到table2表。

运行前置操作

安全模式下Spark Core样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:sparkuser,需要修改为准备好的开发用户。

打包项目

  1. 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。
  1. 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用

    编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/female/user.keytab”,“/opt/female/krb5.conf”。

  2. 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。

运行任务

进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例):

  • 运行Java或Scala样例代码

    bin/spark-submit --jars {客户端安装路径}/Spark/spark/jars/protobuf-java-2.5.0.jar --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn --deploy-mode client /opt/female/SparkHbasetoHbase-1.0.jar

  • 运行Python样例程序
    • 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。
    • 由于Python样例代码中未给出认证信息,请在执行应用程序时通过配置项“--keytab”和“--principal”指定认证信息。

    bin/spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --conf spark.yarn.user.classpath.first=true --jars /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbase-1.0.jar,/opt/female/protobuf-java-2.5.0.jar /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbasePythonExample.py