Updated on 2022-09-14 GMT+08:00

Example Program Guide

Function

This section describes how to use an example program to complete an analysis task. An example program can submit a task by using the following methods:

  • Submitting a data analysis task by using JDBC interfaces
  • Submitting a data analysis task by using Python

Example Code

  • Submit a data analysis task using the Hive Java database connectivity (JDBC) interface, that is, JDBCExample.java.
    1. Read the property file of the HiveServer client. The hiveclient.properties file is saved in the resources directory of the JDBC example program provided by Hive.
      Properties clientInfo = null;
      String userdir = System.getProperty("user.dir") + File.separator
              + "conf" + File.separator;
      InputStream fileInputStream = null;
      try {
          clientInfo = new Properties();
          // hiveclient.properties is the client configuration file. If the multi-instance feature is used, replace it with the hiveclient.properties file of the instance client.
          // hiveclient.properties is located in the config directory under the directory where the instance client installation package is decompressed.
          String hiveclientProp = userdir + "hiveclient.properties";
          File propertiesFile = new File(hiveclientProp);
          fileInputStream = new FileInputStream(propertiesFile);
          clientInfo.load(fileInputStream);
      } catch (Exception e) {
          throw new IOException(e);
      } finally {
          if (fileInputStream != null) {
              fileInputStream.close();
              fileInputStream = null;
          }
      }
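The same load can be written more compactly with try-with-resources, which closes the stream automatically even when load() throws. The following is a minimal sketch and not part of the sample program: the class name PropertiesLoaderSketch and the method loadClientInfo are hypothetical, and the path passed in main simply mirrors the conf directory used in the code above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper, not part of the Hive sample program.
class PropertiesLoaderSketch {

    // Loads a properties file. The stream is closed automatically
    // when the try block exits, whether or not load() succeeds.
    static Properties loadClientInfo(Path file) throws IOException {
        Properties clientInfo = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            clientInfo.load(in);
        }
        return clientInfo;
    }

    public static void main(String[] args) throws IOException {
        // Same location as in the code above: <user.dir>/conf/hiveclient.properties
        Path file = Paths.get(System.getProperty("user.dir"), "conf", "hiveclient.properties");
        if (!Files.exists(file)) {
            System.out.println("Not found: " + file);
            return;
        }
        Properties clientInfo = loadClientInfo(file);
        System.out.println("zk.quorum = " + clientInfo.getProperty("zk.quorum"));
    }
}
```

Unlike the original snippet, this version lets IOException propagate directly instead of wrapping every exception in a new IOException.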
    1. Obtain the ZooKeeper IP address and port list, the cluster authentication mode, the SASL configuration of HiveServer, the HiveServer node name in ZooKeeper, the client-to-server discovery mode, and the principal of the server process used for user authentication. All of these values can be read from the hiveclient.properties file.
          // The format of zkQuorum is "xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181".
          // xxx.xxx.xxx.xxx is the IP address of the node where ZooKeeper resides. The default port is 2181.
          zkQuorum = clientInfo.getProperty("zk.quorum");
          auth = clientInfo.getProperty("auth");
          sasl_qop = clientInfo.getProperty("sasl.qop");
          zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
          serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
          principal = clientInfo.getProperty("principal"); 
           
    2. In security mode, the Kerberos user and the keytab file path are required for login authentication. For details about how to obtain USER_NAME, USER_KEYTAB_FILE, and KRB5_FILE, see Running JDBC and Viewing Results.
          // Set the name of the newly created user.
          USER_NAME = "xxx";
          // Set the locations of the client keytab and krb5 files.
          String userdir = System.getProperty("user.dir") + File.separator 
                  + "conf" + File.separator;
          USER_KEYTAB_FILE = userdir + "user.keytab";
          KRB5_FILE = userdir + "krb5.conf"; 
    3. Define HQL. HQL must be a single statement and cannot contain semicolons (;).
           // Define HQL. HQL cannot contain ";" 
           String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", 
                  "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"};     
    4. Build the JDBC URL.

      Alternatively, you can implement pre-authentication, in which case the account and keytab file path do not need to be provided. For details, see JDBC code example 2 in Examples. If IBM JDK is used to run Hive applications, the pre-authentication in JDBC code example 2 must be implemented.

      The following is an example of the JDBC URL assembled by the code in this step:

      jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.<system domain name>@<system domain name>;

      To find the current system domain name, log in to FusionInsight Manager, choose System > Permission > Domain and Mutual Trust, and check the value of Local Domain.

      hive/hadoop.<system domain name> is the user name. All letters of the system domain name that appear in the user name are lowercase. For example, if Local Domain is set to 9427068F-6EFA-4833-B43E-60CB641E5B6C.COM, the user name is hive/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com.

           // Concatenate the JDBC URL.
           StringBuilder sBuilder = new StringBuilder( 
               "jdbc:hive2://").append(zkQuorum).append("/"); 
            
           if ("KERBEROS".equalsIgnoreCase(auth)) { 
            sBuilder.append(";serviceDiscoveryMode=") 
                    .append(serviceDiscoveryMode)
                    .append(";zooKeeperNamespace=")
                    .append(zooKeeperNamespace)
                    .append(";sasl.qop=")
                    .append(sasl_qop)
                    .append(";auth=")
                    .append(auth)
                    .append(";principal=")
                    .append(principal)
                    .append(";user.principal=")
                    .append(USER_NAME)
                    .append(";user.keytab=")
                    .append(USER_KEYTAB_FILE)
                    .append(";");
                     
           } else {
             // Normal mode 
             sBuilder.append(";serviceDiscoveryMode=")  
                     .append(serviceDiscoveryMode) 
                     .append(";zooKeeperNamespace=") 
                     .append(zooKeeperNamespace)
                     .append(";auth=none;"); 
           } 
           String url = sBuilder.toString();   
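The URL assembly and the lowercase rule for the user name can be checked in isolation, without a cluster. The following is a minimal sketch under stated assumptions: the class HiveUrlSketch and the methods hiveUserName and buildUrl are hypothetical names, the property keys are the ones read from hiveclient.properties above, and the addresses in main are placeholder documentation addresses. The parameters are appended in the same order as in the sample code.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical helper, not part of the Hive sample program.
class HiveUrlSketch {

    // Derives the user name from the Local Domain value:
    // the letters of the domain name are converted to lowercase.
    static String hiveUserName(String localDomain) {
        return "hive/hadoop." + localDomain.toLowerCase(Locale.ROOT);
    }

    // Assembles the JDBC URL from the values in hiveclient.properties.
    // userName and userKeytabFile are used only when auth is KERBEROS.
    static String buildUrl(Properties clientInfo, String userName, String userKeytabFile) {
        String auth = clientInfo.getProperty("auth");
        StringBuilder sb = new StringBuilder("jdbc:hive2://")
                .append(clientInfo.getProperty("zk.quorum")).append("/")
                .append(";serviceDiscoveryMode=").append(clientInfo.getProperty("serviceDiscoveryMode"))
                .append(";zooKeeperNamespace=").append(clientInfo.getProperty("zooKeeperNamespace"));
        if ("KERBEROS".equalsIgnoreCase(auth)) {
            // Security mode
            sb.append(";sasl.qop=").append(clientInfo.getProperty("sasl.qop"))
              .append(";auth=").append(auth)
              .append(";principal=").append(clientInfo.getProperty("principal"))
              .append(";user.principal=").append(userName)
              .append(";user.keytab=").append(userKeytabFile)
              .append(";");
        } else {
            // Normal mode
            sb.append(";auth=none;");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("zk.quorum", "192.0.2.1:2181,192.0.2.2:2181"); // placeholder addresses
        p.setProperty("serviceDiscoveryMode", "zooKeeper");
        p.setProperty("zooKeeperNamespace", "hiveserver2");
        p.setProperty("auth", "NONE");
        System.out.println(buildUrl(p, null, null));
        System.out.println(hiveUserName("9427068F-6EFA-4833-B43E-60CB641E5B6C.COM"));
    }
}
```

Keeping the assembly in a method that takes its inputs as parameters makes it easy to verify the resulting string before any connection is attempted.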
    5. Load the Hive JDBC driver.
       
         // Load the Hive JDBC driver. 
         Class.forName(HIVE_DRIVER);
    6. Obtain the JDBC connection, confirm the HQL type (DDL/DML), call the corresponding interface to run the HQL statement, print the queried column names and results to the console, and close the JDBC connection.
       
         Connection connection = null;
         try {
           // Obtain the JDBC connection.
           // In normal mode, set the second parameter to a correct username. Otherwise, the anonymous user is used for login.
           connection = DriverManager.getConnection(url, "", "");

           // Create a table.
           // To import data to the table after it is created, you can use the LOAD statement. For example, import data from HDFS to the table:
           // load data inpath '/tmp/employees.txt' overwrite into table employees_info;
           execDDL(connection, sqls[0]);
           System.out.println("Create table success!");

           // Query
           execDML(connection, sqls[1]);

           // Delete the table.
           execDDL(connection, sqls[2]);
           System.out.println("Delete table success!");
         }
         finally {
           // Close the JDBC connection.
           if (null != connection) {
             connection.close();
           }
         }

         public static void execDDL(Connection connection, String sql)
         throws SQLException { 
           PreparedStatement statement = null; 
           try { 
             statement = connection.prepareStatement(sql); 
             statement.execute(); 
           } 
           finally { 
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         } 
        
        
         public static void execDML(Connection connection, String sql) throws SQLException { 
           PreparedStatement statement = null; 
           ResultSet resultSet = null; 
           ResultSetMetaData resultMetaData = null; 
            
           try { 
             // Run the HQL statement. 
             statement = connection.prepareStatement(sql); 
             resultSet = statement.executeQuery(); 
              
             // Return the queried column name to the console. 
             resultMetaData = resultSet.getMetaData(); 
             int columnCount = resultMetaData.getColumnCount(); 
             for (int i = 1; i <= columnCount; i++) { 
               System.out.print(resultMetaData.getColumnLabel(i) + '\t'); 
             } 
             System.out.println(); 
              
             // Return the results to the console. 
             while (resultSet.next()) { 
               for (int i = 1; i <= columnCount; i++) { 
                 System.out.print(resultSet.getString(i) + '\t'); 
               } 
               System.out.println(); 
             } 
           } 
           finally { 
             if (null != resultSet) { 
               resultSet.close(); 
             } 
              
             if (null != statement) { 
               statement.close(); 
             } 
           } 
         }     
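The sample hard-codes which helper runs each statement. Where the statements come from elsewhere, the two rules stated in the steps above (a single statement without semicolons, and a choice between execDDL and execDML) can be checked before anything is sent to HiveServer. The following is a minimal sketch only: the class HqlDispatchSketch, the Kind enum, and both methods are hypothetical, and treating only statements that begin with SELECT as queries is a simplifying assumption of this sketch, not a rule taken from Hive.

```java
import java.util.Locale;

// Hypothetical helper, not part of the Hive sample program.
class HqlDispatchSketch {

    enum Kind { QUERY, NON_QUERY }

    // Rejects empty input and any semicolon, because each HQL string
    // must be a single statement that does not contain ";".
    static String requireSingleStatement(String hql) {
        if (hql == null || hql.trim().isEmpty()) {
            throw new IllegalArgumentException("HQL must not be empty");
        }
        if (hql.indexOf(';') >= 0) {
            throw new IllegalArgumentException("HQL must not contain ';': " + hql);
        }
        return hql.trim();
    }

    // Heuristic assumed by this sketch: only statements that start with
    // SELECT are treated as queries that return rows.
    static Kind classify(String hql) {
        String head = requireSingleStatement(hql).toUpperCase(Locale.ROOT);
        return head.startsWith("SELECT") ? Kind.QUERY : Kind.NON_QUERY;
    }

    public static void main(String[] args) {
        String[] sqls = {
            "CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)",
            "SELECT COUNT(*) FROM employees_info",
            "DROP TABLE employees_info"
        };
        for (String sql : sqls) {
            // QUERY would be passed to execDML, NON_QUERY to execDDL.
            System.out.println(classify(sql) + "\t" + sql);
        }
    }
}
```

The semicolon check is deliberately strict and also rejects a semicolon inside a string literal. Statements that return rows but do not start with SELECT are classified as NON_QUERY here, so the heuristic would need extending for such cases.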
  • Submit a data analysis task using the Python interface, that is, python-examples/pyCLI_nosec.py.
    1. Import the HAConnection class.
       
         from pyhs2.haconnection import HAConnection     
    2. Declare the HiveServer IP address list. In this example, hosts indicates the HiveServer nodes, and xxx.xxx.xxx.xxx indicates the business IP address of each node.
       
         hosts = ["xxx.xxx.xxx.xxx", "xxx.xxx.xxx.xxx"] 
         
      1. If the HiveServer instance is migrated, the original sample program becomes invalid. After the migration, update the HiveServer IP address used in the sample program.
      2. If multiple Hive instances are used, update the IP address to that of the instance that is actually connected.
    3. Set the user parameter of the HAConnection constructor to a correct username. The password can be left empty. Create a connection, run the HQL statement, and print the queried column names and results to the console.
       
         # Python 2 syntax (print statement, "except Exception, e").
         try:
             with HAConnection(hosts = hosts,
                               port = 21066,
                               authMechanism = "PLAIN",
                               user='user1',
                               password='******') as haConn:
                 with haConn.getConnection() as conn:
                     with conn.cursor() as cur:
                         # Show databases
                         print cur.getDatabases()
                         # Execute query
                         cur.execute("show tables")
                         # Return column info from query
                         print cur.getSchema()
                         # Fetch table results
                         for i in cur.fetch():
                             print i
         except Exception, e:
             print e
      
            

      If multiple Hive instances are used, modify hosts as described in step 2 and change the port number to the actual port number of the instance. The default ports of Hive to Hive4 are 21066 to 21070, respectively.
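The default port assignment can be expressed as simple arithmetic. The following is a minimal sketch, written in Java like the JDBC example; the same arithmetic applies to the port value passed to HAConnection. The class HivePortSketch and the method defaultPort are hypothetical, and the sketch assumes that the instances are named Hive, Hive1, Hive2, Hive3, and Hive4 and that the default ports are assigned in that order. If the ports have been changed in your cluster, use the actual values instead.

```java
// Hypothetical helper, not part of the Hive sample program.
class HivePortSketch {

    // Default port of the first instance (Hive).
    static final int BASE_PORT = 21066;

    // Maps an instance name (Hive, Hive1 ... Hive4) to its default port.
    static int defaultPort(String instanceName) {
        if (instanceName == null || !instanceName.matches("Hive[1-4]?")) {
            throw new IllegalArgumentException("Unknown Hive instance: " + instanceName);
        }
        // "Hive" has no numeric suffix and maps to index 0.
        int index = instanceName.length() == 4 ? 0 : instanceName.charAt(4) - '0';
        return BASE_PORT + index;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"Hive", "Hive1", "Hive2", "Hive3", "Hive4"}) {
            System.out.println(name + " -> " + defaultPort(name));
        }
    }
}
```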