更新时间:2024-06-05 GMT+08:00
分享

使用JDBC提交数据分析任务

功能简介

本章节介绍如何使用JDBC样例程序完成数据分析任务。

样例代码

使用Hive JDBC接口提交数据分析任务,该样例程序在“hive-examples/hive-jdbc-example”的“JDBCExample.java”中,实现该功能的模块如下:
  1. 读取HiveServer客户端property文件,其中“hiveclient.properties”文件在“hive-jdbc-example\src\main\resources”目录下。
    Properties clientInfo = null;
    String userdir = System.getProperty("user.dir") + File.separator
    + "conf" + File.separator;
    InputStream fileInputStream = null;
    try{
    clientInfo = new Properties();
    //"hiveclient.properties"为客户端配置文件
    //"hiveclient.properties"文件可从对应实例客户端安裝包解压目录下的config目录下获取,并上传到JDBC样例工程的“hive-jdbc-example\src\main\resources”目录下
    String hiveclientProp = userdir + "hiveclient.properties" ;
    File propertiesFile = new File(hiveclientProp);
    fileInputStream = new FileInputStream(propertiesFile);
    clientInfo.load(fileInputStream);
    }catch (Exception e) {
    throw new IOException(e);
    }finally{
    if(fileInputStream != null){
    fileInputStream.close();
    fileInputStream = null;
    }
    }
  2. 获取ZooKeeper的IP列表和端口、集群的认证模式、HiveServer的SASL配置、HiveServer在ZooKeeper中节点名称、客户端对服务端的发现模式、以及服务端进程认证的principal。这些配置样例代码会自动从“hiveclient.properties中”读取。
    //zkQuorum获取后的格式为"xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181";
    //"xxx.xxx.xxx.xxx"为集群中ZooKeeper所在节点的业务IP,端口默认是2181
        zkQuorum = clientInfo.getProperty("zk.quorum");
        auth = clientInfo.getProperty("auth");
        sasl_qop = clientInfo.getProperty("sasl.qop");
        zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
        serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
        principal = clientInfo.getProperty("principal"); 
         
  3. 定义HQL。HQL必须为单条语句,注意HQL不能包含“;”
       // 定义HQL,不能包含“;” 
       String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"}; 
  4. 拼接JDBC URL。

    拼接JDBC URL也可以不提供账户和keytab路径,采用提前认证的方式。如果使用IBM JDK运行Hive应用程序,则必须使用“JDBC代码样例二”提供的预认证方式才能访问。

    以下代码片段,拼接完成后的JDBC URL示例为:

    jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.<系统域名>@<系统域名>;

    系统域名可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数获取。

    “hive/hadoop.<系统域名>”为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,则用户名为“hive/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。

         // 拼接JDBC URL 
           // 普通模式 
           sBuilder.append(";serviceDiscoveryMode=")  
                   .append(serviceDiscoveryMode) 
                   .append(";zooKeeperNamespace=") 
                   .append(zooKeeperNamespace)
                   .append(";auth=none;"); 
  5. 加载Hive JDBC驱动。
       // 加载Hive JDBC驱动 
       Class.forName(HIVE_DRIVER);
  6. 获取JDBC连接,确认HQL的类型(DDL/DML),调用对应的接口执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。
       Connection connection = null; 
         try { 
           // 获取JDBC连接
    
           connection = DriverManager.getConnection(url, "", ""); 
              
           // 建表 
           // 表建完之后,如果要往表中导数据,可以使用LOAD语句将数据导入表中,比如从HDFS上将数据导入表: 
           //load data inpath '/tmp/employees.txt' overwrite into table employees_info; 
           execDDL(connection,sqls[0]); 
           System.out.println("Create table success!"); 
             
           // 查询 
           execDML(connection,sqls[1]); 
              
           // 删表 
           execDDL(connection,sqls[2]); 
           System.out.println("Delete table success!"); 
         } 
         finally { 
           // 关闭JDBC连接 
           if (null != connection) { 
             connection.close(); 
           } 
      
     public static void execDDL(Connection connection, String sql) 
       throws SQLException { 
         PreparedStatement statement = null; 
         try { 
           statement = connection.prepareStatement(sql); 
           statement.execute(); 
         } 
         finally { 
           if (null != statement) { 
             statement.close(); 
           } 
         } 
       } 
      
       public static void execDML(Connection connection, String sql) throws SQLException { 
         PreparedStatement statement = null; 
         ResultSet resultSet = null; 
         ResultSetMetaData resultMetaData = null; 
          
         try { 
           // 执行HQL 
           statement = connection.prepareStatement(sql); 
           resultSet = statement.executeQuery(); 
            
           // 输出查询的列名到控制台 
           resultMetaData = resultSet.getMetaData(); 
           int columnCount = resultMetaData.getColumnCount(); 
           for (int i = 1; i <= columnCount; i++) { 
             System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
           } 
           System.out.println(); 
            
           // 输出查询结果到控制台 
           while (resultSet.next()) { 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultSet.getString(i) + '\t'); 
             } 
             System.out.println(); 
           } 
         } 
         finally { 
           if (null != resultSet) { 
             resultSet.close(); 
           } 
            
           if (null != statement) { 
             statement.close(); 
           } 
         } 
       }     

相关文档