使用JDBC接口提交数据分析任务
功能介绍
本章节介绍如何使用JDBC样例程序完成数据分析任务。
样例代码
- 读取HiveServer客户端property文件,其中“hiveclient.properties”文件在“hive-jdbc-example/src/main/resources”目录下。
Properties clientInfo = null; String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; InputStream fileInputStream = null; try{ clientInfo = new Properties(); //"hiveclient.properties"为客户端配置文件 //"hiveclient.properties"文件可从对应实例客户端安装包解压目录下的config目录下获取,并上传到JDBC样例工程的“hive-jdbc-example/src/main/resources”目录下 String hiveclientProp = userdir + "hiveclient.properties" ; File propertiesFile = new File(hiveclientProp); fileInputStream = new FileInputStream(propertiesFile); clientInfo.load(fileInputStream); }catch (Exception e) { throw new IOException(e); }finally{ if(fileInputStream != null){ fileInputStream.close(); fileInputStream = null; } }
- 获取ZooKeeper的IP列表和端口、集群的认证模式、HiveServer的SASL配置、HiveServer在ZooKeeper中节点名称、客户端对服务端的发现模式、以及服务端进程认证的principal。这些配置样例代码会自动从“hiveclient.properties中”读取。
//zkQuorum获取后的格式为"xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181"; //"xxx.xxx.xxx.xxx"为集群中ZooKeeper所在节点的业务IP,端口默认是2181 zkQuorum = clientInfo.getProperty("zk.quorum"); auth = clientInfo.getProperty("auth"); sasl_qop = clientInfo.getProperty("sasl.qop"); zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace"); serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode"); principal = clientInfo.getProperty("principal");
- 在安全模式下,需要Kerberos用户以及keytab文件路径等信息进行登录认证。
// 设置新建用户的userName,其中"xxx"指代之前创建的用户名,例如创建的用户为developuser,则USER_NAME为developuser USER_NAME = "xxx"; // 设置客户端的keytab和krb5文件路径,即“hive-jdbc-example\src\main\resources” String userdir = System.getProperty("user.dir") + File.separator + "conf" + File.separator; USER_KEYTAB_FILE = userdir + "user.keytab"; KRB5_FILE = userdir + "krb5.conf";
- 定义HQL。HQL必须为单条语句,注意HQL不能包含“;”。
// 定义HQL,不能包含“;” String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"};
- 拼接JDBC URL。
拼接JDBC URL也可以不提供账户和keytab路径,采用提前认证的方式。如果使用IBM JDK运行Hive应用程序,则必须使用“JDBC代码样例二”提供的预认证方式才能访问。
以下代码片段,拼接完成后的JDBC URL示例为:
jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.<系统域名>@<系统域名>;
系统域名可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数获取。
// 拼接JDBC URL StringBuilder sBuilder = new StringBuilder( "jdbc:hive2://").append(zkQuorum).append("/"); if ("KERBEROS".equalsIgnoreCase(auth)) { sBuilder.append(";serviceDiscoveryMode=") .append(serviceDiscoveryMode) .append(";zooKeeperNamespace=") .append(zooKeeperNamespace) .append(";sasl.qop=") .append(sasl_qop) .append(";auth=") .append(auth) .append(";principal=") .append(principal) .append(";user.principal=") .append(USER_NAME) .append(";user.keytab=") .append(USER_KEYTAB_FILE) .append(";"); } String url = sBuilder.toString();
- 加载Hive JDBC驱动。
// 加载Hive JDBC驱动 Class.forName(HIVE_DRIVER);
- 获取JDBC连接,确认HQL的类型(DDL/DML),调用对应的接口执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。
Connection connection = null; try { // 获取JDBC连接 connection = DriverManager.getConnection(url, "", ""); // 建表 // 表建完之后,如果要往表中导数据,可以使用LOAD语句将数据导入表中,比如从HDFS上将数据导入表: //load data inpath '/tmp/employees.txt' overwrite into table employees_info; execDDL(connection,sqls[0]); System.out.println("Create table success!"); // 查询 execDML(connection,sqls[1]); // 删表 execDDL(connection,sqls[2]); System.out.println("Delete table success!"); } finally { // 关闭JDBC连接 if (null != connection) { connection.close(); } public static void execDDL(Connection connection, String sql) throws SQLException { PreparedStatement statement = null; try { statement = connection.prepareStatement(sql); statement.execute(); } finally { if (null != statement) { statement.close(); } } } public static void execDML(Connection connection, String sql) throws SQLException { PreparedStatement statement = null; ResultSet resultSet = null; ResultSetMetaData resultMetaData = null; try { // 执行HQL statement = connection.prepareStatement(sql); resultSet = statement.executeQuery(); // 输出查询的列名到控制台 resultMetaData = resultSet.getMetaData(); int columnCount = resultMetaData.getColumnCount(); for (int i = 1; i <= columnCount; i++) { System.out.print(resultMetaData.getColumnLabel(i) + '\t'); } System.out.println(); // 输出查询结果到控制台 while (resultSet.next()) { for (int i = 1; i <= columnCount; i++) { System.out.print(resultSet.getString(i) + '\t'); } System.out.println(); } } finally { if (null != resultSet) { resultSet.close(); } if (null != statement) { statement.close(); } } }