更新时间:2024-11-14 GMT+08:00
分享

java样例代码

操作场景

本例提供使用Spark作业访问DWS数据源的java样例代码。

在DLI管理控制台上已完成创建跨源连接并绑定队列。具体操作请参考《数据湖探索用户指南》。

认证用的password硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

操作前准备

  1. 导入依赖
    • 涉及到的mvn依赖库
      1
      2
      3
      4
      5
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.2</version>
      </dependency>
      
    • import相关依赖包
      1
      import org.apache.spark.sql.SparkSession;
      
  2. 创建会话
    1
    SparkSession sparkSession = SparkSession.builder().appName("datasource-dws").getOrCreate();
    

通过SQL API 访问数据源

  1. 创建DLI跨源访问DWS的关联表,填写连接参数。
    1
    sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')");
    
  2. 插入数据
    1
    sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')");
    
  3. 查询数据
    1
    sparkSession.sql("select * from dli_to_dws").show();
    

    插入数据后:

提交Spark作业

  1. 将写好的代码文件生成jar包,上传至DLI中。

    控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《上传资源包》。

  2. 在Spark作业编辑器中选择对应的Module模块并执行Spark作业。

    控制台操作请参考《数据湖探索用户指南》。API操作请参考《数据湖探索API参考》>《创建批处理作业》。

    • 如果选择spark版本为2.3.2(即将下线)或2.4.5提交作业时,需要指定Module模块,名称为:sys.datasource.dws。
    • 如果选择Spark版本为3.1.1时,无需选择Module模块, 需在 'Spark参数(--conf)' 配置

      spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*

      spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/dws/*

    • 通过控制台提交作业请参考《数据湖探索用户指南》中的“选择依赖资源参数说明”。
    • 通过API提交作业请参考《数据湖探索API参考》>《创建批处理作业》中“表2-请求参数说明”关于“modules”参数的说明。

完整示例代码

通过SQL API 访问DWS表

import org.apache.spark.sql.SparkSession;
 
public class java_dws {
    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder().appName("datasource-dws").getOrCreate();
 
        sparkSession.sql("CREATE TABLE IF NOT EXISTS dli_to_dws USING JDBC OPTIONS ('url'='jdbc:postgresql://10.0.0.233:8000/postgres','dbtable'='test','user'='dbadmin','password'='**')");
 
        //*****************************SQL model***********************************
        //Insert data into the DLI data table
        sparkSession.sql("insert into dli_to_dws values(3,'Liu'),(4,'Xie')");
 
        //Read data from DLI data table
        sparkSession.sql("select * from dli_to_dws").show();
 
        //drop table
        sparkSession.sql("drop table dli_to_dws");
 
        sparkSession.close();
    }
}

相关文档