java样例代码
前提条件
在DLI管理控制台上已完成创建跨源连接。具体操作请参考《数据湖探索用户指南》。
CSS非安全集群
- 开发说明
- 代码实现
- 构造依赖信息,创建SparkSession
- 通过SQL API访问
- 创建DLI跨源访问 CSS关联表。
sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.9.213:9200', 'es.nodes.wan.only' = 'true','resource' ='/mytest')");
- 插入数据。
sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
- 查询数据。
sparkSession.sql("select * from css_table").show();
- 删除数据表。
sparkSession.sql("drop table css_table");
- 创建DLI跨源访问 CSS关联表。
- 提交Spark作业
- 将写好的代码文件生成jar包,上传至DLI中。
- 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。
控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。
- 如果选择Spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.css。
- 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 “Spark参数(--conf)” 配置
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
- 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”。
- 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。
- 代码实现
- 完整示例代码
- Maven依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
- 通过SQL API 访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
import org.apache.spark.sql.*; public class java_css_unsecurity { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-css-unsecurity").getOrCreate(); // Create a DLI data table for DLI-associated CSS sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.15.34:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest')"); //*****************************SQL model*********************************** // Insert data into the DLI data table sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')"); // Read data from DLI data table sparkSession.sql("select * from css_table").show(); // drop table sparkSession.sql("drop table css_table"); sparkSession.close(); } }
- Maven依赖
CSS安全集群
- 准备工作
请参考CSS安全集群配置,准备工作的主要目的是为了生成keystore.jks文件和truststore.jks文件,并将其上传至OBS桶中。
- 开发说明-https off
如果没有开启https访问的话,不需要去生成keystore.jks和truststore.jks文件的,只需要设置好ssl访问和账号密码参数即可。
- 构造依赖信息,创建SparkSession
- 导入依赖。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
import相关依赖包:1
import org.apache.spark.sql.SparkSession;
- 创建会话。
1
SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate();
- 导入依赖。
- 通过SQL API 访问
- 创建DLI跨源访问 CSS的关联表。
1
sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.9.213:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest','es.net.ssl'='false','es.net.http.auth.user'='admin','es.net.http.auth.pass'='*******')");
- 创建CSS跨源表的参数详情可参考表1。
- 上述示例中,因为CSS安全集群关闭了https访问,所以“es.net.ssl”参数要设置为“false”。“es.net.http.auth.user”以及“es.net.http.auth.pass”为创建集群时设置的账号和密码。
- 插入数据
1
sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
- 查询数据
1
sparkSession.sql("select * from css_table").show();
- 删除数据表
sparkSession.sql("drop table css_table");
- 创建DLI跨源访问 CSS的关联表。
- 提交Spark作业
- 完整示例代码
- Maven依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
- Maven依赖
- 构造依赖信息,创建SparkSession
- 开发说明-https on
- 构造依赖信息,创建SparkSession
- 导入依赖。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
import相关依赖包:
1 2 3 4 5
import org.apache.spark.SparkFiles; import org.apache.spark.sql.SparkSession; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths;
- 创建会话。
1
SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate();
- 拷贝证书。
sparkSession.sparkContext().addFile("obs://桶名/地址/transport-keystore.jks"); sparkSession.sparkContext().addFile("obs://桶名/地址/truststore.jks"); // 获取当前工作目录的路径 String pathUser = System.getProperty("user.dir"); System.out.println("path_user is " + pathUser); // 获取文件名 String esTransportKeystoreFileName = SparkFiles.get("transport-keystore.jks"); String esTruststoreFileName = SparkFiles.get("truststore.jks"); System.out.println("esTransportKeystoreFileName is " + esTransportKeystoreFileName); System.out.println("esTruststoreFileName is " + esTruststoreFileName); // 拼接文件完整路径 String esTransportKeystoreLocalPath = pathUser + "/" + "transport-keystore.jks"; String esTruststoreLocalPath = pathUser + "/" + "truststore.jks"; System.out.println("esTransportKeystoreLocalPath is " + esTransportKeystoreLocalPath); System.out.println("esTruststoreLocalPath is " + esTruststoreLocalPath); try { // 拷贝 keystore 文件 copyFile(esTransportKeystoreFileName, esTransportKeystoreLocalPath); // 拷贝 truststore 文件 copyFile(esTruststoreFileName, esTruststoreLocalPath); // 等待一段时间 Thread.sleep(2000); System.out.println("Files copied successfully:"); System.out.println("es_transport-keystore.jks: " + esTransportKeystoreLocalPath); System.out.println("es_truststore.jks: " + esTruststoreLocalPath); } catch (IOException | InterruptedException e) { e.printStackTrace(); }
- 导入依赖。
- 通过SQL API 访问
- 创建DLI跨源访问 CSS的关联表。
1 2 3
sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.13.189:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'file://" + esTransportKeystoreLocalPath + "','es.net.ssl.keystore.pass' = '**', 'es.net.ssl.truststore.location'='file://" + esTruststoreLocalPath + "', 'es.net.ssl.truststore.pass'='***','es.net.http.auth.user'='admin','es.net.http.auth.pass'='**')");
创建CSS跨源表的参数详情可参考表1。
- 插入数据
1
sparkSession.sql("insert into css_table values(18, 'John'),(28, 'Bob')");
- 查询数据
1
sparkSession.sql("select * from css_table").show();
- 删除数据表
sparkSession.sql("drop table css_table");
- 创建DLI跨源访问 CSS的关联表。
- 提交Spark作业
- 完整示例代码
- Maven依赖
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.3.2</version> </dependency>
- 通过SQL API 访问
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
import org.apache.spark.SparkFiles; import org.apache.spark.sql.SparkSession; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; public class java_css_security_httpson { public static void main(String[] args) { SparkSession sparkSession = SparkSession.builder().appName("datasource-css").getOrCreate(); sparkSession.sparkContext().addFile("obs://桶名/地址/transport-keystore.jks"); sparkSession.sparkContext().addFile("obs://桶名/地址/css/truststore.jks"); // 获取当前工作目录的路径 String pathUser = System.getProperty("user.dir"); System.out.println("path_user is " + pathUser); // 获取文件名 String esTransportKeystoreFileName = SparkFiles.get("transport-keystore.jks"); String esTruststoreFileName = SparkFiles.get("truststore.jks"); System.out.println("esTransportKeystoreFileName is " + esTransportKeystoreFileName); System.out.println("esTruststoreFileName is " + esTruststoreFileName); // 拼接文件完整路径 String esTransportKeystoreLocalPath = pathUser + "/" + "transport-keystore.jks"; String esTruststoreLocalPath = pathUser + "/" + "truststore.jks"; System.out.println("esTransportKeystoreLocalPath is " + esTransportKeystoreLocalPath); System.out.println("esTruststoreLocalPath is " + esTruststoreLocalPath); try { // 拷贝 keystore 文件 copyFile(esTransportKeystoreFileName, esTransportKeystoreLocalPath); // 拷贝 truststore 文件 copyFile(esTruststoreFileName, esTruststoreLocalPath); // 等待一段时间 Thread.sleep(2000); System.out.println("Files copied successfully:"); System.out.println("es_transport-keystore.jks: " + esTransportKeystoreLocalPath); System.out.println("es_truststore.jks: " + esTruststoreLocalPath); } catch (IOException | InterruptedException e) { e.printStackTrace(); } // Create a DLI data table for DLI-associated CSS sparkSession.sql("create table css_table(id long, name string) using css options( 'es.nodes' = '192.168.13.189:9200', 'es.nodes.wan.only' = 'true', 'resource' = '/mytest','es.net.ssl'='true','es.net.ssl.keystore.location' = 'file://" + esTransportKeystoreLocalPath + "','es.net.ssl.keystore.pass' = '**','es.net.ssl.truststore.location'='file://" + esTruststoreLocalPath + "','es.net.ssl.truststore.pass'='**','es.net.http.auth.user'='admin','es.net.http.auth.pass'='**')"); //*****************************SQL model*********************************** // Insert data into the DLI data table sparkSession.sql("insert into css_table values(34, 'Yuan'),(28, 'Kids')"); // Read data from DLI data table sparkSession.sql("select * from css_table").show(); // drop table sparkSession.sql("drop table css_table"); sparkSession.close(); } private static void copyFile(String sourcePath, String destinationPath) throws IOException { // 从远程存储复制文件到本地的操作 byte[] fileContent = Files.readAllBytes(Paths.get(sourcePath)); Files.write(Paths.get(destinationPath), fileContent); } }
- Maven依赖
- 构造依赖信息,创建SparkSession