pyspark样例代码
前提条件
在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。
CSS非安全集群
- 开发说明
- 代码实现详解
- import相关依赖包
1 2 3
from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row from pyspark.sql import SparkSession
- 创建会话
1
sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
- import相关依赖包
- 通过DataFrame API 访问
- 连接配置
1 2
resource = "/mytest" nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
resource为指定在CSS关联的资源名。格式可以用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。
- ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
- ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。
- 构造schema,并添加数据
1 2 3
schema = StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False)]) rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
- 构造DataFrame
1
dataFrame = sparkSession.createDataFrame(rdd, schema)
- 保存数据到CSS
1
dataFrame.write.format("css").option("resource", resource).option("es.nodes", nodes).mode("Overwrite").save()
mode 有四种保存类型:
- ErrorIfExis:如果已经存在数据,则抛出异常。
- Overwrite:如果已经存在数据,则覆盖原数据。
- Append:如果已经存在数据,则追加保存。
- Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
- 读取CSS上的数据
1 2
jdbcDF = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load() jdbcDF.show()
- 操作结果
- 连接配置
- 通过SQL API 访问
- 创建DLI跨源访问 CSS的关联表。
1 2 3 4 5
sparkSession.sql( "create table css_table(id long, name string) using css options( 'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200', 'es.nodes.wan.only'='true', 'resource'='/mytest')")
创建CSS跨源表的参数详情可参考表1。
- 插入数据
1
sparkSession.sql("insert into css_table values(3,'tom')")
- 查询数据
1 2
jdbcDF = sparkSession.sql("select * from css_table") jdbcDF.show()
- 操作结果
- 创建DLI跨源访问 CSS的关联表。
- 提交Spark作业
- 将写好的python代码文件上传至DLI中。
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
- 如果选择Spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.css。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 “Spark参数(--conf)” 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”。
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
- 代码实现详解
- 完整示例代码
- 通过DataFrame API 访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate() # Setting cross-source connection parameters resource = "/mytest" nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200" # Setting schema schema = StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False)]) # Construction data rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(rdd, schema) # Write data to the CSS dataFrame.write.format("css").option("resource", resource).option("es.nodes", nodes).mode("Overwrite").save() # Read data jdbcDF = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load() jdbcDF.show() # close session sparkSession.stop()
- 通过SQL API 访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate() # Create a DLI data table for DLI-associated CSS sparkSession.sql( "create table css_table(id long, name string) using css options( \ 'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\ 'es.nodes.wan.only'='true',\ 'resource'='/mytest')") # Insert data into the DLI data table sparkSession.sql("insert into css_table values(3,'tom')") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from css_table") jdbcDF.show() # close session sparkSession.stop()
- 通过DataFrame API 访问
CSS安全集群
- 开发说明
- 代码实现详解
- import相关依赖包
1 2 3
from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row from pyspark.sql import SparkSession
- 创建会话并设置AK/SK
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
1 2 3 4 5
sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak) sparkSession.conf.set("fs.obs.secret.key", sk) sparkSession.conf.set("fs.obs.endpoint", enpoint) sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
- import相关依赖包
- 通过DataFrame API 访问
- 连接配置
1 2
resource = "/mytest"; nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
resource为指定在CSS关联的资源名。格式可以用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。
- ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
- ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。
- 构造schema,并添加数据
1 2 3
schema = StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False)]) rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
- 构造DataFrame
1
dataFrame = sparkSession.createDataFrame(rdd, schema)
- 保存数据到CSS
1 2 3 4 5 6 7 8 9 10 11 12
dataFrame.write.format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true") .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***") .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***") .option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode("Overwrite") .save()
mode 有四种保存类型:
- ErrorIfExis:如果已经存在数据,则抛出异常。
- Overwrite:如果已经存在数据,则覆盖原数据。
- Append:如果已经存在数据,则追加保存。
- Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
- 读取CSS上的数据
1 2 3 4 5 6 7 8 9 10 11 12
jdbcDF = sparkSession.read.format("css")\ .option("resource", resource)\ .option("es.nodes", nodes)\ .option("es.net.ssl", "true")\ .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")\ .option("es.net.ssl.keystore.pass", "***")\ .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks")\ .option("es.net.ssl.truststore.pass", "***")\ .option("es.net.http.auth.user", "admin")\ .option("es.net.http.auth.pass", "***")\ .load() jdbcDF.show()
- 操作结果
- 连接配置
- 通过SQL API 访问
- 创建DLI跨源访问 CSS的关联表。
1 2 3 4 5 6 7 8 9 10 11 12
sparkSession.sql( "create table css_table(id long, name string) using css options(\ 'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\ 'es.nodes.wan.only'='true',\ 'resource'='/mytest',\ 'es.net.ssl'='true',\ 'es.net.ssl.keystore.location'='obs://桶名/path/transport-keystore.jks',\ 'es.net.ssl.keystore.pass'='***',\ 'es.net.ssl.truststore.location'='obs://桶名/path/truststore.jks',\ 'es.net.ssl.truststore.pass'='***',\ 'es.net.http.auth.user'='admin',\ 'es.net.http.auth.pass'='***')")
创建CSS跨源表的参数详情可参考表1。
- 插入数据
1
sparkSession.sql("insert into css_table values(3,'tom')")
- 查询数据
1 2
jdbcDF = sparkSession.sql("select * from css_table") jdbcDF.show()
- 操作结果
- 创建DLI跨源访问 CSS的关联表。
- 提交Spark作业
- 代码实现详解
- 完整示例代码
- 通过DataFrame API 访问
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak) sparkSession.conf.set("fs.obs.secret.key", sk) sparkSession.conf.set("fs.obs.endpoint", enpoint) sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false") # Setting cross-source connection parameters resource = "/mytest"; nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200" # Setting schema schema = StructType([StructField("id", IntegerType(), False), StructField("name", StringType(), False)]) # Construction data rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(rdd, schema) # Write data to the CSS dataFrame.write.format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true") .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***") .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***") .option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode("Overwrite") .save() # Read data jdbcDF = sparkSession.read.format("css")\ .option("resource", resource)\ .option("es.nodes", nodes)\ .option("es.net.ssl", "true")\ .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks")\ .option("es.net.ssl.keystore.pass", "***")\ .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***")\ .option("es.net.http.auth.user", "admin")\ .option("es.net.http.auth.pass", "***")\ .load() jdbcDF.show() # close session sparkSession.stop()
- 通过SQL API 访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession import os if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate() # Create a DLI data table for DLI-associated CSS sparkSession.sql("create table css_table(id int, name string) using css options(\ 'es.nodes'='192.168.6.204:9200',\ 'es.nodes.wan.only'='true',\ 'resource'='/mytest',\ 'es.net.ssl'='true',\ 'es.net.ssl.keystore.location' = 'obs://xietest1/lzq/keystore.jks',\ 'es.net.ssl.keystore.pass' = '**',\ 'es.net.ssl.truststore.location'='obs://xietest1/lzq/truststore.jks',\ 'es.net.ssl.truststore.pass'='**',\ 'es.net.http.auth.user'='admin',\ 'es.net.http.auth.pass'='**')") # Insert data into the DLI data table sparkSession.sql("insert into css_table values(3,'tom')") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from css_table") jdbcDF.show() # close session sparkSession.stop()
- 通过DataFrame API 访问