更新时间:2024-08-03 GMT+08:00

Spark同步HBase数据到CarbonData开发思路

场景说明

数据实时写入HBase,用于点查业务,数据每隔一段时间批量同步到CarbonData表中,用于分析型查询业务。

运行前置操作

安全模式下该样例代码需要读取两个文件(user.keytab、krb5.conf)。user.keytab和krb5.conf文件为安全模式下的认证文件,需要在FusionInsight Manager中下载principal用户的认证凭证,样例代码中使用的用户为:sparkuser,需要修改为准备好的开发用户。

打包项目

  1. 将user.keytab、krb5.conf 两个文件上传客户端所在服务器上。
  1. 通过IDEA自带的Maven工具,打包项目,生成jar包。具体操作请参考在Linux环境中调测Spark应用
    • 编译打包前,样例代码中的user.keytab、krb5.conf文件路径需要修改为该文件所在客户端服务器的实际路径。例如:“/opt/user.keytab”,“/opt/krb5.conf”。
  2. 将打包生成的jar包上传到Spark客户端所在服务器的任意目录(例如“ /opt/” )下。

数据规划

  1. 创建HBase表,构造数据,列需要包含key,modify_time,valid。其中每条数据key值全表唯一,modify_time代表修改时间,valid代表是否为有效数据(该样例中'1'为有效,'0'为无效数据)。

    示例:进入hbase shell,执行如下命令:

    create 'hbase_table','key','info'

    put 'hbase_table','1','info:modify_time','2019-11-22 23:28:39'

    put 'hbase_table','1','info:valid','1'

    put 'hbase_table','2','info:modify_time','2019-11-22 23:28:39'

    put 'hbase_table','2','info:valid','1'

    put 'hbase_table','3','info:modify_time','2019-11-22 23:28:39'

    put 'hbase_table','3','info:valid','0'

    put 'hbase_table','4','info:modify_time','2019-11-22 23:28:39'

    put 'hbase_table','4','info:valid','1'

    上述数据的modify_time列可设置为当前时间之前的值。

    put 'hbase_table','5','info:modify_time','2021-03-03 15:20:39'

    put 'hbase_table','5','info:valid','1'

    put 'hbase_table','6','info:modify_time','2021-03-03 15:20:39'

    put 'hbase_table','6','info:valid','1'

    put 'hbase_table','7','info:modify_time','2021-03-03 15:20:39'

    put 'hbase_table','7','info:valid','0'

    put 'hbase_table','8','info:modify_time','2021-03-03 15:20:39'

    put 'hbase_table','8','info:valid','1'

    put 'hbase_table','4','info:valid','0'

    put 'hbase_table','4','info:modify_time','2021-03-03 15:20:39'

    上述数据的modify_time列可设置为样例程序启动后30分钟内的时间值(此处的30分钟为样例程序默认的同步间隔时间,可修改)。

    put 'hbase_table','9','info:modify_time','2021-03-03 15:32:39'

    put 'hbase_table','9','info:valid','1'

    put 'hbase_table','10','info:modify_time','2021-03-03 15:32:39'

    put 'hbase_table','10','info:valid','1'

    put 'hbase_table','11','info:modify_time','2021-03-03 15:32:39'

    put 'hbase_table','11','info:valid','0'

    put 'hbase_table','12','info:modify_time','2021-03-03 15:32:39'

    put 'hbase_table','12','info:valid','1'

    上述数据的modify_time列可设置为样例程序启动后30分钟到60分钟内的时间值,即第二次同步周期。

  2. 在sparksql中创建HBase的hive外表,命令如下:

    create table external_hbase_table(key string ,modify_time STRING, valid STRING)

    using org.apache.spark.sql.hbase.HBaseSource

    options(hbaseTableName "hbase_table", keyCols "key", colsMapping "modify_time=info.modify_time,valid=info.valid");

  3. 在sparksql中创建CarbonData表:

    create table carbon01(key string,modify_time STRING, valid STRING) stored as carbondata;

  4. 初始化加载当前hbase表中所有数据到CarbonData表;

    insert into table carbon01 select * from external_hbase_table where valid='1';

  5. 用spark-submit提交命令:

    spark-submit --master yarn --deploy-mode client --keytab /opt/FIclient/user.keytab --principal sparkuser --class com.huawei.bigdata.spark.examples.HBaseExternalHivetoCarbon /opt/example/HBaseExternalHivetoCarbon-1.0.jar