Updated on 2024-04-02 GMT+08:00

Example Program Guide

Function

This section describes how to use an example program to complete an analysis task. The example program can submit a data analysis task in either of the following ways:

  • Submitting a data analysis task by using JDBC interfaces
  • Submitting a data analysis task by using Python

Example Code

  • Submit a data analysis task using the Hive Java Database Connectivity (JDBC) interface, that is, JDBCExample.java.
    1. Read the property file of the HiveServer client. The hiveclient.properties file is saved in the resources directory of the JDBC example program provided by Hive.
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + File.separator
              + "conf" + File.separator;
      InputStream fileInputStream = null;
      try {
          clientInfo = new Properties();
          // hiveclient.properties indicates the client configuration file. If the multiple-service feature is used, the file must be replaced with the hiveclient.properties file on the instance client.
          // hiveclient.properties is located under the config directory of the directory where the instance client installation package is decompressed.
          String hiveclientProp = userdir + "hiveclient.properties";
          File propertiesFile = new File(hiveclientProp);
          fileInputStream = new FileInputStream(propertiesFile);
          clientInfo.load(fileInputStream);
      } catch (Exception e) {
          throw new IOException(e);
      } finally {
          if (fileInputStream != null) {
              fileInputStream.close();
              fileInputStream = null;
          }
      }
    2. Obtain the ZooKeeper IP address and port list, the cluster authentication mode, the SASL configuration of the HiveServers, the ZooKeeper node names of the HiveServers, the discovery mode from the client to the server, and the principal of the server process used for user authentication. All of these configurations can be read from the hiveclient.properties file.
          // The format of zkQuorum is xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181.
          // xxx.xxx.xxx.xxx is the IP address of the node where ZooKeeper resides. The default port is 24002.
          zkQuorum = clientInfo.getProperty("zk.quorum");
          auth = clientInfo.getProperty("auth");
          sasl_qop = clientInfo.getProperty("sasl.qop");
          zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
          serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
          principal = clientInfo.getProperty("principal"); 
           
    3. In security mode, the Kerberos user and keytab file path are required for login authentication. For details about how to obtain USER_NAME, USER_KEYTAB_FILE, and KRB5_FILE, see Running JDBC and Viewing Results.
          // Set the userName of new user.
          USER_NAME = "xxx";
          // Set the keytab and krb5 files location of client.
          String userdir = System.getProperty("user.dir") + File.separator 
                  + "conf" + File.separator;
          USER_KEYTAB_FILE = userdir + "user.keytab";
          KRB5_FILE = userdir + "krb5.conf"; 
    4. Define HQL. Each HQL statement must be a single statement and cannot contain ";".
           // Define HQL. HQL cannot contain ";" 
           String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                  "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"};     
    5. Build the JDBC URL.

      You can also implement pre-authentication without providing the account and keytab file path. For details, see JDBC code example 2 in the Hive example in the Development Specifications. If IBM JDK is used to run Hive applications, the pre-authentication in JDBC code example 2 must be implemented.

      The following is an example of the JDBC URL assembled by the code snippets:

      jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.<system domain name>@<system domain name>;

           // Concat JDBC URL 
           StringBuilder sBuilder = new StringBuilder( 
               "jdbc:hive2://").append(zkQuorum).append("/"); 
            
           if ("KERBEROS".equalsIgnoreCase(auth)) { 
            sBuilder.append(";serviceDiscoveryMode=") 
                    .append(serviceDiscoveryMode)
                    .append(";zooKeeperNamespace=")
                    .append(zooKeeperNamespace)
                    .append(";sasl.qop=")
                    .append(sasl_qop)
                    .append(";auth=")
                    .append(auth)
                    .append(";principal=")
                    .append(principal)
                                .append(";user.principal=")
                    .append(USER_NAME)
                    .append(";user.keytab=")
                    .append(USER_KEYTAB_FILE)
                    .append(";");
                     
           } else {
             // Normal mode 
             sBuilder.append(";serviceDiscoveryMode=")  
                     .append(serviceDiscoveryMode) 
                     .append(";zooKeeperNamespace=") 
                     .append(zooKeeperNamespace)
                     .append(";auth=none;"); 
           } 
           String url = sBuilder.toString();  
    6. Load the Hive JDBC driver.
       
         // Load the Hive JDBC driver. 
         Class.forName(HIVE_DRIVER);
    7. Obtain the JDBC connection, determine the HQL type (DDL or DML), call the corresponding APIs to run the HQL statement, return the queried column names and results to the console, and close the JDBC connection. (A minimal sketch that assembles these snippets follows this list.)
       
         Connection connection = null;
         try {
             // Obtain the JDBC connection.
             // If the normal mode is used, the second parameter needs to be set to a correct username. Otherwise, the anonymous user will be used for login.
             connection = DriverManager.getConnection(url, "", "");

             // Create a table.
             // To import data to a table after the table is created, you can use the LOAD statement. For example, import data from HDFS to the table:
             // load data inpath '/tmp/employees.txt' overwrite into table employees_info;
             execDDL(connection, sqls[0]);
             System.out.println("Create table success!");

             // Query.
             execDML(connection, sqls[1]);

             // Delete the table.
             execDDL(connection, sqls[2]);
             System.out.println("Delete table success!");
         }
         finally {
             // Close the JDBC connection.
             if (null != connection) {
                 connection.close();
             }
         }
       public static void execDDL(Connection connection, String sql) 
         throws SQLException { 
           PreparedStatement statement = null; 
           try { 
             statement = connection.prepareStatement(sql); 
             statement.execute(); 
           } 
           finally { 
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         } 
        
        
         public static void execDML(Connection connection, String sql) throws SQLException { 
           PreparedStatement statement = null; 
           ResultSet resultSet = null; 
           ResultSetMetaData resultMetaData = null; 
            
           try { 
             // Run the HQL statement. 
             statement = connection.prepareStatement(sql); 
             resultSet = statement.executeQuery(); 
              
             // Return the queried column name to the console. 
             resultMetaData = resultSet.getMetaData(); 
             int columnCount = resultMetaData.getColumnCount(); 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
             } 
             System.out.println(); 
              
             // Return the results to the console. 
             while (resultSet.next()) { 
               for (int i = 1; i <= columnCount; i++) { 
                 System.out.print(resultSet.getString(i) + '\t'); 
               } 
               System.out.println(); 
             } 
           } 
           finally { 
             if (null != resultSet) { 
               resultSet.close(); 
             } 
              
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         }     
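
    The snippets above are fragments of JDBCExample.java and rely on values such as HIVE_DRIVER that are defined elsewhere in that class. The following is a minimal, illustrative sketch (not the actual example program) of how the pieces fit together in normal mode: the driver class name is the standard Hive JDBC driver, while the class name JdbcSketch, the inlined statements, and the xxx.xxx.xxx.xxx addresses are placeholders that you must adapt to your cluster.

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.sql.ResultSet;

      public class JdbcSketch {
          // Standard Hive JDBC driver class; JDBCExample.java is assumed to define HIVE_DRIVER similarly.
          private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";

          public static void main(String[] args) throws Exception {
              // Step 6: load the Hive JDBC driver.
              Class.forName(HIVE_DRIVER);

              // Step 5 (normal mode): build the JDBC URL from the ZooKeeper quorum.
              String url = "jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/"
                      + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;auth=none;";

              // Step 7: obtain the connection. In normal mode, pass a valid username instead of "".
              try (Connection connection = DriverManager.getConnection(url, "", "")) {
                  // DDL: create the table.
                  try (PreparedStatement ddl = connection.prepareStatement(
                          "CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)")) {
                      ddl.execute();
                  }
                  // DML: query and print the result.
                  try (PreparedStatement dml = connection.prepareStatement(
                              "SELECT COUNT(*) FROM employees_info");
                          ResultSet resultSet = dml.executeQuery()) {
                      while (resultSet.next()) {
                          System.out.println(resultSet.getString(1));
                      }
                  }
                  // DDL: drop the table.
                  try (PreparedStatement drop = connection.prepareStatement(
                          "DROP TABLE employees_info")) {
                      drop.execute();
                  }
              }
          }
      }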
  • Submit a data analysis task using the Python interface, that is, python-examples/pyCLI_sec.py. The example program connects to a cluster whose authentication mode is the security mode. Before running the example program, run the kinit command to authenticate the Kerberos user with the required permissions.
    1. Import the HAConnection class.
       
         from pyhs2.haconnection import HAConnection     
    2. Declare the HiveServer IP address list. In this example, hosts indicates the HiveServer nodes, and xxx.xxx.xxx.xxx indicates the service IP address.
       
         hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 
         
      1. If the HiveServer instance is migrated, the original example program becomes invalid. You need to update the HiveServer IP address used in the example program after the migration.
      2. If multiple Hive instances are used, update the IP address based on the address of the instance that is actually connected.
    3. Configure the Kerberos host name and service name. In this example, the Kerberos host name is hadoop and the service name is hive.
       
         conf = {"krb_host":"hadoop.<system domain name>", "krb_service":"hive"}     

      If multiple Hive instances are used, change the service name based on the cluster that is actually connected. For example, if the Hive1 instance is connected, change the service name to hive1.

    4. Create a connection, run the HQL statement, and return the queried column name and results to the console.
       
         try: 
             with HAConnection(hosts = hosts, 
                                port = 21066, 
                                authMechanism = "KERBEROS", 
                                configuration = conf) as haConn: 
                 with haConn.getConnection() as conn: 
                     with conn.cursor() as cur: 
                          # show databases 
                          print cur.getDatabases() 
                          
                         # execute query 
                         cur.execute("show tables") 
                          
                          # return column info from query 
                          print cur.getSchema() 
                          
                         # fetch table results 
                         for i in cur.fetch(): 
                             print i 
                              
          except Exception, e: 
             print e 
            

      If multiple Hive instances are used, you need to modify hosts according to the description in step 2 and change the port number to the actual port number. The default ports of Hive to Hive4 are 21066 to 21070, respectively.