Updated on 2022-11-18 GMT+08:00

Java Sample Code

Function

The JDBC interface of the user-defined client is used to submit the data analysis task and return the results.

Sample Code

  1. Define an SQL statement. The SQL statement must be a single statement that cannot contain the semicolon (;). For example,

    ArrayList<String> sqlList = new ArrayList<String>();
    sqlList.add("CREATE TABLE CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','");
    sqlList.add("LOAD DATA LOCAL INPATH '/home/data' INTO TABLE CHILD");
    sqlList.add("SELECT * FROM child");
    sqlList.add("DROP TABLE child");
    executeSql(url, sqlList);

    The data file in the sample project must be placed into the Home directory of the host where the JDBCServer is located.

  2. Assemble the JDBC URL.

    String securityConfig = ";saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.<system domain name>@<system domain name>;user.principal=sparkuser;user.keytab=/opt/FIclient/user.keytab;";
    Configuration config = new Configuration();
    config.addResource(new Path(args[0]));
    String zkUrl = config.get("spark.deploy.zookeeper.url");
    
    String zkNamespace = null;
    zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace");
    if (zkNamespace != null) {
        //Remove redundant characters from configuration items
        zkNamespace = zkNamespace.substring(1);
    }
    
    StringBuilder sb = new StringBuilder("jdbc:hive2://"
            + zkUrl
            + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace="
            + zkNamespace
            + securityConfig);
    String url = sb.toString();

    The default validity period of KERBEROS authentication is one day. After the validity period, the authentication needs to be performed again if you want to connect the client and JDBCServer. You can add the user.principal and user.keytab authentication information to url to ensure that the authentication is performed each time the connection is established. For example, add user.principal=sparkuser;user.keytab=/opt/client/user.keytab to url.

  3. Load the Hive JDBC driver.

    Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();

  4. Obtain the JDBC connection, execute the HQL, export the obtained column name and results to the console, and close the JDBC connection.

    The zk.quorum in the connection string can be replaced by spark.deploy.zookeeper.url in the configuration file.

    In network congestion, configure the timeout of the connection between the client and JDBCServer to avoid the suspending of the client due to timeless wait of the return from the server. The method is as follows:

    Before executing the DriverManager.getConnection script to obtain the JDBC connection, add the DriverManager.setLoginTimeout(n) script to configure the timeout. n indicates the timeout length of waiting for the return from the server. The unit is second, the type is Int, and the default value is 0 (indicating never timing out).

    static void executeSql(String url, ArrayList<String> sqls) throws ClassNotFoundException, SQLException {
            try {
                Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
            } catch (Exception e) {
                e.printStackTrace();
            }
            Connection connection = null;
            PreparedStatement statement = null;
    
            try {
                connection = DriverManager.getConnection(url);
                for (int i =0 ; i < sqls.size(); i++) {
                    String sql = sqls.get(i);
                    System.out.println("---- Begin executing sql: " + sql +  " ----");
                    statement = connection.prepareStatement(sql);
                    ResultSet result = statement.executeQuery();
                    ResultSetMetaData resultMetaData = result.getMetaData();
                    Integer colNum = resultMetaData.getColumnCount();
                    for (int j =1; j <= colNum; j++) {
                        System.out.println(resultMetaData.getColumnLabel(j) + "\t");
                    }
                    System.out.println();
    
                    while (result.next()) {
                        for (int j =1; j <= colNum; j++){
                            System.out.println(result.getString(j) + "\t");
                        }
                        System.out.println();
                    }
                    System.out.println("---- Done executing sql: " + sql +  " ----");
                }
    
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != statement) {
                    statement.close();
                }
                if (null != connection) {
                    connection.close();
                }
            }
        }