Spark从HBase读取数据再写入HBase样例程序开发思路
场景说明
假定HBase的table1表存储用户当天消费的金额信息,table2表存储用户历史消费的金额信息。
现table1表有记录key=1,cf:cid=100,表示用户1在当天消费金额为100元。
table2表有记录key=1,cf:cid=1000,表示用户1的历史消息记录金额为1000元。
基于某些业务要求,要求开发Spark应用程序实现如下功能:
根据用户名累计用户的历史消费金额,即用户总消费金额=100(用户当天的消费金额) + 1000(用户历史消费金额)。
上例所示,运行结果table2表用户key=1的总消费金额为cf:cid=1100元。
数据规划
使用Spark-Beeline工具创建Spark和HBase表table1、table2,并通过HBase插入数据。
- 确保JDBCServer已启动。登录Spark2x客户端节点。
- 使用Spark-Beeline工具创建Spark表table1。
create table table1
(
key string,
cid string
)
using org.apache.spark.sql.hbase.HBaseSource
options(
hbaseTableName "table1",
keyCols "key",
colsMapping "cid=cf.cid");
- 通过HBase插入数据,命令如下:
put 'table1', '1', 'cf:cid', '100'
- 使用Spark-Beeline工具创建Spark表table2。
create table table2
(
key string,
cid string
)
using org.apache.spark.sql.hbase.HBaseSource
options(
hbaseTableName "table2",
keyCols "key",
colsMapping "cid=cf.cid");
- 通过HBase插入数据,命令如下:
put 'table2', '1', 'cf:cid', '1000'
开发思路
- 查询table1表的数据。
- 根据table1表数据的key值去table2表做查询。
- 把前两步相应的数据记录做相加操作。
- 把上一步骤的结果写到table2表。
打包项目
- 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用。
- 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/female/” )下。
运行任务
进入Spark客户端目录,调用bin/spark-submit脚本运行代码,运行命令分别如下(类名与文件名等请与实际代码保持一致,此处仅为示例):
- 运行Java或Scala样例代码
bin/spark-submit --jars --conf spark.yarn.user.classpath.first=true --class com.huawei.bigdata.spark.examples.SparkHbasetoHbase --master yarn --deploy-mode client /opt/female/SparkHbasetoHbase-1.0.jar
- 运行Python样例程序
- 由于pyspark不提供Hbase相关api,本样例使用Python调用Java的方式实现。将所提供 Java代码使用maven打包成jar,并放在相同目录下,运行python程序时要使用--jars把jar包加载到classpath中。
bin/spark-submit --master yarn --deploy-mode client --conf spark.yarn.user.classpath.first=true --jars /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbase-1.0.jar,/opt/female/protobuf-java-2.5.0.jar /opt/female/SparkHbasetoHbasePythonExample/SparkHbasetoHbasePythonExample.py