scala样例代码
前提条件
在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。
CSS非安全集群
- 开发说明
- 构造依赖信息,创建SparkSession
- 导入依赖
涉及到的mvn依赖库
1 2 3 4 5
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
import相关依赖包1 2
import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
- 创建会话。
1
val sparkSession = SparkSession.builder().getOrCreate()
- 导入依赖
- 通过SQL API访问
- 创建DLI跨源访问 CSS的关联表。
1 2 3 4
sparkSession.sql("create table css_table(id int, name string) using css options( 'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200', 'es.nodes.wan.only'='true', 'resource' '/mytest/css')")
表1 创建表参数 参数
说明
es.nodes
CSS的连接地址,需要先创建跨源连接。具体操作请参考《数据湖探索用户指南》。
创建经典型跨源连接后,使用经典型跨源连接中返回的连接地址。
创建增强型跨源连接后,使用CSS提供的"内网访问地址",格式为"IP1:PORT1,IP2:PORT2"。
resource
指定在CSS关联的资源名,用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。
说明:- ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
- ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。
pushdown
CSS的下压功能是否开启,默认为“true”。包含大量IO传输的表在有where过滤条件的情况下能够开启pushdown降低IO。
strict
CSS的下压是否是严格的,默认为“false”。精确匹配的场景下比pushdown降低更多IO。
batch.size.entries
单次batch插入entry的条数上限,默认为1000。如果单条数据非常大,在bulk存储设置的数据条数前提前到达了单次batch的总数据量上限,则停止存储数据,以batch.size.bytes为准,提交该批次的数据。
batch.size.bytes
单次batch的总数据量上限,默认为1mb。如果单条数据非常小,在bulk存储到总数据量前提前到达了单次batch的条数上限,则停止存储数据,以batch.size.entries为准,提交该批次的数据。
es.nodes.wan.only
是否仅通过域名访问es节点,默认为false。使用经典型跨源的连接地址作为es.nodes时,该参数需要配置为true;使用css服务提供的原始内网IP地址作为es.nodes时,不需要填写该参数或者配置为false。
es.mapping.id
指定一个字段,其值作为es中Document的id。
说明:- 相同/index/type下的Document id是唯一的。如果作为Document id的字段存在重复值,则在执行插入es时,重复id的Document将会被覆盖。
- 该特性可以用作容错解决方案。当插入数据执行一半时,DLI作业失败,会有部分数据已经插入到es中,这部分为冗余数据。如果设置了Document id,则在重新执行DLI作业时,会覆盖上一次的冗余数据。
batch.size.entries和batch.size.bytes分别对数据条数和数据量大小进行限制。
- 插入数据。
1
sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
- 查询数据。
1 2
val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()
插入数据前:
插入数据后:
- 删除数据表
1
sparkSession.sql("drop table css_table")
- 创建DLI跨源访问 CSS的关联表。
- 通过DataFrame API访问
- 连接配置。
1 2
val resource = "/mytest/css" val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
- 构造schema,并添加数据。
1 2
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false))) val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
- 导入数据到CSS。
1 2 3 4 5 6 7
val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) dataFrame_1.write .format("css") .option("resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save()
SaveMode 有四种保存类型:
- ErrorIfExis:如果已经存在数据,则抛出异常。
- Overwrite:如果已经存在数据,则覆盖原数据。
- Append:如果已经存在数据,则追加保存。
- Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
- 读取CSS上的数据
1 2
val dataFrameR = sparkSession.read.format("css").option("resource",resource).option("es.nodes", nodes).load() dataFrameR.show()
插入数据前:
插入数据后:
- 连接配置。
- 提交Spark作业
- 将写好的代码生成jar包,上传至DLI中。
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
控制台操作请参考《数据湖探索用户指南》。
API操作请参考《数据湖探索API参考》>《创建批处理作业》。
- 如果选择Spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.css。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 “Spark参数(--conf)” 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”。
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
- 构造依赖信息,创建SparkSession
- 完整示例代码
- Maven依赖
1 2 3 4 5
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
- 通过SQL API访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
import org.apache.spark.sql.SparkSession object Test_SQL_CSS { def main(args: Array[String]): Unit = { // Create a SparkSession session. val sparkSession = SparkSession.builder().getOrCreate() // Create a DLI data table for DLI-associated CSS sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = 'to-css-1174404217-QG2SwbVV.datasource.com:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest/css')") //*****************************SQL model*********************************** // Insert data into the DLI data table sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')") // Read data from DLI data table val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show() // drop table sparkSession.sql("drop table css_table") sparkSession.close() } }
- 通过DataFrame API访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
import org.apache.spark.sql.{Row, SaveMode, SparkSession}; import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}; object Test_SQL_CSS { def main(args: Array[String]): Unit = { //Create a SparkSession session. val sparkSession = SparkSession.builder().getOrCreate() //*****************************DataFrame model*********************************** // Setting the /index/type of CSS val resource = "/mytest/css" // Define the cross-origin connection address of the CSS cluster val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200" //Setting schema val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false))) // Construction data val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob"))) // Create a DataFrame from RDD and schema val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) //Write data to the CSS dataFrame_1.write.format("css") .option("resource", resource) .option("es.nodes", nodes) .mode(SaveMode.Append) .save() //Read data val dataFrameR = sparkSession.read.format("css").option("resource", resource).option("es.nodes", nodes).load() dataFrameR.show() spardSession.close() } }
- Maven依赖
CSS安全集群
- 开发说明
- 构造依赖信息,创建SparkSession
- 导入依赖
涉及到的mvn依赖库
1 2 3 4 5
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
import相关依赖包1 2
import org.apache.spark.sql.{Row, SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
- 创建会话,并设置AK/SK。
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
1 2 3 4 5
val sparkSession = SparkSession.builder().getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak) sparkSession.conf.set("fs.obs.secret.key", sk) sparkSession.conf.set("fs.obs.endpoint", enpoint) sparkSession.conf.set("fs.obs.connecton.ssl.enabled", "false")
- 导入依赖
- 通过SQL API访问
- 创建DLI跨源访问 CSS的关联表。
1 2 3 4 5 6 7 8 9 10 11
sparkSession.sql("create table css_table(id int, name string) using css options( 'es.nodes' 'to-css-1174404221-Y2bKVIqY.datasource.com:9200', 'es.nodes.wan.only'='true', 'resource'='/mytest/css', 'es.net.ssl'='true', 'es.net.ssl.keystore.location'='obs://桶名/path/transport-keystore.jks', 'es.net.ssl.keystore.pass'='***', 'es.net.ssl.truststore.location'='obs://桶名/path/truststore.jks', 'es.net.ssl.truststore.pass'='***', 'es.net.http.auth.user'='admin', 'es.net.http.auth.pass'='***')")
表2 创建表参数 参数
说明
es.nodes
CSS的连接地址,需要先创建跨源连接。具体操作请参考《数据湖探索用户指南》。
创建经典型跨源连接后,使用经典型跨源连接中返回的连接地址。
创建增强型跨源连接后,使用CSS提供的"内网访问地址",格式为"IP1:PORT1,IP2:PORT2"。
resource
指定在CSS关联的资源名,用"/index/type"指定资源位置(可简单理解index为database,type为table,但绝不等同)。
说明:1. ES 6.X版本中,单个Index只支持唯一type,type名可以自定义。
2. ES 7.X版本中,单个Index将使用“_doc”作为type名,不再支持自定义。若访问ES 7.X版本时,该参数只需要填写index即可。
pushdown
CSS的下压功能是否开启,默认为“true”。包含大量IO传输的表在有where过滤条件的情况下能够开启pushdown降低IO。
strict
CSS的下压是否是严格的,默认为“false”。精确匹配的场景下比pushdown降低更多IO。
batch.size.entries
单次batch插入entry的条数上限,默认为1000。如果单条数据非常大,在bulk存储设置的数据条数前提前到达了单次batch的总数据量上限,则停止存储数据,以batch.size.bytes为准,提交该批次的数据。
batch.size.bytes
单次batch的总数据量上限,默认为1mb。如果单条数据非常小,在bulk存储到总数据量前提前到达了单次batch的条数上限,则停止存储数据,以batch.size.entries为准,提交该批次的数据。
es.nodes.wan.only
是否仅通过域名访问es节点,默认为“false”。使用经典型跨源的连接地址作为es.nodes时,该参数需要配置为“true”;使用CSS服务提供的原始内网IP地址作为es.nodes时,不需要填写该参数或者配置为“false”。
es.mapping.id
指定一个字段,其值作为es中Document的id。
说明:- 相同/index/type下的Document id是唯一的。如果作为Document id的字段存在重复值,则在执行插入es时,重复id的Document将会被覆盖。
- 该特性可以用作容错解决方案。当插入数据执行一半时,DLI作业失败,会有部分数据已经插入到es中,这部分为冗余数据。如果设置了Document id,则在重新执行DLI作业时,会覆盖上一次的冗余数据。
es.net.ssl
连接安全CSS集群,默认值为“false”。
es.net.ssl.keystore.location
安全CSS集群的证书,生成的keystore文件在OBS上的地址。
es.net.ssl.keystore.pass
安全CSS集群的证书,生成的keystore文件时的密码。
es.net.ssl.truststore.location
安全CSS集群的证书,生成的truststore文件在OBS上的地址。
es.net.ssl.truststore.pass
安全CSS集群的证书,生成的truststore文件时的密码。
es.net.http.auth.user
安全CSS集群的用户名。
es.net.http.auth.pass
安全CSS集群的密码。
“batch.size.entries”和“batch.size.bytes”分别对数据条数和数据量大小进行限制。
- 插入数据。
1
sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')")
- 查询数据。
1 2
val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show()
插入数据前:
插入数据后:
- 删除数据表
1
sparkSession.sql("drop table css_table")
- 创建DLI跨源访问 CSS的关联表。
- 通过DataFrame API访问
- 连接配置。
1 2
val resource = "/mytest/css" val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200"
- 构造schema,并添加数据。
1 2
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false))) val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob")))
- 导入数据到CSS。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) dataFrame_1.write .format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true") .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***") .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***") .option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode(SaveMode.Append) .save()
SaveMode 有四种保存类型:
- ErrorIfExis:如果已经存在数据,则抛出异常。
- Overwrite:如果已经存在数据,则覆盖原数据。
- Append:如果已经存在数据,则追加保存。
- Ignore:如果已经存在数据,则不做操作。这类似于SQL中的“如果不存在则创建表”。
- 读取CSS上的数据
1 2 3 4 5 6 7 8 9 10 11 12
val dataFrameR = sparkSession.read.format("css") .option("resource",resource) .option("es.nodes", nodes) .option("es.net.ssl", "true") .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***") .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***") .option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .load() dataFrameR.show()
插入数据前:
插入数据后:
- 连接配置。
- 提交Spark作业
- 构造依赖信息,创建SparkSession
- 完整示例代码
- Maven依赖
1 2 3 4 5
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
- 通过SQL API访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import org.apache.spark.sql.SparkSession object csshttpstest { def main(args: Array[String]): Unit = { //Create a SparkSession session. val sparkSession = SparkSession.builder().getOrCreate() // Create a DLI data table for DLI-associated CSS sparkSession.sql("create table css_table(id long, name string) using css options('es.nodes' = '192.168.6.204:9200','es.nodes.wan.only' = 'false','resource' = '/mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'obs://xietest1/lzq/keystore.jks','es.net.ssl.keystore.pass' = '**','es.net.ssl.truststore.location'='obs://xietest1/lzq/truststore.jks','es.net.ssl.truststore.pass'='**','es.net.http.auth.user'='admin','es.net.http.auth.pass'='**')") //*****************************SQL model*********************************** // Insert data into the DLI data table sparkSession.sql("insert into css_table values(13, 'John'),(22, 'Bob')") // Read data from DLI data table val dataFrame = sparkSession.sql("select * from css_table") dataFrame.show() // drop table sparkSession.sql("drop table css_table") sparkSession.close() } }
- 通过DataFrame API访问
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
import org.apache.spark.sql.{Row, SaveMode, SparkSession}; import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}; object Test_SQL_CSS { def main(args: Array[String]): Unit = { //Create a SparkSession session. val sparkSession = SparkSession.builder().getOrCreate() sparkSession.conf.set("fs.obs.access.key", ak) sparkSession.conf.set("fs.obs.secret.key", sk) //*****************************DataFrame model*********************************** // Setting the /index/type of CSS val resource = "/mytest/css" // Define the cross-origin connection address of the CSS cluster val nodes = "to-css-1174405013-Ht7O1tYf.datasource.com:9200" //Setting schema val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false))) // Construction data val rdd = sparkSession.sparkContext.parallelize(Seq(Row(12, "John"),Row(21,"Bob"))) // Create a DataFrame from RDD and schema val dataFrame_1 = sparkSession.createDataFrame(rdd, schema) //Write data to the CSS dataFrame_1.write .format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true") .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***") .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***") .option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .mode(SaveMode.Append) .save(); //Read data val dataFrameR = sparkSession.read.format("css") .option("resource", resource) .option("es.nodes", nodes) .option("es.net.ssl", "true") .option("es.net.ssl.keystore.location", "obs://桶名/path/transport-keystore.jks") .option("es.net.ssl.keystore.pass", "***") .option("es.net.ssl.truststore.location", "obs://桶名/path/truststore.jks") .option("es.net.ssl.truststore.pass", "***") .option("es.net.http.auth.user", "admin") .option("es.net.http.auth.pass", "***") .load() dataFrameR.show() spardSession.close() } }
- Maven依赖