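Updated: 2022-04-24 GMT+08:00
PySpark Sample Code
Development Notes
- Prerequisites
A datasource connection has been created on the DLI management console. For details, see the Data Lake Insight User Guide.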
- Code implementation details
- Import the dependencies
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession
- Create a session
sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()
- Access through the DataFrame API
- Configure the connection parameters
    url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
    dbtable = "customer"
    user = "dbadmin"
    password = "######"
    driver = "org.postgresql.Driver"
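The connection parameters can also be collected in one place instead of being hard-coded as separate variables. The following is a minimal sketch, not part of the original sample: the helper names `build_jdbc_url` and `load_connection_options` and the environment variable `DWS_PASSWORD` are hypothetical, and the host, port, and database are the placeholder values from this page.

```python
import os


def build_jdbc_url(host, port, database):
    """Assemble a PostgreSQL-style JDBC URL of the form used on this page."""
    return "jdbc:postgresql://%s:%d/%s" % (host, port, database)


def load_connection_options(env=os.environ):
    """Collect the JDBC options in a dict.

    DWS_PASSWORD is a hypothetical variable name chosen for this sketch;
    the password falls back to an empty string when it is not set.
    """
    return {
        "url": build_jdbc_url(
            "to-dws-1174404951-W8W4cW8I.datasource.com", 8000, "postgres"),
        "dbtable": "customer",
        "user": "dbadmin",
        "password": env.get("DWS_PASSWORD", ""),
        "driver": "org.postgresql.Driver",
    }


# Demo with an explicit mapping so the sketch does not depend on the real environment.
options = load_connection_options({"DWS_PASSWORD": "example-only"})
print(options["url"])
```

A dict like this can be passed to Spark in one call with `.options(**options)` on a DataFrame reader or writer, in place of the chained `.option(...)` calls.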
- Prepare the data
dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])
- Define the schema
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False),
                         StructField("age", IntegerType(), False)])
- Create the DataFrame
dataFrame = sparkSession.createDataFrame(dataList, schema)
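All three fields in the schema above are declared non-nullable, so a row containing a missing value does not fit it. As an illustration only, the sketch below checks rows in plain Python before they are handed to Spark. `FIELDS` and `check_row` are hypothetical names that mirror the schema by hand and are not part of PySpark, and the second and third rows are invented test data.

```python
# Hypothetical pure-Python mirror of the schema: (field name, Python type, nullable).
FIELDS = [("id", int, False), ("name", str, False), ("age", int, False)]


def check_row(row, fields=FIELDS):
    """Return a list of problems for one row; an empty list means the row fits."""
    if len(row) != len(fields):
        return ["expected %d values, got %d" % (len(fields), len(row))]
    problems = []
    for value, (name, py_type, nullable) in zip(row, fields):
        if value is None:
            if not nullable:
                problems.append("%s must not be null" % name)
        elif not isinstance(value, py_type):
            problems.append("%s should be %s, got %s"
                            % (name, py_type.__name__, type(value).__name__))
    return problems


rows = [(1, "Katie", 19), (2, None, 24), (3, "Ann", "30")]
good_rows = [row for row in rows if not check_row(row)]
print(good_rows)
```

Only the rows that pass would then be given to `parallelize` and `createDataFrame`.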
- Save the data to DWS
    dataFrame.write \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .mode("Overwrite") \
        .save()
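mode supports four save types:
- ErrorIfExists: throws an exception if data already exists.
- Overwrite: overwrites the existing data.
- Append: appends the new data to the existing data.
- Ignore: does nothing if data already exists. This is similar to CREATE TABLE IF NOT EXISTS in SQL.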
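The four save modes can be summarised as a small in-memory model. This is a sketch for illustration, not Spark code: `apply_save_mode` is a hypothetical function that models the target table as a Python list, with `None` meaning that nothing exists yet. It describes only the resulting rows, not how Spark or DWS carries out the write.

```python
def apply_save_mode(existing, new_rows, mode):
    """Return the rows the target would hold after a save in the given mode.

    existing: list of rows already stored, or None if the target does not exist.
    """
    mode = mode.lower()
    if mode not in ("error", "errorifexists", "overwrite", "append", "ignore"):
        raise ValueError("unknown save mode: %s" % mode)
    if existing is None:
        # Nothing is there yet, so every mode simply writes the new rows.
        return list(new_rows)
    if mode in ("error", "errorifexists"):
        raise RuntimeError("target already exists")
    if mode == "overwrite":
        return list(new_rows)
    if mode == "append":
        return list(existing) + list(new_rows)
    # mode == "ignore": keep what is there and drop the new rows.
    return list(existing)


existing = [(1, "Katie", 19)]
new_rows = [(2, "John", 24)]
for mode in ("Overwrite", "Append", "Ignore"):
    print(mode, apply_save_mode(existing, new_rows, mode))
```

The mode name is lowercased in the sketch because Spark accepts these names case-insensitively, which is why `"Overwrite"` works in the sample.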
- Read data from DWS
    jdbcDF = sparkSession.read \
        .format("jdbc") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password) \
        .option("driver", driver) \
        .load()
    jdbcDF.show()
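The read above loads the whole `customer` table. Spark's JDBC source also accepts a parenthesised subquery with an alias as the `dbtable` value, which lets the database return only the columns and rows that are needed. The sketch below only builds the option values. `read_options` is a hypothetical helper, the filter is an invented example, and whether this form behaves the same way through a DLI datasource connection is an assumption that has not been verified here.

```python
def read_options(base_options, columns, table, where=None):
    """Return a copy of the JDBC options whose dbtable is an aliased subquery.

    The where text is inserted as-is, so it must come from trusted code,
    never from user input.
    """
    query = "SELECT %s FROM %s" % (", ".join(columns), table)
    if where:
        query += " WHERE %s" % where
    options = dict(base_options)
    options["dbtable"] = "(%s) AS src" % query
    return options


base = {
    "url": "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres",
    "user": "dbadmin",
    "password": "######",
    "driver": "org.postgresql.Driver",
}
opts = read_options(base, ["id", "name"], "customer", where="age > 18")
print(opts["dbtable"])
```

With a running session, the result could be used as `sparkSession.read.format("jdbc").options(**opts).load()`.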
- Operation result
- Access through the SQL API
- Create a DLI table associated with DWS for datasource access.
    sparkSession.sql(
        "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ("
        "'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',"
        "'dbtable'='customer',"
        "'user'='dbadmin',"
        "'password'='######',"
        "'driver'='org.postgresql.Driver')")
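The statement above repeats the same connection values that the DataFrame API path uses. As one possible way to keep them in a single place, the sketch below renders the `CREATE TABLE ... USING JDBC OPTIONS (...)` text from a list of key/value pairs. `create_table_sql` is a hypothetical helper, and it does no escaping, so it assumes the values contain no single quotes.

```python
def create_table_sql(table, options):
    """Render a CREATE TABLE ... USING JDBC statement from (key, value) pairs.

    No escaping is done: values are assumed to contain no single quotes.
    """
    pairs = ", ".join("'%s'='%s'" % (key, value) for key, value in options)
    return "CREATE TABLE IF NOT EXISTS %s USING JDBC OPTIONS (%s)" % (table, pairs)


# A list of tuples keeps the options in a fixed, readable order.
jdbc_options = [
    ("url", "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"),
    ("dbtable", "customer"),
    ("user", "dbadmin"),
    ("password", "######"),
    ("driver", "org.postgresql.Driver"),
]
statement = create_table_sql("dli_to_dws", jdbc_options)
print(statement)
```

The resulting string would be passed to `sparkSession.sql(statement)`.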
- Insert data
sparkSession.sql("insert into dli_to_dws values(2,'John',24)")
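To insert several rows, the statement text can be generated rather than written by hand. The following sketch is illustrative only: `sql_literal` and `insert_sql` are hypothetical helpers, the second row is invented sample data, and the sketch assumes that the multi-row `VALUES` form of Spark SQL is accepted for this associated table, which has not been tested here. Strings containing quotes or backslashes are rejected instead of escaped.

```python
def sql_literal(value):
    """Render an int or a simple string as a SQL literal."""
    if isinstance(value, bool):
        raise TypeError("booleans are not handled in this sketch")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        if "'" in value or "\\" in value:
            raise ValueError("quotes and backslashes are not handled in this sketch")
        return "'%s'" % value
    raise TypeError("unsupported type: %s" % type(value).__name__)


def insert_sql(table, rows):
    """Render one INSERT INTO statement that carries all the given rows."""
    values = ", ".join(
        "(%s)" % ", ".join(sql_literal(v) for v in row) for row in rows)
    return "INSERT INTO %s VALUES %s" % (table, values)


statement = insert_sql("dli_to_dws", [(2, "John", 24), (3, "Ann", 31)])
print(statement)
```

The resulting string would be passed to `sparkSession.sql(statement)` in the same way as the single-row insert.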
- Query data
    jdbcDF = sparkSession.sql("select * from dli_to_dws")
    jdbcDF.show()
- Operation result
- Submit the Spark job
Complete Sample Code
- Access through the DataFrame API
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # Create a SparkSession.
        sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()

        # Set the datasource connection parameters.
        url = "jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres"
        dbtable = "customer"
        user = "dbadmin"
        password = "######"
        driver = "org.postgresql.Driver"

        # Create an RDD that holds the initial data.
        dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19)])

        # Define the schema.
        schema = StructType([StructField("id", IntegerType(), False),
                             StructField("name", StringType(), False),
                             StructField("age", IntegerType(), False)])

        # Create a DataFrame from the RDD and the schema.
        dataFrame = sparkSession.createDataFrame(dataList, schema)

        # Write the data to the DWS table.
        dataFrame.write \
            .format("jdbc") \
            .option("url", url) \
            .option("dbtable", dbtable) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .mode("Overwrite") \
            .save()

        # Read the data back.
        jdbcDF = sparkSession.read \
            .format("jdbc") \
            .option("url", url) \
            .option("dbtable", dbtable) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .load()
        jdbcDF.show()

        # Close the session.
        sparkSession.stop()
- Access through the SQL API
    # _*_ coding: utf-8 _*_
    from __future__ import print_function
    from pyspark.sql import SparkSession

    if __name__ == "__main__":
        # Create a SparkSession.
        sparkSession = SparkSession.builder.appName("datasource-dws").getOrCreate()

        # Create a DLI table associated with the DWS table.
        sparkSession.sql(
            "CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ("
            "'url'='jdbc:postgresql://to-dws-1174404951-W8W4cW8I.datasource.com:8000/postgres',"
            "'dbtable'='customer',"
            "'user'='dbadmin',"
            "'password'='######',"
            "'driver'='org.postgresql.Driver')")

        # Insert data into the DLI table.
        sparkSession.sql("insert into dli_to_dws values(2,'John',24)")

        # Read data from the DLI table. show() returns None, so keep the DataFrame separately.
        jdbcDF = sparkSession.sql("select * from dli_to_dws")
        jdbcDF.show()

        # Close the session.
        sparkSession.stop()
