更新时间:2024-08-03 GMT+08:00

通过JDBC方式实现查询HetuEngine SQL任务

功能简介

通过JDBC连接方式,使用用户名和密码连接到HetuEngine,组装对应的SQL发送到HetuEngine执行,并能查询对应的SQL语句执行进度和状态。
import io.XXX.jdbc.XXXResultSet;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

public class JDBCExampleStatementProgressPercentage{

    private static Properties properties = new Properties();
    public static Connection connection = null;
    public static ResultSet result = null;
    public static PreparedStatement statement = null;
    private static void init() throws ClassNotFoundException {
        // 认证用的密码写入代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
        // 本示例以密码保存在环境变量中来实现身份验证为例,运行本示例前,请先在本地环境中设置环境变量HETUENGINE_PASSWORD
        properties.setProperty("user", "YourUserName");
        String password = System.getenv("HETUENGINE_PASSWORD");
        properties.setProperty("password",password);
        Class.forName("io.XXX.jdbc.XXXDriver");
    }

    /**
      * Program entry
      *
      * @param args no need program parameter
      */
    public static void main(String[] args) {
        String url = "jdbc:XXX://192.168.81.37:2181,192.168.195.232:2181,192.168.169.84:2181/hive/default?serviceDiscoveryMode=hsbroker";
        try {
            init();
            String sql = "show tables";
            connection = DriverManager.getConnection(url, properties);
            statement = connection.prepareStatement(sql.trim());
            result = statement.executeQuery();

            XXXResultSet rs = (XXXResultSet) result;
            new Thread() {
                public void run() {
                    Timer timer = new Timer();
                    //表示在3秒之后开始执行,并且每2秒执行一次
                    timer.schedule(new TimerTask() {
                        @Override
                        public void run() {
                            double statementProgressPercentage = rs.getProgressPercentage().orElse(0.0);
                            System.out.println("The Current Query Progress Percentage is " + statementProgressPercentage*100 + "%");
                            if("FINISHED".equals(rs.getStatementStatus().orElse(""))) {
                                System.out.println("The Current Query Progress Percentage is 100%");
                                timer.cancel();
                                Thread.currentThread().interrupt();
                            }
                        }
                    }, 3000, 2000);
                }
            }.start();

            ResultSetMetaData resultMetaData = result.getMetaData();
            int colNum = resultMetaData.getColumnCount();
            for (int j = 1; j <= colNum; j++) {
                try {
                    System.out.print(resultMetaData.getColumnLabel(j) + "\t");
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }

            while (result.next()) {
                for (int j = 1; j <= colNum; j++) {
                    System.out.print(result.getString(j) + "\t");
                }
                System.out.println();
            }
        } catch (SQLException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            if (result != null) {
                try {
                    result.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

上述代码中各参数说明如表1所示:

表1 参数及参数说明

参数名称

参数说明

url

jdbc:XXX://HSBroker1_IP:HSBroker1_Port,HSBroker2_IP:HSBroker2_Port,HSBroker3_IP:HSBroker3_Port/catalog/schema?serviceDiscoveryMode=hsbroker

说明:
  • XXX:驱动名,请以实际样例代码中的内容为准。
  • catalog、schema分别是JDBC客户端要连接的catalog和schema名称。
  • HSBroker_IP:HSBroker_Port是HSBroker的URL,多个URL以逗号隔开。例如:“192.168.81.37:2181,192.168.195.232:2181,192.168.169.84:2181”

    在Manager页面,选择“集群 > 服务 > HetuEngine > 实例”,获取HSBroker所有实例的业务IP;在“配置”页签,搜索“server.port”,获取HSBroker端口号。

user

访问HetuEngine的用户名,即在集群中创建的“人机”用户的用户名。

password

在集群中创建的“人机”用户的用户密码。

getStatementStatus()

返回当前SQL执行语句的执行状态,一共十一种状态:{‘RUNNING’, ‘FAILED’, ‘FINISHED’, ‘QUEUED’, ‘WAITING_FOR_RESOURCES’, ‘DISPATCHING’, ‘PLANNING’, ‘STARTING’, ‘RESCHEDULING’, ‘RESUMING’, ‘FINISHING’}。

getProgressPercentage()

返回当前SQL执行语句的执行进度,取值范围为【0-1】。