使用JDBC提交作业
操作场景
在Linux或Windows环境下您可以使用JDBC应用程序连接DLI服务端提交作业。
- 使用JDBC连接DLI提交的作业运行在Spark引擎上。
- JDBC版本2.X版本功能重构后,仅支持从DLI作业桶读取查询结果,如需使用该特性需具备以下条件:
- 在DLI管理控制台“全局配置 > 工程配置”中完成作业桶的配置。
- 提交工单申请开启查询结果写入桶特性的白名单。
DLI类型 |
JDBC类型 |
JAVA类型 |
---|---|---|
INT |
INTEGER |
java.lang.Integer |
STRING |
VARCHAR |
java.lang.String |
FLOAT |
FLOAT |
java.lang.Float |
DOUBLE |
DOUBLE |
java.lang.Double |
DECIMAL |
DECIMAL |
java.math.BigDecimal |
BOOLEAN |
BOOLEAN |
java.lang.Boolean |
SMALLINT/SHORT |
SMALLINT |
java.lang.Short |
TINYINT |
TINYINT |
java.lang.Short |
BIGINT/LONG |
BIGINT |
java.lang.Long |
TIMESTAMP |
TIMESTAMP |
java.sql.Timestamp |
CHAR |
CHAR |
Java.lang.Character |
VARCHAR |
VARCHAR |
java.lang.String |
DATE |
DATE |
java.sql.Date |
前提条件
在使用JDBC前,需要进行如下操作:
- 授权。
DLI使用统一身份认证服务(Identity and Access Management,简称IAM)进行精细的企业级多租户管理。该服务提供用户身份认证、权限分配、访问控制等功能,可以帮助您安全地控制华为云资源的访问。
通过IAM,您可以在华为云账号中给员工创建IAM用户,并使用策略来控制他们对华为云资源的访问范围。
目前包括角色(粗粒度授权)和策略(细粒度授权)。具体的权限介绍和授权操作请参考《数据湖探索用户指南》。
- 创建队列。在“资源管理 > 队列管理”下,单击右上角“购买队列”,进入购买队列页面选择“通用队列”,即Spark作业的计算资源。
如果创建队列的用户不是管理员用户,在创建队列后,需要管理员用户赋权后才可使用。关于赋权的具体操作请参考《数据湖探索用户指南》。
操作步骤
- 在使用JDBC的机器中安装JDK,JDK版本为1.7或以上版本,并配置环境变量。
- 参考下载JDBC驱动包章节,获取DLI JDBC驱动包“huaweicloud-dli-jdbc-<version>.zip”,解压,获得“huaweicloud-dli-jdbc-<version>-jar-with-dependencies.jar”。
- 在使用JDBC的机器中,将上一步解压的文件“huaweicloud-dli-jdbc-1.1.1-jar-with-dependencies.jar”添加至Java工程的“classpath”路径下。
- DLI JDBC提供两种身份认证模式连接到DLI服务,即Token和AK/SK。获取Token和AK/SK的方法请参见认证。
- 使用Class.forName()加载DLI JDBC驱动程序。
Class.forName("com.huawei.dli.jdbc.DliDriver");
- 通过DriverManager的GetConnection方法创建Connection。
Connection conn = DriverManager.getConnection(String url, Properties info);
其中,JDBC的配置项通过url传入,请参考表2配置参数。JDBC配置对象,除了在url中以分号间隔设置配置项外,还可以通过Info对象动态设置属性项,具体属性项参见表3。表2 数据库连接参数 参数
描述
url
url的格式如下。
jdbc:dli://<endPoint>/projectId? <key1>=<val1>;<key2>=<val2>…
- endpoint指DLI的域名。projectId指项目ID。
在地区和终端节点获取DLI对应的Endpoint,从华为云“用户名”>“我的凭证”页面获取项目编号。
- “?”后面接其他配置项,每个配置项以“key=value”的形式列出,配置项之间以“;”隔开,这些配置项也可以通过Info对象传入。
Info
Info传入自定义的配置项,若Info没有属性项传入,可设为null。配置格式为:info.setProperty("属性项", "属性值")。
表3 属性项 属性项
必须配置
默认值
描述
queuename
是
-
DLI服务的队列名称。
databasename
否
-
数据库名称。
authenticationmode
否
token
身份认证方式,当前支持两种:token或aksk。
accesskey
是
-
AK/SK认证密钥,获取方式请参考认证。
secretkey
是
-
AK/SK认证密钥,获取方式请参考认证。
regionname
authenticationmode=aksk时必须配置
-
区域名称,具体区域请参考地区和终端节点。
servicename
authenticationmode=aksk时必须配置
-
服务名称,即“dli”。
token
authenticationmode=token时必须配置
-
Token认证,认证方式请参考认证。
charset
否
UTF-8
JDBC编码方式。
usehttpproxy
否
false
是否使用访问代理。
proxyhost
usehttpproxy=true时必须配置
-
访问代理host。
proxyport
usehttpproxy=true时必须配置
-
访问代理端口。
dli.sql.checkNoResultQuery
否
false
是否允许调用executeQuery接口执行没有返回结果的语句(如DDL)。
- “false”表示允许调用。
- “true”表示不允许调用。
jobtimeout
否
300
提交作业终止时间,单位:秒。
iam.endpoint
否,默认根据regionName自动拼接
-
终端节点,具体区域请参考地区和终端节点。
obs.endpoint
否,默认根据regionName自动拼接
-
终端节点,具体区域请参考地区和终端节点。
directfetchthreshold
否
1000
请您根据业务情况判断返回结果数是否超过设置的阈值。
默认阈值1000。
- endpoint指DLI的域名。projectId指项目ID。
- 创建Statement对象,设置相关参数并提交Spark SQL到DLI服务。
Statement statement = conn.createStatement();
statement.execute("SET dli.sql.spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled=true");
statement.execute("select * from tb1");
- 获取结果。
ResultSet rs = statement.getResultSet();
- 显示结果。
while (rs.next()) { int a = rs.getInt(1); int b = rs.getInt(2); }
- 关闭连接。
conn.close();
示例
- 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
- 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量System.getenv("AK")和System.getenv("SK")。
import java.sql.*; import java.util.Properties; public class DLIJdbcDriverExample { public static void main(String[] args) throws ClassNotFoundException, SQLException { Connection conn = null; try { Class.forName("com.huawei.dli.jdbc.DliDriver"); String url = "jdbc:dli://<endpoint>/<projectId>?databasename=db1;queuename=testqueue"; Properties info = new Properties(); info.setProperty("authenticationmode", "aksk"); info.setProperty("regionname", "<real region name>"); info.setProperty("accesskey", "<System.getenv("AK")>"); info.setProperty("secretkey", "<System.getenv("SK")>"); conn = DriverManager.getConnection(url, info); Statement statement = conn.createStatement(); statement.execute("select * from tb1"); ResultSet rs = statement.getResultSet(); int line = 0; while (rs.next()) { line ++; int a = rs.getInt(1); int b = rs.getInt(2); System.out.println("Line:" + line + ":" + a + "," + b); } statement.execute("SET dli.sql.spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled=true"); statement.execute("describe tb1"); ResultSet rs1 = statement.getResultSet(); line = 0; while (rs1.next()) { line ++; String a = rs1.getString(1); String b = rs1.getString(2); System.out.println("Line:" + line + ":" + a + "," + b); } } catch (SQLException ex) { } finally { if (conn != null) { conn.close(); } } } }
开启重试功能
开启JDBC重试功能,执行查询操作失败时系统会进行重试。
- 为避免重复数据插入等操作,非查询语句不进行重试。
- 1.1.5及以上版本JDBC驱动包具有该功能。如果需要使用该功能,请获取最新版本JDBC驱动包。
开启重试功能需在Info参数中添加如表4所示属性项。
属性项 |
必须配置 |
默认值 |
描述 |
---|---|---|---|
USE_RETRY_KEY |
是 |
false |
是否开启重试。设置为“true”,表示开启重试。 |
RETRY_TIMES_KEY |
是 |
3000 |
重试时间间隔(毫秒)。建议设置为30000ms。 |
RETRY_INTERVALS_KEY |
是 |
3 |
重试次数。建议设置为3~5次。 |
设置JDBC配置项参数,开启重试功能,创建链接,示例如下:
import com.xxx.dli.jdbs.utils.ConnectionResource;//引入“ConnectionResource”,请按需修改类别名称 import java.sql.*; import java.util.Properties; public class DLIJdbcDriverExample { private static final String X_AUTH_TOKEN_VALUE = "<realtoken>"; public static void main(String[] args) throws ClassNotFoundException, SQLException { Connection conn = null; try { Class.forName("com.huawei.dli.jdbc.DliDriver"); String url = "jdbc:dli://<endpoint>/<projectId>?databasename=db1;queuename=testqueue"; Properties info = new Properties(); info.setProperty("token", X_AUTH_TOKEN_VALUE); info.setProperty(ConnectionResource.USE_RETRY_KEY, "true"); //开启重试 info.setProperty(ConnectionResource.RETRY_TIMES_KEY, "30000");// 重试间隔ms info.setProperty(ConnectionResource.RETRY_INTERVALS_KEY, "5");// 重试次数 conn = DriverManager.getConnection(url, info); Statement statement = conn.createStatement(); statement.execute("select * from tb1"); ResultSet rs = statement.getResultSet(); int line = 0; while (rs.next()) { line ++; int a = rs.getInt(1); int b = rs.getInt(2); System.out.println("Line:" + line + ":" + a + "," + b); } statement.execute("describe tb1"); ResultSet rs1 = statement.getResultSet(); line = 0; while (rs1.next()) { line ++; String a = rs1.getString(1); String b = rs1.getString(2); System.out.println("Line:" + line + ":" + a + "," + b); } } catch (SQLException ex) { } finally { if (conn != null) { conn.close(); } } } }