使用JDBC连接DLI并提交SQL作业
操作场景
在Linux或Windows环境下您可以使用JDBC应用程序连接DLI服务端提交作业。
DLI类型 |
JDBC类型 |
JAVA类型 |
---|---|---|
INT |
INTEGER |
java.lang.Integer |
STRING |
VARCHAR |
java.lang.String |
FLOAT |
FLOAT |
java.lang.Float |
DOUBLE |
DOUBLE |
java.lang.Double |
DECIMAL |
DECIMAL |
java.math.BigDecimal |
BOOLEAN |
BOOLEAN |
java.lang.Boolean |
SMALLINT/SHORT |
SMALLINT |
java.lang.Short |
TINYINT |
TINYINT |
java.lang.Short |
BIGINT/LONG |
BIGINT |
java.lang.Long |
TIMESTAMP |
TIMESTAMP |
java.sql.Timestamp |
CHAR |
CHAR |
Java.lang.Character |
VARCHAR |
VARCHAR |
java.lang.String |
DATE |
DATE |
java.sql.Date |
前提条件
在使用JDBC前,需要进行如下操作:
- 授权。
DLI使用统一身份认证服务(Identity and Access Management,简称IAM)进行精细的企业级多租户管理。该服务提供用户身份认证、权限分配、访问控制等功能,可以帮助您安全地控制华为云资源的访问。
通过IAM,您可以在华为云账号中给员工创建IAM用户,并使用策略来控制他们对华为云资源的访问范围。
目前包括角色(粗粒度授权)和策略(细粒度授权)。具体的权限介绍和授权操作请参考《数据湖探索用户指南》。
- 创建队列。在“资源管理 > 队列管理”下,单击右上角“购买队列”,进入购买队列页面选择“通用队列”,即Spark作业的计算资源。
如果创建队列的用户不是管理员用户,在创建队列后,需要管理员用户赋权后才可使用。关于赋权的具体操作请参考《数据湖探索用户指南》。
操作步骤
- 在使用JDBC的机器中安装JDK,JDK版本为1.7或以上版本,并配置环境变量。
- 参考下载并安装JDBC驱动包章节,获取DLI JDBC驱动包“huaweicloud-dli-jdbc-<version>.zip”,解压,获得“huaweicloud-dli-jdbc-<version>-jar-with-dependencies.jar”。
- 在使用JDBC的机器中,将上一步解压的文件“huaweicloud-dli-jdbc-1.1.1-jar-with-dependencies.jar”添加至Java工程的“classpath”路径下。
- DLI JDBC提供两种身份认证模式连接到DLI服务,即Token和AK/SK。获取Token和AK/SK的方法请参见认证鉴权。
- 使用Class.forName()加载DLI JDBC驱动程序。
Class.forName("com.huawei.dli.jdbc.DliDriver");
- 通过DriverManager的GetConnection方法创建Connection。
Connection conn = DriverManager.getConnection(String url, Properties info);
其中,JDBC的配置项通过url传入,请参考表2配置参数。JDBC配置对象,除了在url中以分号间隔设置配置项外,还可以通过Info对象动态设置属性项,具体属性项参见表3。表2 数据库连接参数 参数
描述
url
url的格式如下。
jdbc:dli://<endPoint>/projectId? <key1>=<val1>;<key2>=<val2>…
- endpoint指DLI的域名。projectId指项目ID。
在地区和终端节点获取DLI对应的Endpoint,从华为云“用户名”>“我的凭证”页面获取项目编号。
- “?”后面接其他配置项,每个配置项以“key=value”的形式列出,配置项之间以“;”隔开,这些配置项也可以通过Info对象传入。
Info
Info传入自定义的配置项,若Info没有属性项传入,可设为null。配置格式为:info.setProperty("属性项", "属性值")。
表3 属性项 属性项
必须配置
默认值
描述
不同版本dli-jdbc支持情况
queuename
是
-
DLI服务的队列名称。
dli-jdbc-1.x
dli-jdbc-2.x
databasename
否
-
数据库名称。
dli-jdbc-1.x
dli-jdbc-2.x
authenticationmode
否
token
身份认证方式,当前支持两种:token或aksk。
dli-jdbc-1.x
accesskey
是
-
AK/SK认证密钥,获取方式请参考认证鉴权。
dli-jdbc-1.x
dli-jdbc-2.x
secretkey
是
-
AK/SK认证密钥,获取方式请参考认证鉴权。
dli-jdbc-1.x
dli-jdbc-2.x
regionname
authenticationmode=aksk时必须配置
-
区域名称,具体区域请参考地区和终端节点。
dli-jdbc-1.x
dli-jdbc-2.x
token
authenticationmode=token时必须配置
-
Token认证,认证方式请参考认证鉴权。
dli-jdbc-1.x
charset
否
UTF-8
JDBC编码方式。
dli-jdbc-1.x
dli-jdbc-2.x
usehttpproxy
否
false
是否使用访问代理。
dli-jdbc-1.x
proxyhost
usehttpproxy=true时必须配置
-
访问代理host。
dli-jdbc-1.x
dli-jdbc-2.x
proxyport
usehttpproxy=true时必须配置
-
访问代理端口。
dli-jdbc-1.x
dli-jdbc-2.x
dli.sql.checkNoResultQuery
否
false
是否允许调用executeQuery接口执行没有返回结果的语句(如DDL)。
- “false”表示允许调用。
- “true”表示不允许调用。
dli-jdbc-1.x
dli-jdbc-2.x
jobtimeout
否
300
提交作业终止时间,单位:秒。
dli-jdbc-1.x
dli-jdbc-2.x
directfetchthreshold
否
1000
请您根据业务情况判断返回结果数是否超过设置的阈值。
默认阈值1000。
dli-jdbc-1.x
- endpoint指DLI的域名。projectId指项目ID。
- 创建Statement对象,设置相关参数并提交Spark SQL到DLI服务。
Statement statement = conn.createStatement();
statement.execute("SET dli.sql.spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled=true");
statement.execute("select * from tb1");
- 获取结果。
ResultSet rs = statement.getResultSet();
- 显示结果。
while (rs.next()) { int a = rs.getInt(1); int b = rs.getInt(2); }
- 关闭连接。
conn.close();
示例
- 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
- 本示例以ak和sk保存在环境变量中为例,运行本示例前请先在本地环境中设置环境变量System.getenv("AK")和System.getenv("SK")。
import java.sql.*; import java.util.Properties; public class DLIJdbcDriverExample { public static void main(String[] args) throws ClassNotFoundException, SQLException { Connection conn = null; try { Class.forName("com.huawei.dli.jdbc.DliDriver"); String url = "jdbc:dli://<endpoint>/<projectId>?databasename=db1;queuename=testqueue"; Properties info = new Properties(); info.setProperty("authenticationmode", "aksk"); info.setProperty("regionname", "<real region name>"); info.setProperty("accesskey", "<System.getenv("AK")>"); info.setProperty("secretkey", "<System.getenv("SK")>"); conn = DriverManager.getConnection(url, info); Statement statement = conn.createStatement(); statement.execute("select * from tb1"); ResultSet rs = statement.getResultSet(); int line = 0; while (rs.next()) { line ++; int a = rs.getInt(1); int b = rs.getInt(2); System.out.println("Line:" + line + ":" + a + "," + b); } statement.execute("SET dli.sql.spark.sql.forcePartitionPredicatesOnPartitionedTable.enabled=true"); statement.execute("describe tb1"); ResultSet rs1 = statement.getResultSet(); line = 0; while (rs1.next()) { line ++; String a = rs1.getString(1); String b = rs1.getString(2); System.out.println("Line:" + line + ":" + a + "," + b); } } catch (SQLException ex) { } finally { if (conn != null) { conn.close(); } } } }