更新时间:2024-09-19 GMT+08:00

使用JDBC提交作业

操作场景

在Linux或Windows环境下您可以使用JDBC应用程序连接DLI服务端提交作业。

  • 使用JDBC连接DLI提交的作业运行在Spark引擎上。
  • JDBC版本2.X版本功能重构后,仅支持从DLI作业桶读取查询结果,如需使用该特性需具备以下条件:
    • 在DLI管理控制台“全局配置 > 工程配置”中完成作业桶的配置。
    • 提交工单申请开启查询结果写入桶特性的白名单。
DLI支持13种数据类型,每一种类型都可以映射成一种JDBC类型,在使用JDBC连接服务器时,请使用映射后的JAVA类型,映射关系如表1所示。
表1 数据类型映射

DLI类型

JDBC类型

JAVA类型

INT

INTEGER

java.lang.Integer

STRING

VARCHAR

java.lang.String

FLOAT

FLOAT

java.lang.Float

DOUBLE

DOUBLE

java.lang.Double

DECIMAL

DECIMAL

java.math.BigDecimal

BOOLEAN

BOOLEAN

java.lang.Boolean

SMALLINT/SHORT

SMALLINT

java.lang.Short

TINYINT

TINYINT

java.lang.Short

BIGINT/LONG

BIGINT

java.lang.Long

TIMESTAMP

TIMESTAMP

java.sql.Timestamp

CHAR

CHAR

Java.lang.Character

VARCHAR

VARCHAR

java.lang.String

DATE

DATE

java.sql.Date

前提条件

在使用JDBC前,需要进行如下操作:

  1. 授权。

    DLI使用统一身份认证服务(Identity and Access Management,简称IAM)进行精细的企业级多租户管理。该服务提供用户身份认证、权限分配、访问控制等功能,可以帮助您安全地控制华为云资源的访问。

    通过IAM,您可以在华为云账号中给员工创建IAM用户,并使用策略来控制他们对华为云资源的访问范围。

    目前包括角色(粗粒度授权)和策略(细粒度授权)。具体的权限介绍和授权操作请参考《数据湖探索用户指南》。

  2. 创建队列。在“资源管理 > 队列管理”下,单击右上角“购买队列”,进入购买队列页面选择“通用队列”,即Spark作业的计算资源。

    如果创建队列的用户不是管理员用户,在创建队列后,需要管理员用户赋权后才可使用。关于赋权的具体操作请参考《数据湖探索用户指南》。

操作步骤

  1. 在使用JDBC的机器中安装JDK,JDK版本为1.7或以上版本,并配置环境变量。
  2. 参考下载JDBC驱动包章节,获取DLI JDBC驱动包“huaweicloud-dli-jdbc-<version>.zip”,解压,获得“huaweicloud-dli-jdbc-<version>-jar-with-dependencies.jar”。
  3. 在使用JDBC的机器中,将上一步解压的文件“huaweicloud-dli-jdbc-1.1.1-jar-with-dependencies.jar”添加至Java工程的“classpath”路径下。
  4. DLI JDBC提供两种身份认证模式连接到DLI服务,即Token和AK/SK。获取Token和AK/SK的方法请参见认证
  5. 使用Class.forName()加载DLI JDBC驱动程序。

    Class.forName("com.huawei.dli.jdbc.DliDriver");

  6. 通过DriverManager的GetConnection方法创建Connection。

    Connection conn = DriverManager.getConnection(String url, Properties info);

    其中,JDBC的配置项通过url传入,请参考表2配置参数。JDBC配置对象,除了在url中以分号间隔设置配置项外,还可以通过Info对象动态设置属性项,具体属性项参见表3
    表2 数据库连接参数

    参数

    描述

    url

    url的格式如下。

    jdbc:dli://<endPoint>/projectId? <key1>=<val1>;<key2>=<val2>…

    • endpoint指DLI的域名。projectId指项目ID。

      地区和终端节点获取DLI对应的Endpoint,从华为云“用户名”>“我的凭证”页面获取项目编号。

    • “?”后面接其他配置项,每个配置项以“key=value”的形式列出,配置项之间以“;”隔开,这些配置项也可以通过Info对象传入。

    Info

    Info传入自定义的配置项,若Info没有属性项传入,可设为null。配置格式为:info.setProperty("属性项", "属性值")。

    表3 属性项

    属性项

    必须配置

    默认值

    描述

    queuename

    -

    DLI服务的队列名称。

    databasename

    -

    数据库名称。

    authenticationmode

    token

    身份认证方式,当前支持两种:token或aksk。

    accesskey

    -

    AK/SK认证密钥,获取方式请参考认证

    secretkey

    -

    AK/SK认证密钥,获取方式请参考认证

    regionname

    authenticationmode=aksk时必须配置

    -

    区域名称,具体区域请参考地区和终端节点

    servicename

    authenticationmode=aksk时必须配置

    -

    服务名称,即“dli”。

    token

    authenticationmode=token时必须配置

    -

    Token认证,认证方式请参考认证

    charset

    UTF-8

    JDBC编码方式。

    usehttpproxy

    false

    是否使用访问代理。

    proxyhost

    usehttpproxy=true时必须配置

    -

    访问代理host。

    proxyport

    usehttpproxy=true时必须配置

    -

    访问代理端口。

    dli.sql.checkNoResultQuery

    false

    是否允许调用executeQuery接口执行没有返回结果的语句(如DDL)。

    • “false”表示允许调用。
    • “true”表示不允许调用。

    jobtimeout

    300

    提交作业终止时间,单位:秒。

    iam.endpoint

    否,默认根据regionName自动拼接

    -

    终端节点,具体区域请参考地区和终端节点

    obs.endpoint

    否,默认根据regionName自动拼接

    -

    终端节点,具体区域请参考地区和终端节点

    directfetchthreshold

    1000

    请您根据业务情况判断返回结果数是否超过设置的阈值。

    默认阈值1000。

  7. 创建Statement对象,设置相关参数并提交Spark SQL到DLI服务。

    Statement statement = conn.createStatement();

    statement.execute("SET dli.sql.spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled=true");

    statement.execute("select * from tb1");

  8. 获取结果。

    ResultSet rs = statement.getResultSet();

  9. 显示结果。

    while (rs.next()) {
    int a = rs.getInt(1);
    int b = rs.getInt(2);
    }

  10. 关闭连接。

    conn.close();

示例

  • 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
  • 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量System.getenv("AK")和System.getenv("SK")。
import java.sql.*;
import java.util.Properties;

public class DLIJdbcDriverExample {

    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Connection conn = null;
        try {
            Class.forName("com.huawei.dli.jdbc.DliDriver");
            String url = "jdbc:dli://<endpoint>/<projectId>?databasename=db1;queuename=testqueue";
            Properties info = new Properties();
            info.setProperty("authenticationmode", "aksk");
            info.setProperty("regionname", "<real region name>");
            info.setProperty("accesskey", "<System.getenv("AK")>");
            info.setProperty("secretkey", "<System.getenv("SK")>");
            conn = DriverManager.getConnection(url, info);
            Statement statement = conn.createStatement();
            statement.execute("select * from tb1");
            ResultSet rs = statement.getResultSet();
            int line = 0;
            while (rs.next()) {
                line ++;
                int a = rs.getInt(1);
                int b = rs.getInt(2);
                System.out.println("Line:" + line + ":" + a + "," + b);
            }
            statement.execute("SET dli.sql.spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled=true");
            statement.execute("describe tb1");
            ResultSet rs1 = statement.getResultSet();
            line = 0;
            while (rs1.next()) {
                line ++;
                String a = rs1.getString(1);
                String b = rs1.getString(2);
                System.out.println("Line:" + line + ":" + a + "," + b);
            }
        } catch (SQLException ex) {
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}

开启重试功能

开启JDBC重试功能,执行查询操作失败时系统会进行重试。

  • 为避免重复数据插入等操作,非查询语句不进行重试。
  • 1.1.5及以上版本JDBC驱动包具有该功能。如果需要使用该功能,请获取最新版本JDBC驱动包。

开启重试功能需在Info参数中添加如表4所示属性项。

表4 重试功能属性项

属性项

必须配置

默认值

描述

USE_RETRY_KEY

false

是否开启重试。设置为“true”,表示开启重试。

RETRY_TIMES_KEY

3000

重试时间间隔(毫秒)。建议设置为30000ms。

RETRY_INTERVALS_KEY

3

重试次数。建议设置为3~5次。

设置JDBC配置项参数,开启重试功能,创建链接,示例如下:

import com.xxx.dli.jdbs.utils.ConnectionResource;//引入“ConnectionResource”,请按需修改类别名称
import java.sql.*;
import java.util.Properties;

public class DLIJdbcDriverExample {

    private static final String X_AUTH_TOKEN_VALUE = "<realtoken>";
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Connection conn = null;
        try {
            Class.forName("com.huawei.dli.jdbc.DliDriver");
            String url = "jdbc:dli://<endpoint>/<projectId>?databasename=db1;queuename=testqueue";
            Properties info = new Properties();
            info.setProperty("token", X_AUTH_TOKEN_VALUE);
            info.setProperty(ConnectionResource.USE_RETRY_KEY, "true"); //开启重试
            info.setProperty(ConnectionResource.RETRY_TIMES_KEY, "30000");// 重试间隔ms
            info.setProperty(ConnectionResource.RETRY_INTERVALS_KEY, "5");// 重试次数
            conn = DriverManager.getConnection(url, info);
            Statement statement = conn.createStatement();
            statement.execute("select * from tb1");
            ResultSet rs = statement.getResultSet();
            int line = 0;
            while (rs.next()) {
                line ++;
                int a = rs.getInt(1);
                int b = rs.getInt(2);
                System.out.println("Line:" + line + ":" + a + "," + b);
            }
            statement.execute("describe tb1");
            ResultSet rs1 = statement.getResultSet();
            line = 0;
            while (rs1.next()) {
                line ++;
                String a = rs1.getString(1);
                String b = rs1.getString(2);
                System.out.println("Line:" + line + ":" + a + "," + b);
            }
        } catch (SQLException ex) {
        } finally {
            if (conn != null) {
                conn.close();
            }
        }
    }
}