快速开发Spark应用
- 数据处理(Data Processing):可以用来快速处理数据,兼具容错性和可扩展性。
- 迭代计算(Iterative Computation):支持迭代计算,有效应对多步的数据处理逻辑。
- 数据挖掘(Data Mining):在海量数据基础上进行复杂的挖掘分析,可支持各种数据挖掘和机器学习算法。
- 流式处理(Streaming Processing):支持秒级延迟的流式处理,可支持多种外部数据源。
- 查询分析(Query Analysis):支持标准SQL查询分析,同时提供DSL(DataFrame), 并支持多种外部输入。
MRS对外提供了基于Spark组件的应用开发样例工程,本实践用于指导您创建MRS集群后,获取并导入样例工程并在本地进行编译调测,用于实现从Hive表中读取数据并重新写入HBase表。
本章节对应示例场景的开发思路:
- 查询指定Hive表的数据。
- 根据表中数据的key值去HBase指定表中做查询。
- 把相关的数据记录相加后重新写入HBase表。
创建MRS集群
- 购买一个MRS Spark集群,详情请参见购买自定义集群。
本文以购买的MRS 3.1.5版本的集群为例,组件包含Spark2x、Hive、HBase组件,集群开启了Kerberos认证。
- 集群购买成功后,在MRS集群的任一节点内,安装集群客户端,具体操作可参考安装并使用集群客户端。
例如客户端安装在主管理节点中,安装目录为“/opt/client”。
准备集群配置文件
- 集群创建成功后,登录FusionInsight Manager创建用于提交Flink作业的集群用户。
选择“系统 > 权限 > 用户 > 添加用户”,在新增用户界面创建一个人机用户,例如sparkuser。
“用户组”需加入“supergroup”用户组,并关联“System_administrator”角色。
- 使用新创建的用户登录FusionInsight Manager,按照界面提示修改初始密码。
- 选择“user.keytab”文件与“krb5.conf”文件。 ,在用户名为“sparkuser”的操作列选择“更多 > 下载认证凭据”下载认证凭据文件,保存后解压得到该用户的
准备应用程序
- 通过开源镜像站获取样例工程。
下载样例工程的Maven工程源码和配置文件,并在本地配置好相关开发工具,可参考通过开源镜像站获取样例工程。
根据集群版本选择对应的分支,下载并获取MRS相关样例工程。
例如本章节场景对应示例为“SparkHivetoHbase”样例,获取地址:https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.1.5/src/spark-examples/sparksecurity-examples/SparkHivetoHbaseJavaExample。
- 本地使用IDEA工具导入样例工程,等待Maven工程下载相关依赖包。
本地配置好Maven及SDK相关参数后,样例工程会自动加载相关依赖包,具体操作可参考配置并导入样例工程。
图1 Spark Hive to HBase样例程序
在示例程序“SparkHivetoHbase”中,通过使用Spark调用Hive接口来操作Hive表,然后根据key值去HBase表获取相应记录,把两者数据做操作后,更新到HBase表。
关键代码片段如下:
... public class SparkHivetoHbase { public static void main(String[] args) throws Exception { String userPrincipal = "sparkuser"; //指定用于认证的集群用户信息及keytab文件地址。 String userKeytabPath = "/opt/client/user.keytab"; String krb5ConfPath = "/opt/client/krb5.conf"; Configuration hadoopConf = new Configuration(); LoginUtil.login(userPrincipal, userKeytabPath, krb5ConfPath, hadoopConf); // 通过Spark接口获取表中的数据。 SparkConf conf = new SparkConf().setAppName("SparkHivetoHbase"); JavaSparkContext jsc = new JavaSparkContext(conf); HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(jsc); Dataset<Row> dataFrame = sqlContext.sql("select name, account from person"); // 遍历Hive表中的partition,更新到HBase表中。 dataFrame .toJavaRDD() .foreachPartition( new VoidFunction<Iterator<Row>>() { public void call(Iterator<Row> iterator) throws Exception { hBaseWriter(iterator); } }); jsc.stop(); } //在exetutor端更新hbase表记录 private static void hBaseWriter(Iterator<Row> iterator) throws IOException { // 读取HBase表。 String tableName = "table2"; String columnFamily = "cf"; Configuration conf = HBaseConfiguration.create(); Connection connection = ConnectionFactory.createConnection(conf); Table table = connection.getTable(TableName.valueOf(tableName)); try { connection = ConnectionFactory.createConnection(conf); table = connection.getTable(TableName.valueOf(tableName)); List<Row> table1List = new ArrayList<Row>(); List<Get> rowList = new ArrayList<Get>(); while (iterator.hasNext()) { Row item = iterator.next(); Get get = new Get(item.getString(0).getBytes()); table1List.add(item); rowList.add(get); } // 获取HBase表记录。 Result[] resultDataBuffer = table.get(rowList); // 修改HBase表记录。 List<Put> putList = new ArrayList<Put>(); for (int i = 0; i < resultDataBuffer.length; i++) { Result resultData = resultDataBuffer[i]; if (!resultData.isEmpty()) { int hiveValue = table1List.get(i).getInt(1); String hbaseValue = Bytes.toString(resultData.getValue(columnFamily.getBytes(), "cid".getBytes())); Put put = new Put(table1List.get(i).getString(0).getBytes()); // 计算结果。 int resultValue = hiveValue + Integer.valueOf(hbaseValue); put.addColumn( Bytes.toBytes(columnFamily), Bytes.toBytes("cid"), Bytes.toBytes(String.valueOf(resultValue))); putList.add(put); } } if (putList.size() > 0) { table.put(putList); } } catch (IOException e) { e.printStackTrace(); } finally { if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { // 关闭HBase连接。 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } ...
对开启了Kerberos认证的MRS集群,程序需要向服务端进行用户认证,在本示例程序中,通过代码配置认证信息,“userPrincipal”为用于认证的用户名,“userKeytabPath”和“krb5ConfPath”需要修改为该文件所在客户端服务器的实际路径。
- 确认工程内的参数无误后,将工程编译后进行打包,获取打包后的jar文件。
在Maven工具窗口,选择“clean”生命周期,执行Maven构建过程。然后继续选择“package”进行打包,在生成的“target”目录中获取jar包。
[INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 02:36 min [INFO] Finished at: 2023-06-12T20:46:24+08:00 [INFO] ------------------------------------------------------------------------
例如打包后的jar文件为“SparkHivetoHbase-1.0.jar”。
上传jar包及准备源数据
- 将编译后的jar包上传到客户端节点,例如上传到“/opt/client/sparkdemo”目录下。
如果本地网络无法直接连接客户端节点上传文件,可先将jar文件或者源数据上传至OBS文件系统中,然后通过MRS管理控制台集群内的“文件管理”页面导入HDFS中,再通过HDFS客户端使用hdfs dfs -get命令下载到客户端节点本地。
- 将用于认证的用户keytab文件也上传到代码中指定位置,例如上传到“/opt/client”目录下。
- 使用root用户登录安装了集群客户端的节点。
cd /opt/client
source bigdata_env
kinit sparkuser
- 创建Hive表并写入初始数据。
beeline
在Hive Beeline命令行中执行以下命令创建表并插入数据。
create table person ( name STRING, account INT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' ESCAPED BY '\\' STORED AS TEXTFILE;
insert into table person(name,account) values("1","100");
select * from person;
+--------------+-----------------+ | person.name | person.account | +--------------+-----------------+ | 1 | 100 | +--------------+-----------------+
- 创建HBase表并写入初始数据。
退出Hive Beeline命令行,执行spark-beeline,然后执行以下命令创建HBase表。
create table table2 ( key string, cid string ) using org.apache.spark.sql.hbase.HBaseSource options( hbaseTableName "table2", keyCols "key", colsMapping "cid=cf.cid" );
退出Spark Beeline命令行,执行hbase shell命令,进入HBase Shell命令行,执行以下命令插入数据。
put 'table2', '1', 'cf:cid', '1000'
scan 'table2'
ROW COLUMN+CELL 1 column=cf:cid, timestamp=2023-06-12T21:12:50.711, value=1000 1 row(s)
运行程序并查看结果
- 在安装了集群客户端的节点中,执行以下命令运行通过样例工程导出的jar包文件。
cd /opt/client
source bigdata_env
cd Spark2x/spark
vi conf/spark-defaults.conf
修改“spark.yarn.security.credentials.hbase.enabled”参数的值为“true”。
bin/spark-submit --class com.huawei.bigdata.spark.examples.SparkHivetoHbase --master yarn --deploy-mode client /opt/client/sparkdemo/SparkHivetoHbase-1.0.jar
- 任务提交后,使用sparkuser用户登录FusionInsight Manager,单击“集群 > 服务 > Yarn”,进入ResourceManager WebUI界面后,查找到对应的Spark应用程序作业信息,单击应用信息的最后一列“ApplicationMaster”,即可进入SparkUI页面查看任务详情。
图2 查看Spark任务详情
- 任务运行完成中,在HBase Shell中查询HBase表内容,可以看到对应记录已更新。
cd /opt/client
source bigdata_env
hbase shell
scan 'table2'
ROW COLUMN+CELL 1 column=cf:cid, timestamp=2023-06-12T21:22:50.711, value=1100 1 row(s)