更新时间:2024-08-03 GMT+08:00

快速开发Spark应用

Spark是分布式批处理框架,提供分析挖掘与迭代式内存计算能力,支持多种语言的应用开发。 通常适用以下场景:
  • 数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。
  • 迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。
  • 数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。
  • 流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。
  • 查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。

MRS对外提供了基于Spark组件的应用开发样例工程,本实践用于指导您创建MRS集群后,获取并导入样例工程并在本地进行编译调测,用于实现从Hive表中读取数据并重新写入HBase表。

本章节对应示例场景的开发思路:

  1. 查询指定Hive表的数据。
  2. 根据表中数据的key值去HBase指定表中做查询。
  3. 把相关的数据记录相加后重新写入HBase表。

创建MRS集群

  1. 购买一个MRS Spark集群,详情请参见购买自定义集群

    本文以购买的MRS 3.1.5版本的集群为例,组件包含Spark2x、Hive、HBase组件,集群开启了Kerberos认证。

  2. 集群购买成功后,在MRS集群的任一节点内,安装集群客户端,具体操作可参考安装并使用集群客户端

    例如客户端安装在主管理节点中,安装目录为“/opt/client”。

准备集群配置文件

  1. 集群创建成功后,登录FusionInsight Manager创建用于提交Flink作业的集群用户。

    选择“系统 > 权限 > 用户 > 添加用户”,在新增用户界面创建一个人机用户,例如sparkuser

    “用户组”需加入“supergroup”用户组,并关联“System_administrator”角色。

  2. 使用新创建的用户登录FusionInsight Manager,按照界面提示修改初始密码。
  3. 选择系统 > 权限 > 用户,在用户名为“sparkuser”的操作列选择“更多 > 下载认证凭据”下载认证凭据文件,保存后解压得到该用户的“user.keytab”文件与“krb5.conf”文件。

准备应用程序

  1. 通过开源镜像站获取样例工程。

    下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考通过开源镜像站获取样例工程

    根据集群版本选择对应的分支,下载并获取MRS相关样例工程。

    例如本章节场景对应示例为“SparkHivetoHbase”样例,获取地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5/src/spark-examples/sparksecurity-examples/SparkHivetoHbaseJavaExample

  2. 本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包。

    本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包,具体操作可参考配置并导入样例工程

    图1 Spark Hive to HBase样例程序

    在示例程序“SparkHivetoHbase”中,通过使用Spark调用Hive接口来操作Hive表,然后根据key值去HBase表获取相应记录,把两者数据做操作后,更新到HBase表。

    关键代码片段如下:

    ...
    public class SparkHivetoHbase {
        public static void main(String[] args) throws Exception {
            String userPrincipal = "sparkuser";     //指定用于认证的集群用户信息及keytab文件地址。
            String userKeytabPath = "/opt/client/user.keytab";
            String krb5ConfPath = "/opt/client/krb5.conf";
            Configuration hadoopConf = new Configuration();
            LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf);
            // 通过Spark接口获取表中的数据。
            SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase");
            JavaSparkContext jsc = new JavaSparkContext(conf);
            HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc);
            Dataset<Row> dataFrame = sqlContext.sql("select name, account from person");
            // 遍历Hive表中的partition,更新到HBase表中。
                    dataFrame
                    .toJavaRDD()
                    .foreachPartition(
                            new VoidFunction<Iterator<Row>>() {
                                public void call(Iterator<Row> iterator) throws Exception {
                                    hBaseWriter(iterator);
                                }
                            });
            jsc.stop();
        }
        //在exetutor端更新hbase表记录
        private static void hBaseWriter(Iterator<Row> iterator) throws IOException {
            // 读取HBase表。
            String tableName = "table2";
            String columnFamily = "cf";
            Configuration conf = HBaseConfiguration.create();
            Connection connection = ConnectionFactory.createConnection(conf);
            Table table = connection.getTable(TableName.valueOf(tableName));
            try {
                connection = ConnectionFactory.createConnection(conf);
                table = connection.getTable(TableName.valueOf(tableName));
                List<Row> table1List = new ArrayList<Row>();
                List<Get> rowList = new ArrayList<Get>();
                while (iterator.hasNext()) {
                    Row item = iterator.next();
                    Get get = new Get(item.getString(0).getBytes());
                    table1List.add(item);
                    rowList.add(get);
                }
                // 获取HBase表记录。
                Result[] resultDataBuffer = table.get(rowList);
                // 修改HBase表记录。
                List<Put> putList = new ArrayList<Put>();
                for (int i = 0; i < resultDataBuffer.length; i++) {
                    Result resultData = resultDataBuffer[i];
                    if (!resultData.isEmpty()) {
                        int hiveValue = table1List.get(i).getInt(1);
                        String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes()));
                        Put put = new Put(table1List.get(i).getString(0).getBytes());
                        // 计算结果。
                        int resultValue = hiveValue + Integer.valueOf(hbaseValue);
                        put.addColumn(
                                Bytes.toBytes(columnFamily),
                                Bytes.toBytes("cid"),
                                Bytes.toBytes(String.valueOf(resultValue)));
                        putList.add(put);
                    }
                }
                if (putList.size() > 0) {
                    table.put(putList);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (table != null) {
                    try {
                        table.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        // 关闭HBase连接。
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    ...

    对开启了Kerberos认证的MRS集群,程序需要向服务端进行用户认证,在本示例程序中,通过代码配置认证信息,“userPrincipal”为用于认证的用户名,“userKeytabPath”和“krb5ConfPath”需要修改为该文件所在客户端服务器的实际路径。

  3. 确认工程内的参数无误后,将工程编译后进行打包,获取打包后的jar文件。

    在Maven工具窗口,选择“clean”生命周期,执行Maven构建过程。然后继续选择“package”进行打包,在生成的“target”目录中获取jar包。

    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  02:36 min
    [INFO] Finished at: 2023-06-12T20:46:24+08:00
    [INFO] ------------------------------------------------------------------------

    例如打包后的jar文件为“SparkHivetoHbase-1.0.jar”。

上传jar包及准备源数据

  1. 将编译后的jar包上传到客户端节点,例如上传到“/opt/client/sparkdemo”目录下。

    如果本地网络无法直接连接客户端节点上传文件,可先将jar文件或者源数据上传至OBS文件系统中,然后通过MRS管理控制台集群内的“文件管理”页面导入HDFS中,再通过HDFS客户端使用hdfs dfs -get命令下载到客户端节点本地。

  2. 将用于认证的用户keytab文件也上传到代码中指定位置,例如上传到“/opt/client”目录下。
  3. 使用root用户登录安装了集群客户端的节点。

    cd /opt/client

    source bigdata_env

    kinit sparkuser

  4. 创建Hive表并写入初始数据。

    beeline

    在Hive Beeline命令行中执行以下命令创建表并插入数据。

    create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE;

    insert into table person(name,account) values("1","100");

    select * from person;

    +--------------+-----------------+
    | person.name  | person.account  |
    +--------------+-----------------+
    | 1            | 100             |
    +--------------+-----------------+

  5. 创建HBase表并写入初始数据。

    退出Hive Beeline命令行,执行spark-beeline,然后执行以下命令创建HBase表。

    create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid" );

    退出Spark Beeline命令行,执行hbase shell命令,进入HBase Shell命令行,执行以下命令插入数据。

    put 'table2', '1', 'cf:cid', '1000'

    scan 'table2'

    ROW                                                 COLUMN+CELL                                                                                                                                           
     1                                                 column=cf:cid, timestamp=2023-06-12T21:12:50.711, value=1000                                                                                           
    1 row(s)

运行程序并查看结果

  1. 在安装了集群客户端的节点中,执行以下命令运行通过样例工程导出的jar包文件。

    cd /opt/client

    source bigdata_env

    cd Spark2x/spark

    vi conf/spark-defaults.conf

    修改“spark.yarn.security.credentials.hbase.enabled”参数的值为“true”。

    bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/client/sparkdemo/SparkHivetoHbase-1.0.jar

  2. 任务提交后,使用sparkuser用户登录FusionInsight Manager,单击“集群 > 服务 > Yarn”,进入ResourceManager WebUI界面后,查找到对应的Spark应用程序作业信息,单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面查看任务详情。

    图2 查看Spark任务详情

  3. 任务运行完成中,在HBase Shell中查询HBase表内容,可以看到对应记录已更新。

    cd /opt/client

    source bigdata_env

    hbase shell

    scan 'table2'

    ROW                                                 COLUMN+CELL                                                                                                                                           
     1                                                 column=cf:cid, timestamp=2023-06-12T21:22:50.711, value=1100                                                                                           
    1 row(s)