更新时间:2023-12-20 GMT+08:00
分享

样例程序指导

功能介绍

本小节介绍了如何使用样例程序完成分析任务。样例程序主要有以下方式:

  • 使用JDBC接口提交数据分析任务。
  • 使用Python方式提交数据分析任务。

样例代码

  • 使用Hive JDBC接口提交数据分析任务,参考样例程序中的JDBCExample.java。
    1. 读取HiveServer客户端property文件,其中“hiveclient.properties”文件在Hive提供的jdbc样例程序的resources目录下。
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + File.separator
      + "conf" + File.separator;
      InputStream fileInputStream = null;
      try{
      clientInfo = new Properties();
      //"hiveclient.properties"为客户端配置文件,如果使用多服务特性,需要把该文件换成对应实例客户端下的"hiveclient.properties"
      //"hiveclient.properties"文件位置在对应实例客户端安裝包解压目录下的config目录下
      String hiveclientProp = userdir + "hiveclient.properties" ;
      File propertiesFile = new File(hiveclientProp);
      fileInputStream = new FileInputStream(propertiesFile);
      clientInfo.load(fileInputStream);
      }catch (Exception e) {
      throw new IOException(e);
      }finally{
      if(fileInputStream != null){
      fileInputStream.close();
      fileInputStream = null;
      }
      }
    2. 获取ZooKeeper的IP列表和端口,集群的认证模式,HiveServer的sasl配置,HiveServer在ZooKeeper中节点名称,客户端对服务端的发现模式,以及服务端进程认证的principal。这些配置都可以从hiveclient.properties中读取。
      //zkQuorum获取后的格式为"xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181";
      //"xxx.xxx.xxx.xxx"为集群中ZooKeeper所在节点的业务IP,端口默认是2181
          zkQuorum = clientInfo.getProperty("zk.quorum");
          auth = clientInfo.getProperty("auth");
          sasl_qop = clientInfo.getProperty("sasl.qop");
          zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
          serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
          principal = clientInfo.getProperty("principal"); 
           
    3. 在安全模式下,需要kerberos用户以及keytab文件路径等信息进行登录认证。USER_NAME、USER_KEYTAB_FILE、KRB5_FILE的获取,请参见JDBC样例工程运行及结果查看
          // 设置新建用户的userName,其中"xxx"指代之前创建的用户名,例如创建的用户为user,则USER_NAME为user
          USER_NAME = "xxx";
          // 设置客户端的keytab和krb5文件路径
          String userdir = System.getProperty("user.dir") + File.separator 
                  + "conf" + File.separator;
          USER_KEYTAB_FILE = userdir + "user.keytab";
          KRB5_FILE = userdir + "krb5.conf"; 
    4. 定义HQL。HQL必须为单条语句,注意HQL不能包含“;”
         // 定义HQL,不能包含“;” 
         String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                  "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"}; 
    5. 拼接JDBC URL。

      拼接JDBC URL也可以不提供账户和keytab路径,采用提前认证的方式,具体请参见“开发规范”中Hive示例中的JDBC代码样例二。如果使用IBM jdk运行Hive应用程序,则必须使用“JDBC代码样例二”提供的预认证方式才能访问。

      以下代码片段,拼接完成后的JDBC URL示例为:

      jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.<系统域名>@<系统域名>;
           // 拼接JDBC URL 
           StringBuilder sBuilder = new StringBuilder( 
               "jdbc:hive2://").append(zkQuorum).append("/"); 
            
           if ("KERBEROS".equalsIgnoreCase(auth)) { 
            sBuilder.append(";serviceDiscoveryMode=") 
                    .append(serviceDiscoveryMode)
                    .append(";zooKeeperNamespace=")
                    .append(zooKeeperNamespace)
                    .append(";sasl.qop=")
                    .append(sasl_qop)
                    .append(";auth=")
                    .append(auth)
                    .append(";principal=")
                    .append(principal)
                                .append(";user.principal=")
                    .append(USER_NAME)
                    .append(";user.keytab=")
                    .append(USER_KEYTAB_FILE)
                    .append(";");
                     
           } else {
             // 普通模式 
             sBuilder.append(";serviceDiscoveryMode=")  
                     .append(serviceDiscoveryMode) 
                     .append(";zooKeeperNamespace=") 
                     .append(zooKeeperNamespace)
                     .append(";auth=none;"); 
           } 
           String url = sBuilder.toString(); 
    6. 加载Hive JDBC驱动。
         // 加载Hive JDBC驱动 
         Class.forName(HIVE_DRIVER);
    7. 获取JDBC连接,确认HQL的类型(DDL/DML),调用对应的接口执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。
         Connection connection = null; 
           try { 
             // 获取JDBC连接
             // 如果使用的是普通模式,那么第二个参数需要填写正确的用户名,否则会以匿名用户(anonymous)登录
             connection = DriverManager.getConnection(url, "", ""); 
                
             // 建表 
             // 表建完之后,如果要往表中导数据,可以使用LOAD语句将数据导入表中,比如从HDFS上将数据导入表: 
             //load data inpath '/tmp/employees.txt' overwrite into table employees_info; 
             execDDL(connection,sqls[0]); 
             System.out.println("Create table success!"); 
               
             // 查询 
             execDML(connection,sqls[1]); 
                
             // 删表 
             execDDL(connection,sqls[2]); 
             System.out.println("Delete table success!"); 
           } 
           finally { 
             // 关闭JDBC连接 
             if (null != connection) { 
               connection.close(); 
             } 
        
       public static void execDDL(Connection connection, String sql) 
         throws SQLException { 
           PreparedStatement statement = null; 
           try { 
             statement = connection.prepareStatement(sql); 
             statement.execute(); 
           } 
           finally { 
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         } 
        
         public static void execDML(Connection connection, String sql) throws SQLException { 
           PreparedStatement statement = null; 
           ResultSet resultSet = null; 
           ResultSetMetaData resultMetaData = null; 
            
           try { 
             // 执行HQL 
             statement = connection.prepareStatement(sql); 
             resultSet = statement.executeQuery(); 
              
             // 输出查询的列名到控制台 
             resultMetaData = resultSet.getMetaData(); 
             int columnCount = resultMetaData.getColumnCount(); 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
             } 
             System.out.println(); 
              
             // 输出查询结果到控制台 
             while (resultSet.next()) { 
               for (int i = 1; i <= columnCount; i++) { 
                 System.out.print(resultSet.getString(i) + '\t'); 
               } 
               System.out.println(); 
             } 
           } 
           finally { 
             if (null != resultSet) { 
               resultSet.close(); 
             } 
              
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         }     
  • 使用Python方式提交数据分析任务,参考样例程序中的“python-examples/pyCLI_sec.py”。该样例程序连接的集群的认证模式是安全模式,运行样例程序之前需要使用kinit命令认证相应权限的kerberos用户。
    1. 导入HAConnection类。
       from pyhs2.haconnection import HAConnection     
    2. 声明HiveServer的IP地址列表。本例中hosts代表HiveServer的节点,xxx.xxx.xxx.xxx代表业务IP地址。
      hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 
      1. 如果HiveServer实例被迁移,原始的示例程序会失效。在HiveServer实例迁移之后,用户需要更新示例程序中使用的HiveServer的IP地址。
      2. 若使用Hive多实例,则IP地址根据实际连接的实例地址进行更改。
    3. 配置kerberos主机名和服务名。本例中kerberos主机名为hadoop,服务名为hive。
      conf = {"krb_host":"hadoop.<系统域名>", "krb_service":"hive"}     

      若使用Hive多实例,服务名则需根据实际连接的集群进行更改。例如连接Hive1实例,则将服务名改成hive1。

    4. 创建连接,执行HQL,输出查询的列名和结果到控制台。
         try: 
             with HAConnection(hosts = hosts, 
                                port = 21066, 
                                authMechanism = "KERBEROS", 
                                configuration = conf) as haConn: 
                 with haConn.getConnection() as conn: 
                     with conn.cursor() as cur: 
                         # show databases 
                         print cur.getdatabases() 
                          
                         # execute query 
                         cur.execute("show tables") 
                          
                         # return column info from query 
                         print cur.getschema() 
                          
                         # fetch table results 
                         for i in cur.fetch(): 
                             print i 
                              
         except exception, e: 
             print e

      若使用Hive多实例,不仅要根据上述2的说明更改hosts,并且要根据实际安装的端口号更改port。默认端口从Hive到Hive4为21066到21070。

分享:

    相关文档

    相关产品