Updated on: 2024-07-04 GMT+08:00

PySpark Sample Code

Development Description

DLI can connect to both CloudTable HBase and MRS HBase.

  • Prerequisites

    A datasource connection has been created on the DLI management console. For details, see Data Lake Insight User Guide.

    Hard coding the authentication password in the code or storing it in plaintext poses significant security risks. You are advised to store it in ciphertext in a configuration file or an environment variable and decrypt it when used, to keep it secure.
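    For example, a minimal sketch of reading such values from environment variables at run time (the variable names below are illustrative only, not required by DLI):

      import os

      # Illustrative variable names; adapt them to your own deployment.
      principal = os.environ.get("HBASE_KERBEROS_PRINCIPAL", "")
      keytab_path = os.environ.get("HBASE_KEYTAB_PATH", "./user.keytab")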

  • Code implementation
    1. Import dependency packages.
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
      from pyspark.sql import SparkSession
      
    2. Create a session.
      sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
      
  • Accessing HBase using the SQL API
    1. Create a table to associate DLI with HBase through the datasource connection.
      • If Kerberos authentication is not enabled for the HBase cluster, the sample code is as follows.
        sparkSession.sql(
            "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
            'ZKHost' = '192.168.0.189:2181',\
            'TableName' = 'hbtest',\
            'RowKey' = 'id:5',\
            'Cols' = 'location:info.location,city:detail.city')")
      • If Kerberos authentication is enabled for the HBase cluster, the sample code is as follows.
        sparkSession.sql(
            "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
            'ZKHost' = '192.168.0.189:2181',\
            'TableName' = 'hbtest',\
            'RowKey' = 'id:5',\
            'Cols' = 'location:info.location,city:detail.city',\
            'krb5conf' = './krb5.conf',\
            'keytab'='./user.keytab',\
            'principal' ='krbtest')")
        Compared with the case where Kerberos authentication is not enabled, three more parameters must be set, as described in Table 1.

        Table 1 Parameter description

        Parameter and value              Description
        'krb5conf' = './krb5.conf'       Path of the krb5.conf file.
        'keytab' = './user.keytab'       Path of the keytab file.
        'principal' = 'krbtest'          Authentication user name.

        For details about how to obtain the krb5.conf and keytab files, see the configuration file instructions for a cluster with Kerberos authentication enabled.

        For details about the other table creation parameters, see Table 1.

    2. Insert data into HBase.
      sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
    3. Query data in HBase.
      sparkSession.sql("select * from testhbase").show()
  • Accessing HBase using the DataFrame API
    1. Create a table to associate DLI with HBase through the datasource connection.
      sparkSession.sql(\
        "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,\
           floatf FLOAT, doublef DOUBLE) using hbase OPTIONS (\
         'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\
                     cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\
                     cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\
         'TableName' = 'table_DupRowkey1',\
         'RowKey' = 'id:5,location:6,city:7',\
         'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")
      
      • For details about the ZKHost, RowKey, and Cols parameters, see Table 1.
      • TableName: name of the table in CloudTable. If the table does not exist when data is saved, it is created automatically.
    2. Construct a schema.
      schema = StructType([StructField("id", StringType()),
                           StructField("location", StringType()),
                           StructField("city", StringType()),
                           StructField("booleanf", BooleanType()),
                           StructField("shortf", ShortType()),
                           StructField("intf", IntegerType()),
                           StructField("longf", LongType()),
                           StructField("floatf", FloatType()),
                           StructField("doublef", DoubleType())])
      
    3. Set data.
      dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])
      
    4. Create a DataFrame.
      dataFrame = sparkSession.createDataFrame(dataList, schema)
      
    5. Write data to HBase.
      dataFrame.write.insertInto("test_hbase")
      
    6. Read data from HBase.
      # Set datasource connection parameters
      TableName = "table_DupRowkey1"
      RowKey = "id:5,location:6,city:7"
      Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181," \
               "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181," \
               "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"

      # Query data in HBase
      jdbcDF = sparkSession.read.schema(schema)\
                       .format("hbase")\
                       .option("ZKHost", ZKHost)\
                       .option("TableName", TableName)\
                       .option("RowKey", RowKey)\
                       .option("Cols", Cols)\
                       .load()
      jdbcDF.filter("id = '12333' or id = '11111'").show()
      

      id, location, and city: the lengths of these fields are fixed by the RowKey definition. When inserting data, the values must match those lengths exactly; otherwise, an encoding error occurs when the data is queried. A padding sketch is given after this list.

    7. Check the operation result.
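
    The following is a minimal sketch, not part of the original sample, of one way to pad the id, location, and city values to the lengths declared in 'RowKey' = 'id:5,location:6,city:7' before building the DataFrame. The helper name and the padding character are assumptions made for illustration.

      # Hypothetical helper: right-pad (or truncate) a value to the length declared in RowKey.
      # The fill character is an assumption; pick one that cannot appear in real data.
      def pad(value, length, fill='#'):
          return value[:length].ljust(length, fill)

      # Pad the three row-key fields to 5, 6, and 7 characters respectively.
      row = (pad("11111", 5), pad("aaa", 6), pad("aaa", 7), False, 4, 3, 23, 2.3, 2.34)
      dataList = sparkSession.sparkContext.parallelize([row])
      dataFrame = sparkSession.createDataFrame(dataList, schema)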

  • Submitting a Spark job
    1. Upload the completed Python code file to DLI.

      For console operations, see Data Lake Insight User Guide. For API operations, see Data Lake Insight API Reference > Uploading a Resource Package.

    2. If Kerberos authentication is enabled for the MRS cluster, add the krb5.conf and user.keytab files to the other dependency files of the job when creating the Spark job. Skip this step if Kerberos authentication is not enabled. See Figure 1.
      Figure 1 Adding dependency files
    3. Select the corresponding module in the Spark job editor and run the Spark job.

      For console operations, see Data Lake Insight User Guide. For API operations, see Data Lake Insight API Reference > Creating a Batch Processing Job.
      • If Spark 2.3.2 (about to be taken offline) or 2.4.5 is selected when the job is submitted, specify the module named sys.datasource.hbase.
      • If Spark 3.1.1 is selected, no module needs to be selected. Instead, add the following settings to Spark parameters (--conf):

        spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*

        spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*

      • For submitting a job through the console, see the table "Parameters for selecting dependency resources" in Data Lake Insight User Guide.
      • For submitting a job through the API, see the description of the modules parameter in "Table 2 Request parameters" of Data Lake Insight API Reference > Creating a Batch Processing Job.
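      As a rough illustration only, the relevant part of a batch-job request body might look like the sketch below. The field names here are assumptions and must be checked against "Creating a Batch Processing Job" in Data Lake Insight API Reference before use.

        # Hypothetical request-body fields for submitting the job through the API;
        # verify every field name against the API reference before use.
        batch_job_body = {
            "file": "your-uploaded-pyspark-file.py",      # resource uploaded in step 1
            "modules": ["sys.datasource.hbase"],          # needed for Spark 2.3.2 / 2.4.5
            "conf": {                                     # needed for Spark 3.1.1 instead of modules
                "spark.driver.extraClassPath": "/usr/share/extension/dli/spark-jar/datasource/hbase/*",
                "spark.executor.extraClassPath": "/usr/share/extension/dli/spark-jar/datasource/hbase/*"
            }
        }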

Complete Sample Code

  • Accessing MRS HBase using the SQL API
    • Sample code when Kerberos authentication is not enabled
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
      from pyspark.sql import SparkSession
      
      if __name__ == "__main__":
        # Create a SparkSession session.    
        sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
      
        sparkSession.sql(
          "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS (\
          'ZKHost' = '192.168.0.189:2181',\
          'TableName' = 'hbtest',\
          'RowKey' = 'id:5',\
          'Cols' = 'location:info.location,city:detail.city')")
      
      
        sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
      
        sparkSession.sql("select * from testhbase").show()
        # close session    
        sparkSession.stop()
    • Sample code when Kerberos authentication is enabled
      # _*_ coding: utf-8 _*_
      from __future__ import print_function
      from pyspark import SparkFiles
      from pyspark.sql import SparkSession
      import shutil
      import time
      import os
      
      if __name__ == "__main__":
          # Create a SparkSession session.
          sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate()
          sc = sparkSession.sparkContext
          time.sleep(10)
      
          krb5_startfile = SparkFiles.get("krb5.conf")
          keytab_startfile = SparkFiles.get("user.keytab")
          path_user = os.getcwd()
          krb5_endfile = path_user + "/" + "krb5.conf"
          keytab_endfile = path_user + "/" + "user.keytab"
          shutil.copy(krb5_startfile, krb5_endfile)
          shutil.copy(keytab_startfile, keytab_endfile)
          time.sleep(20)
      
          sparkSession.sql(
            "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
            "using hbase OPTIONS(" +
            "'ZKHost'='10.0.0.146:2181'," +
            "'TableName'='hbtest'," +
            "'RowKey'='id:100'," +
            "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," +
            "'krb5conf'='" + path_user + "/krb5.conf'," +
            "'keytab'='" + path_user+ "/user.keytab'," +
            "'principal'='krbtest') ")
      
            sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
      
          sparkSession.sql("select * from testhbase").show()
          # close session
          sparkSession.stop()
  • Accessing HBase using the DataFrame API
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
      # Create a SparkSession session.    
      sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
    
      # Create a table to associate DLI with CloudTable HBase
      sparkSession.sql(\
       "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( \
        'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,\
                    cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,\
                    cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',\
        'TableName' = 'table_DupRowkey1',\
        'RowKey' = 'id:5,location:6,city:7',\
        'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')")
    
      # Create a DataFrame and initialize the DataFrame data.    
      dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])
    
      # Setting schema    
      schema = StructType([StructField("id", StringType()), 
                           StructField("location", StringType()), 
                           StructField("city", StringType()),                         
                           StructField("booleanf", BooleanType()),                        
                           StructField("shortf", ShortType()),                     
                           StructField("intf", IntegerType()),                 
                           StructField("longf", LongType()),                   
                           StructField("floatf", FloatType()),              
                           StructField("doublef", DoubleType())])
    
      # Create a DataFrame from RDD and schema    
      dataFrame = sparkSession.createDataFrame(dataList, schema)
    
      # Write data to the cloudtable-hbase    
      dataFrame.write.insertInto("test_hbase")
    
      # Set cross-source connection parameters    
      TableName = "table_DupRowkey1"
      RowKey = "id:5,location:6,city:7"
      Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
      ZKHost = "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,
                cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
      # Read data on CloudTable-HBase    
      jdbcDF = sparkSession.read.schema(schema)\
                           .format("hbase")\
                           .option("ZKHost", ZKHost)\
                           .option("TableName",TableName)\
                           .option("RowKey", RowKey)\
                           .option("Cols", Cols)\
                           .load()    
      jdbcDF.filter("id = '12333' or id='11111'").show()
    
      # close session    
      sparkSession.stop()