pyspark样例代码
开发说明
- 前提条件
在DLI管理控制台上已完成创建跨源连接并绑定队列。具体操作请参考《数据湖探索用户指南》。
认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
- 代码实现详解
- import相关依赖包
1 2 3
from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession
- 创建会话
1
sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate()
- import相关依赖包
- 通过DataFrame API 访问
- 连接参数配置
1 2 3 4 5
url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306" dbtable = "test.customer" user = "root" password = "######" driver = "com.mysql.jdbc.Driver"
参数说明请参考表1。
- 设置数据
1
dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)])
- 设置schema
1 2 3
schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)])
- 创建DataFrame
1
dataFrame = sparkSession.createDataFrame(dataList, schema)
- 保存数据到RDS
1 2 3 4 5 6 7 8 9
dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Append") \ .save()
mode 有四种保存类型:
- ErrorIfExis:如果已经存在数据,则抛出异常。
- Overwrite:如果已经存在数据,则覆盖原数据。
- Append:如果已经存在数据,则追加保存。
- Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
- 读取RDS上的数据
1 2 3 4 5 6 7 8 9
jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show()
- 操作结果
- 连接参数配置
- 通过SQL API 访问
- 创建DLI跨源访问rds的关联表,填写连接参数。
1 2 3 4 5 6 7
sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\ 'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\ 'dbtable'='test.customer',\ 'user'='root',\ 'password'='######',\ 'driver'='com.mysql.jdbc.Driver')")
创建表参数请参考表1。
- 插入数据
1
sparkSession.sql("insert into dli_to_rds values(3,'John',24)")
- 查询数据
1 2
jdbcDF_after = sparkSession.sql("select * from dli_to_rds") jdbcDF_after.show()
- 操作结果
- 创建DLI跨源访问rds的关联表,填写连接参数。
- 提交Spark作业
- 将写好的python代码文件上传至DLI中。
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
- 创建Spark作业完成后,在控制台单击右上角“执行”提交作业,页面显示“批处理作业提交成功”说明Spark作业提交成功,可以在Spark作业管理页面查看提交的作业的状态和日志。
- 创建Spark作业时选择的“所属队列”为创建跨源连接时所绑定的队列。
- 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.rds。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/rds/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/rds/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”。
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
完整示例代码
直接复制如下样例代码到py文件中后,需要注意文件内容中的“\”后面可能会有unexpected character的问题。需要将“\”后面的缩进或是空格全部删除。
- 通过DataFrame API访问
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql.types import StructType, StructField, IntegerType, StringType from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Set cross-source connection parameters. url = "jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306" dbtable = "test.customer" user = "root" password = "######" driver = "com.mysql.jdbc.Driver" # Create a DataFrame and initialize the DataFrame data. dataList = sparkSession.sparkContext.parallelize([(123, "Katie", 19)]) # Setting schema schema = StructType([StructField("id", IntegerType(), False),\ StructField("name", StringType(), False),\ StructField("age", IntegerType(), False)]) # Create a DataFrame from RDD and schema dataFrame = sparkSession.createDataFrame(dataList, schema) # Write data to the RDS. dataFrame.write \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .mode("Append") \ .save() # Read data jdbcDF = sparkSession.read \ .format("jdbc") \ .option("url", url) \ .option("dbtable", dbtable) \ .option("user", user) \ .option("password", password) \ .option("driver", driver) \ .load() jdbcDF.show() # close session sparkSession.stop()
- 通过SQL API访问
# _*_ coding: utf-8 _*_ from __future__ import print_function from pyspark.sql import SparkSession if __name__ == "__main__": # Create a SparkSession session. sparkSession = SparkSession.builder.appName("datasource-rds").getOrCreate() # Createa data table for DLI - associated RDS sparkSession.sql( "CREATE TABLE IF NOT EXISTS dli_to_rds USING JDBC OPTIONS (\ 'url'='jdbc:mysql://to-rds-1174404952-ZgPo1nNC.datasource.com:3306',\ 'dbtable'='test.customer',\ 'user'='root',\ 'password'='######',\ 'driver'='com.mysql.jdbc.Driver')") # Insert data into the DLI data table sparkSession.sql("insert into dli_to_rds values(3,'John',24)") # Read data from DLI data table jdbcDF = sparkSession.sql("select * from dli_to_rds") jdbcDF.show() # close session sparkSession.stop()