更新时间:2023-12-20 GMT+08:00
分享

样例程序指导

功能介绍

本小节介绍了如何使用样例程序完成分析任务。样例程序主要有以下方式:

  • 使用JDBC接口提交数据分析任务。
  • 使用Python方式提交数据分析任务。

样例代码

  • 使用Hive JDBC接口提交数据分析任务,参考样例程序中的JDBCExample.java。
    1. 读取HiveServer客户端property文件,其中hiveclient.properties文件在hive提供的jdbc样例程序的resources目录下。
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + File.separator
      + "conf" + File.separator;
      InputStream fileInputStream = null;
      try{
      clientInfo = new Properties();
      //"hiveclient.properties"为客户端配置文件,如果使用多实例特性,需要把该文件换成对应实例客户端下的"hiveclient.properties"
      //"hiveclient.properties"文件位置在对应实例客户端安裝包解压目录下的config目录下
      String hiveclientProp = userdir + "hiveclient.properties" ;
      File propertiesFile = new File(hiveclientProp);
      fileInputStream = new FileInputStream(propertiesFile);
      clientInfo.load(fileInputStream);
      }catch (Exception e) {
      throw new IOException(e);
      }finally{
      if(fileInputStream != null){
      fileInputStream.close();
      fileInputStream = null;
      }
      }
    2. 获取ZooKeeper的IP列表和端口,集群的认证安全模式,HiveServer的sasl配置,HiveServer在ZooKeeper中节点名称,客户端对服务端的发现模式,以及服务端进程认证的principal。这些配置都可以从hiveclient.properties中读取。
         //zkQuorum获取后的格式为"xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181";
      //"xxx.xxx.xxx.xxx"为集群中ZooKeeper所在节点的业务IP,端口默认是2181
          zkQuorum = clientInfo.getProperty("zk.quorum");
          auth = clientInfo.getProperty("auth");
          sasl_qop = clientInfo.getProperty("sasl.qop");
          zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
          serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
          principal = clientInfo.getProperty("principal"); 
    3. 在安全模式下,需要kerberos用户以及keytab文件路径等信息进行登录认证。USER_NAME、USER_KEYTAB_FILE、KRB5_FILE的获取,请参见JDBC样例工程运行及结果查看
          // 设置新建用户的userName,其中"xxx"指代之前创建的用户名,例如创建的用户为user,则USER_NAME为user
          USER_NAME = "xxx";
          // 设置客户端的keytab和krb5文件路径
          String userdir = System.getProperty("user.dir") + File.separator 
                  + "conf" + File.separator;
          USER_KEYTAB_FILE = userdir + "user.keytab";
          KRB5_FILE = userdir + "krb5.conf"; 
    4. 定义HQL。HQL必须为单条语句,注意HQL不能包含“;”
         // 定义HQL,不能包含“;” 
         String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                  "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"}; 
    5. 拼接JDBC URL。

      拼接JDBC URL也可以不提供账户和keytab路径,采用提前认证的方式,具体请参见“开发规范”中Hive示例中的JDBC代码样例二。如果使用IBM jdk运行Hive应用程序,则必须使用“JDBC代码样例二”提供的预认证方式才能访问。

      以下代码片段,拼接完成后的JDBC URL示例为:

      jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.<系统域名>@<系统域名>;

      用户可登录FusionInsight Manager,单击“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。

      “hive/hadoop.<系统域名>为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,用户名为“hive/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。

           // 拼接JDBC URL 
           StringBuilder sBuilder = new StringBuilder( 
               "jdbc:hive2://").append(zkQuorum).append("/"); 
            
           if ("KERBEROS".equalsIgnoreCase(auth)) { 
            sBuilder.append(";serviceDiscoveryMode=") 
                    .append(serviceDiscoveryMode)
                    .append(";zooKeeperNamespace=")
                    .append(zooKeeperNamespace)
                    .append(";sasl.qop=")
                    .append(sasl_qop)
                    .append(";auth=")
                    .append(auth)
                    .append(";principal=")
                    .append(principal)
                                .append(";user.principal=")
                    .append(USER_NAME)
                    .append(";user.keytab=")
                    .append(USER_KEYTAB_FILE)
                    .append(";");
                     
           } else {
             // 普通模式 
             sBuilder.append(";serviceDiscoveryMode=")  
                     .append(serviceDiscoveryMode) 
                     .append(";zooKeeperNamespace=") 
                     .append(zooKeeperNamespace)
                     .append(";auth=none;"); 
           } 
           String url = sBuilder.toString();   
    6. 加载Hive JDBC驱动。
         // 加载Hive JDBC驱动 
         Class.forName(HIVE_DRIVER);
    7. 获取JDBC连接,确认HQL的类型(DDL/DML),调用对应的接口执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。
          Connection connection = null; 
           try { 
             // 获取JDBC连接
             // 如果使用的是普通模式,那么第二个参数需要填写正确的用户名,否则会以匿名用户(anonymous)登录
             connection = DriverManager.getConnection(url, "", ""); 
                
             // 建表 
             // 表建完之后,如果要往表中导数据,可以使用LOAD语句将数据导入表中,比如从HDFS上将数据导入表: 
             //load data inpath '/tmp/employees.txt' overwrite into table employees_info; 
             execDDL(connection,sqls[0]); 
             System.out.println("Create table success!"); 
               
             // 查询 
             execDML(connection,sqls[1]); 
                
             // 删表 
             execDDL(connection,sqls[2]); 
             System.out.println("Delete table success!"); 
           } 
           finally { 
             // 关闭JDBC连接 
             if (null != connection) { 
               connection.close(); 
             } 
        
       public static void execDDL(Connection connection, String sql) 
         throws SQLException { 
           PreparedStatement statement = null; 
           try { 
             statement = connection.prepareStatement(sql); 
             statement.execute(); 
           } 
           finally { 
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         } 
        
           public static void execDML(Connection connection, String sql) throws SQLException { 
           PreparedStatement statement = null; 
           ResultSet resultSet = null; 
           ResultSetMetaData resultMetaData = null; 
            
           try { 
             // 执行HQL 
             statement = connection.prepareStatement(sql); 
             resultSet = statement.executeQuery(); 
              
             // 输出查询的列名到控制台 
             resultMetaData = resultSet.getMetaData(); 
             int columnCount = resultMetaData.getColumnCount(); 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
             } 
             System.out.println(); 
              
             // 输出查询结果到控制台 
             while (resultSet.next()) { 
               for (int i = 1; i <= columnCount; i++) { 
                 System.out.print(resultSet.getString(i) + '\t'); 
               } 
               System.out.println(); 
             } 
           } 
           finally { 
             if (null != resultSet) { 
               resultSet.close(); 
             } 
              
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         }  
  • 使用Python方式提交数据分析任务,参考样例程序中的“python-examples/pyCLI_nosec.py”。
    1. 导入HAConnection类。
      from pyhs2.haconnection import HAConnection     
    2. 声明HiveServer的IP地址列表。本例中hosts代表HiveServer的节点,xxx.xxx.xxx.xxx代表业务IP地址。
      hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 
      1. 如果HiveServer实例被迁移,原始的示例程序会失效。在HiveServer实例迁移之后,用户需要更新示例程序中使用的HiveServer的IP地址。
      2. 若使用Hive多实例,则IP地址根据实际连接的实例地址进行更改。
    3. 在HAConnection的第三个参数填写正确的用户名,密码可以不填写。创建连接,执行HQL,输出查询的列名和结果到控制台。
       try:
          with HAConnection(hosts = hosts,
                             port = 21066,
                             authMechanism = "PLAIN",
                             user='root',
                             password='******') as haConn:
              with haConn.getConnection() as conn:
                  with conn.cursor() as cur:
                      # Show databases
                      print cur.getDatabases()
                      # Execute query
                      cur.execute("show tables")
                      # Return column info from query
                      print cur.getSchema()
                      # Fetch table results
                      for i in cur.fetch():
                          print i
      except Exception, e:
          print e

      若使用Hive多实例,不仅要根据上述2的说明更改hosts,并且要根据实际安装的端口号更改port。默认端口从Hive到Hive4为21066到21070。

分享:

    相关文档

    相关产品