pyspark样例代码
开发说明
支持对接CloudTable的HBase和MRS的HBase。
- 前提条件
在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。
认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
- 代码实现详解
- import相关依赖包
1 2 3
from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession
- 创建会话
1
sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
- import相关依赖包
- 通过SQL API访问
- 创建DLI跨源访问HBase的关联表
- 如果对接的HBase集群未开启Kerberos认证,样例代码参考如下。
sparkSession.sql( "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\ 'ZKHost' = '192.168.0.189:2181',\ 'TableName' = 'hbtest',\ 'RowKey' = 'id:5',\ 'Cols' = 'location:info.location,city:detail.city')")
- 如果对接的HBase集群开启了Kerberos认证,样例代码参考如下。
sparkSession.sql( "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\ 'ZKHost' = '192.168.0.189:2181',\ 'TableName' = 'hbtest',\ 'RowKey' = 'id:5',\ 'Cols' = 'location:info.location,city:detail.city',\ 'krb5conf' = './krb5.conf',\ 'keytab'='./user.keytab',\ 'principal' ='krbtest')")
与未开启kerberos认证相比,开启了kerberos认证需要多设置三个参数,如表1所示。krb5.conf和keytab文件获取请具体参考开启Kerberos认证时的相关配置文件操作说明。
表参数详情可参考表1。
- 如果对接的HBase集群未开启Kerberos认证,样例代码参考如下。
- 导入数据到HBase
sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
- 读取HBase上的数据
sparkSession.sql("select * from testhbase").show()
- 创建DLI跨源访问HBase的关联表
- 通过DataFrame API访问
- 创建DLI跨源访问HBase的关联表
1 2 3 4 5 6 7 8 9
sparkSession.sql(\ "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS (\ 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\ cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\ cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\ 'TableName' = 'table_DupRowkey1',\ 'RowKey' = 'id:5,location:6,city:7',\ 'Cols' = 'booleanf:CF1.booleanf, shortf:CF1.shortf, intf:CF1.intf, \ longf:CF1.longf, floatf:CF1.floatf, doublef:CF1.doublef')")
- ZKHost、RowKey、Cols三个参数详情讲解可参考表1。
- TableName:CloudTable中的表名,在保存时如果没有表名,系统会自动创建。
- 构造schema
1 2 3 4 5 6 7 8 9
schema = StructType([StructField("id", StringType()),\ StructField("location", StringType()),\ StructField("city", StringType()),\ StructField("booleanf", BooleanType()),\ StructField("shortf", ShortType()),\ StructField("intf", IntegerType()),\ StructField("longf", LongType()),\ StructField("floatf", FloatType()),\ StructField("doublef", DoubleType())])
- 设置数据
1
dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])
- 创建DataFrame
1
dataFrame = sparkSession.createDataFrame(dataList, schema)
- 导入数据到HBase
1
dataFrame.write.insertInto("test_hbase")
- 读取HBase上的数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Set cross-source connection parameters TableName = "table_DupRowkey1" RowKey = "id:5,location:6,city:7" Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1- WY09px9l.cloudtable.com:2181" // select jdbcDF = sparkSession.read.schema(schema)\ .format("hbase")\ .option("ZKHost",ZKHost)\ .option("TableName",TableName)\ .option("RowKey",RowKey)\ .option("Cols",Cols)\ .load() jdbcDF.filter("id = '12333' or id='11111'").show()
id、location、city:限定了长度,插入数据时须按长度给定数据值,否则查询时会发生编码格式错误。
- 操作结果;
- 创建DLI跨源访问HBase的关联表
- 提交Spark作业
- 将写好的python代码文件上传至DLI中。
- 如果MRS集群开启了Kerberos认证,创建Spark作业时需要将krb5.conf和user.keytab文件添加到作业的其他依赖文件中,未开启Kerberos认证该步骤忽略。如图1所示:
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
- 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.hbase。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”。
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
完整示例代码
- 通过SQL API访问MRS HBase
- 未开启kerberos认证样例代码
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() sparkSession.sql( "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\ 'ZKHost' = '192.168.0.189:2181',\ 'TableName' = 'hbtest',\ 'RowKey' = 'id:5',\ 'Cols' = 'location:info.location,city:detail.city')") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop()
- 开启kerberos认证样例代码
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark import SparkFiles from pyspark.sql import SparkSession import shutil import time import os if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate() sc = sparkSession.sparkContext time.sleep(10) krb5_startfile = SparkFiles.get("krb5.conf") keytab_startfile = SparkFiles.get("user.keytab") path_user = os.getcwd() krb5_endfile = path_user + "/" + "krb5.conf" keytab_endfile = path_user + "/" + "user.keytab" shutil.copy(krb5_startfile, krb5_endfile) shutil.copy(keytab_startfile, keytab_endfile) time.sleep(20) sparkSession.sql( "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " + "using hbase OPTIONS(" + "'ZKHost'='10.0.0.146:2181'," + "'TableName'='hbtest'," + "'RowKey'='id:100'," + "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," + "'krb5conf'='" + path_user + "/krb5.conf'," + "'keytab'='" + path_user+ "/user.keytab'," + "'principal'='krbtest') ") sparkSession.sql("insert into testhbase values('95274','abc','Jinan')") sparkSession.sql("select * from testhbase").show() # close session sparkSession.stop()
- 未开启kerberos认证样例代码
- 通过DataFrame API访问HBase
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate() # Createa data table for DLI-associated ct sparkSession.sql(\ "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( \ 'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\ cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\ cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\ 'TableName' = 'table_DupRowkey1',\ 'RowKey' = 'id:5,location:6,city:7',\ 'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')") # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)]) # Setting schema schema = StructType([StructField("id", StringType()), StructField("location", StringType()), StructField("city", StringType()), StructField("booleanf", BooleanType()), StructField("shortf", ShortType()), StructField("intf", IntegerType()), StructField("longf", LongType()), StructField("floatf", FloatType()), StructField("doublef", DoubleType())]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the cloudtable-hbase dataFrame.write.insertInto("test_hbase") # Set cross-source connection parameters TableName = "table_DupRowkey1" RowKey = "id:5,location:6,city:7" Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef" ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181, cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181" # Read data on CloudTable-HBase jdbcDF = sparkSession.read.schema(schema)\ .format("hbase")\ .option("ZKHost", ZKHost)\ .option("TableName",TableName)\ .option("RowKey", RowKey)\ .option("Cols", Cols)\ .load() jdbcDF.filter("id = '12333' or id='11111'").show() # close session sparkSession.stop()