Updated on 2024-08-16 GMT+08:00

Analyzing Hive Data

Function Description

This section describes how to use a sample program to complete an analysis task. The sample program provides the following methods:

  • Submitting a data analysis task by using JDBC APIs
  • Submitting a data analysis task by using HCatalog APIs

Sample Code

  • If you submit a data analysis task using Hive JDBC APIs, refer to JDBCExample.java in the sample program.
    1. Define HiveQL. HiveQL must be a single statement and cannot contain ";".
         // Define HiveQL, which cannot contain the semicolon (;).
         String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                  "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"}; 
    2. Build JDBC URL.
      // Build JDBC URL.
      StringBuilder sBuilder = new StringBuilder(
        "jdbc:hive2://").append(clientInfo.getZkQuorum()).append("/");
      
      if (isSecurityMode) {
          // Security mode
          // ZooKeeper login authentication
          sBuilder.append(";serviceDiscoveryMode=")
                  .append(clientInfo.getServiceDiscoveryMode())
                  .append(";zooKeeperNamespace=")
                  .append(clientInfo.getZooKeeperNamespace())
                  .append(";sasl.qop=")
                  .append(clientInfo.getSaslQop())
                  .append(";auth=")
                  .append(clientInfo.getAuth())
                  .append(";principal=")
                  .append(clientInfo.getPrincipal())
                  .append(";");
      } else {
          // Normal mode
          sBuilder.append(";serviceDiscoveryMode=")
                  .append(clientInfo.getServiceDiscoveryMode())
                  .append(";zooKeeperNamespace=")
                  .append(clientInfo.getZooKeeperNamespace())
                  .append(";auth=none");
      }
      String url = sBuilder.toString();

      The preceding operations are performed to access Hive through ZooKeeper. If you want to access Hive by directly connecting to HiveServer, perform the following operations to combine the JDBC URL and change the port of zk.quorum in the hiveclient.properties file to 10000.

      // Build JDBC URL.
      StringBuilder sBuilder = new StringBuilder(
        "jdbc:hive2://").append(clientInfo.getZkQuorum()).append("/");
      
      if (isSecurityMode) {
          // Security mode
          // ZooKeeper login authentication
          sBuilder.append(";sasl.qop=")
                  .append(clientInfo.getSaslQop())
                  .append(";auth=")
                  .append(clientInfo.getAuth())
                  .append(";principal=")
                  .append(clientInfo.getPrincipal())
                  .append(";");
      } else {
          // Normal mode
          sBuilder.append(";auth=none");
      }
      String url = sBuilder.toString();

      Note: When the HiveServer is directly connected, if the connected HiveServer is faulty, Hive access fails. If the ZooKeeper is used to access Hive, any available HiveServer instance can provide services properly. Therefore, you are advised to use ZooKeeper to access Hive when using JDBC.

    3. Load the Hive JDBC driver.
         // Load the Hive JDBC driver.
         Class.forName(HIVE_DRIVER);
    4. Enter a correct username, obtain the JDBC connection, confirm the HiveQL type (DDL/DML), call APIs to run HiveQL, return the queried column name and result to the console, and close the JDBC connection.
       
         Connection connection = null; 
           try { 
             // Obtain the JDBC connection.
             // If you set the second parameter to an incorrect username, the anonymous user will be used for login.
             connection = DriverManager.getConnection(url, "userName", ""); 
                
             // Create a table.
             // To import data to a table after the table is created, you can use the LOAD statement. For example, import data from HDFS to the table. 
             //load data inpath '/tmp/employees.txt' overwrite into table employees_info; 
             execDDL(connection,sqls[0]); 
             System.out.println("Create table success!"); 
               
             // Query
             execDML(connection,sqls[1]); 
                
             // Delete the table.
             execDDL(connection,sqls[2]); 
             System.out.println("Delete table success!"); 
           } 
           finally { 
             // Close the JDBC connection.
             if (null != connection) { 
               connection.close(); 
             } 
        
       public static void execDDL(Connection connection, String sql) 
         throws SQLException { 
           PreparedStatement statement = null; 
           try { 
             statement = connection.prepareStatement(sql); 
             statement.execute(); 
           } 
           finally { 
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         } 
        
        
         public static void execDML(Connection connection, String sql) throws SQLException { 
           PreparedStatement statement = null; 
           ResultSet resultSet = null; 
           ResultSetMetaData resultMetaData = null; 
            
           try { 
             // Execute HiveQL.
             statement = connection.prepareStatement(sql); 
             resultSet = statement.executeQuery(); 
              
             // Output the queried column name to the console.
             resultMetaData = resultSet.getMetaData(); 
             int columnCount = resultMetaData.getColumnCount(); 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
             } 
             System.out.println(); 
              
             // Output the query result to the console.
             while (resultSet.next()) { 
               for (int i = 1; i <= columnCount; i++) { 
                 System.out.print(resultSet.getString(i) + '\t'); 
               } 
               System.out.println(); 
             } 
           } 
           finally { 
             if (null != resultSet) { 
               resultSet.close(); 
             } 
              
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         }     
  • If you submit a data analysis task using HCatalog APIs, refer to HCatalogExample.java in the sample program.
    1. Compile the Map class to obtain data from a Hive table.
         public static class Map extends
                  Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> {
              int age;
              @Override
              protected void map(
                      LongWritable key,
                      HCatRecord value,
                      Context context)
                      throws IOException, InterruptedException {
                  age = (Integer) value.get(0);
                  context.write(new IntWritable(age), new IntWritable(1));
              }
          }     
    2. Compile the Reduce class to collect statistics on data read from the Hive table.
          public static class Reduce extends Reducer<IntWritable, IntWritable,
          IntWritable, HCatRecord> {
            @Override
            protected void reduce(
                    IntWritable key,
                    Iterable<IntWritable> values,
                    Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                Iterator<IntWritable> iter = values.iterator();
                while (iter.hasNext()) {
                    sum++;
                    iter.next();
                }
                HCatRecord record = new DefaultHCatRecord(2);
                record.set(0, key.get());
                record.set(1, sum);
      
                context.write(null, record);
              }
          } 
    3. After configuring the job in the run() method, execute the main() method to submit a task.
          public int run(String[] args) throws Exception {
            
              HiveConf.setLoadMetastoreConfig(true);
              Configuration conf = getConf();
              String[] otherArgs = args;
              
              String inputTableName = otherArgs[0];
              String outputTableName = otherArgs[1];
              String dbName = "default";
      
              @SuppressWarnings("deprecation")
              Job job = new Job(conf, "GroupByDemo");
              
              HCatInputFormat.setInput(job, dbName, inputTableName);
              job.setInputFormatClass(HCatInputFormat.class);
              job.setJarByClass(HCatalogExample.class);
              job.setMapperClass(Map.class);
              job.setReducerClass(Reduce.class);
              job.setMapOutputKeyClass(IntWritable.class);
              job.setMapOutputValueClass(IntWritable.class);
              job.setOutputKeyClass(WritableComparable.class);
              job.setOutputValueClass(DefaultHCatRecord.class);
              
              OutputJobInfo outputjobInfo = OutputJobInfo.create(dbName,outputTableName, null);
              HCatOutputFormat.setOutput(job, outputjobInfo);
              HCatSchema schema = outputjobInfo.getOutputSchema();
              HCatOutputFormat.setSchema(job, schema);
              job.setOutputFormatClass(HCatOutputFormat.class);
              
              return (job.waitForCompletion(true) ? 0 : 1);
          }
          public static void main(String[] args) throws Exception {
              int exitCode = ToolRunner.run(new HCatalogExample(), args);
              System.exit(exitCode);
          }