更新时间:2024-06-14 GMT+08:00
分享

分析Hive数据

功能介绍

本小节介绍了如何使用样例程序完成分析任务。样例程序主要有以下方式。

  • 使用JDBC接口提交数据分析任务。
  • 使用HCatalog接口提交数据分析任务。

样例代码

  • 使用Hive JDBC接口提交数据分析任务,参考样例程序中的JDBCExample.java。
    1. 定义HiveQL。HiveQL必须为单条语句,注意HiveQL不能包含“;”
         // 定义HQL,不能包含“;” 
         String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                  "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"}; 
    2. 拼接JDBC URL。
      // 拼接JDBC URL
      StringBuilder sBuilder = new StringBuilder(
        "jdbc:hive2://").append(clientInfo.getZkQuorum()).append("/");
      
      if (isSecurityMode) {
          // 安全模式
          // ZooKeeper登录认证 
          sBuilder.append(";serviceDiscoveryMode=")
                  .append(clientInfo.getServiceDiscoveryMode())
                  .append(";zooKeeperNamespace=")
                  .append(clientInfo.getZooKeeperNamespace())
                  .append(";sasl.qop=")
                  .append(clientInfo.getSaslQop())
                  .append(";auth=")
                  .append(clientInfo.getAuth())
                  .append(";principal=")
                  .append(clientInfo.getPrincipal())
                  .append(";");
      } else {
          // 普通模式
          sBuilder.append(";serviceDiscoveryMode=")
                  .append(clientInfo.getServiceDiscoveryMode())
                  .append(";zooKeeperNamespace=")
                  .append(clientInfo.getZooKeeperNamespace())
                  .append(";auth=none");
      }
      String url = sBuilder.toString();

      以上是通过ZooKeeper的方式访问Hive。若直连HiveServer的方式访问Hive,需按如下方式拼接JDBC URL,并将hiveclient.properties文件中的zk.quorum配置项的端口改为10000。

      // 拼接JDBC URL
      StringBuilder sBuilder = new StringBuilder(
        "jdbc:hive2://").append(clientInfo.getZkQuorum()).append("/");
      
      if (isSecurityMode) {
          // 安全模式
          // ZooKeeper登录认证 
          sBuilder.append(";sasl.qop=")
                  .append(clientInfo.getSaslQop())
                  .append(";auth=")
                  .append(clientInfo.getAuth())
                  .append(";principal=")
                  .append(clientInfo.getPrincipal())
                  .append(";");
      } else {
          // 普通模式
          sBuilder.append(";auth=none");
      }
      String url = sBuilder.toString();

      注:直连HiveServer时,若当前连接的HiveServer故障则会导致访问Hive失败;若使用ZooKeeper的访问Hive,只要有任一个HiveServer实例可正常提供服务即可。因此使用JDBC时建议通过ZooKeeper的方式访问Hive。

    3. 加载Hive JDBC驱动。
         // 加载Hive JDBC驱动 
         Class.forName(HIVE_DRIVER);
    4. 填写正确的用户名,获取JDBC连接,确认HQL的类型(DDL/DML),调用对应的接口执行HiveQL,输出查询的列名和结果到控制台,关闭JDBC连接。
       
         Connection connection = null; 
           try { 
             // 获取JDBC连接 
             // 第二个参数需要填写正确的用户名,否则会以匿名用户(anonymous)登录
             connection = DriverManager.getConnection(url, "userName", ""); 
                
             // 建表 
             // 表建完之后,如果要往表中导数据,可以使用LOAD语句将数据导入表中,比如从HDFS上将数据导入表: 
             //load data inpath '/tmp/employees.txt' overwrite into table employees_info; 
             execDDL(connection,sqls[0]); 
             System.out.println("Create table success!"); 
               
             // 查询 
             execDML(connection,sqls[1]); 
                
             // 删表 
             execDDL(connection,sqls[2]); 
             System.out.println("Delete table success!"); 
           } 
           finally { 
             // 关闭JDBC连接 
             if (null != connection) { 
               connection.close(); 
             } 
        
       public static void execDDL(Connection connection, String sql) 
         throws SQLException { 
           PreparedStatement statement = null; 
           try { 
             statement = connection.prepareStatement(sql); 
             statement.execute(); 
           } 
           finally { 
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         } 
        
        
         public static void execDML(Connection connection, String sql) throws SQLException { 
           PreparedStatement statement = null; 
           ResultSet resultSet = null; 
           ResultSetMetaData resultMetaData = null; 
            
           try { 
             // 执行HQL 
             statement = connection.prepareStatement(sql); 
             resultSet = statement.executeQuery(); 
              
             // 输出查询的列名到控制台 
             resultMetaData = resultSet.getMetaData(); 
             int columnCount = resultMetaData.getColumnCount(); 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
             } 
             System.out.println(); 
              
             // 输出查询结果到控制台 
             while (resultSet.next()) { 
               for (int i = 1; i <= columnCount; i++) { 
                 System.out.print(resultSet.getString(i) + '\t'); 
               } 
               System.out.println(); 
             } 
           } 
           finally { 
             if (null != resultSet) { 
               resultSet.close(); 
             } 
              
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         }     
  • 使用HCatalog接口提交数据分析任务,参考样例程序中的HCatalogExample.java。
    1. 编写Map类,从Hive的表中获取数据。
         public static class Map extends
                  Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> {
              int age;
              @Override
              protected void map(
                      LongWritable key,
                      HCatRecord value,
                      Context context)
                      throws IOException, InterruptedException {
                  age = (Integer) value.get(0);
                  context.write(new IntWritable(age), new IntWritable(1));
              }
          }     
    2. 编写Reduce类,对从Hive表中读取到的数据进行统计。
          public static class Reduce extends Reducer<IntWritable, IntWritable,
          IntWritable, HCatRecord> {
            @Override
            protected void reduce(
                    IntWritable key,
                    Iterable<IntWritable> values,
                    Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                Iterator<IntWritable> iter = values.iterator();
                while (iter.hasNext()) {
                    sum++;
                    iter.next();
                }
                HCatRecord record = new DefaultHCatRecord(2);
                record.set(0, key.get());
                record.set(1, sum);
      
                context.write(null, record);
              }
          } 
    3. 在run()方法中配置job后,执行main()方法,提交任务。
          public int run(String[] args) throws Exception {
            
              HiveConf.setLoadMetastoreConfig(true);
              Configuration conf = getConf();
              String[] otherArgs = args;
              
              String inputTableName = otherArgs[0];
              String outputTableName = otherArgs[1];
              String dbName = "default";
      
              @SuppressWarnings("deprecation")
              Job job = new Job(conf, "GroupByDemo");
              
              HCatInputFormat.setInput(job, dbName, inputTableName);
              job.setInputFormatClass(HCatInputFormat.class);
              job.setJarByClass(HCatalogExample.class);
              job.setMapperClass(Map.class);
              job.setReducerClass(Reduce.class);
              job.setMapOutputKeyClass(IntWritable.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setOutputKeyClass(WritableComparable.class);
              job.setOutputValueClass(DefaultHCatRecord.class);
              
              OutputJobInfo outputjobInfo = OutputJobInfo.create(dbName,outputTableName, null);
              HCatOutputFormat.setOutput(job, outputjobInfo);
              HCatSchema schema = outputjobInfo.getOutputSchema();
              HCatOutputFormat.setSchema(job, schema);
              job.setOutputFormatClass(HCatOutputFormat.class);
              
              return (job.waitForCompletion(true) ? 0 : 1);
          }
          public static void main(String[] args) throws Exception {
              int exitCode = ToolRunner.run(new HCatalogExample(), args);
              System.exit(exitCode);
          }

相关文档