更新时间:2024-06-27 GMT+08:00
分享

通过JDBC访问Spark SQL样例程序(Java)

功能简介

使用自定义客户端的JDBC接口提交数据分析任务,并返回结果。

样例代码

  1. 定义SQL语句。SQL语句必须为单条语句,注意其中不能包含“;”。示例:

    ArrayList<String> sqlList = new ArrayList<String>();
    sqlList.add("CREATE TABLE CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE");
    sqlList.add("LOAD DATA INPATH 'hdfs://hacluster/home/data' INTO TABLE CHILD");
    sqlList.add("SELECT * FROM child");
    sqlList.add("DROP TABLE child");
    executeSql(url, sqlList);

  2. 拼接JDBC URL。

    String securityConfig = ";saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.<系统域名>@<系统域名>;user.principal=sparkuser;user.keytab=/opt/FIclient/user.keytab;";
    Configuration config = new Configuration();
    config.addResource(new Path(args[0]));
    String zkUrl = config.get("spark.deploy.zookeeper.url");
    
    String zkNamespace = null;
    zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace");
    if (zkNamespace != null) {
        //从配置项中删除冗余字符
        zkNamespace = zkNamespace.substring(1);
    }
    
    StringBuilder sb = new StringBuilder("jdbc:hive2://"
            + zkUrl
            + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace="
            + zkNamespace
            + securityConfig);
    String url = sb.toString();

    由于KERBEROS认证成功后,默认有效期为1天,超过有效期后,如果客户端需要和JDBCServer新建连接则需要重新认证,否则就会执行失败。因此,若长期执行应用过程中需要新建连接,用户需要在“url”中添加user.principal和user.keytab认证信息,以保证每次建立连接时认证成功,例如,“url”中需要加上“user.principal=sparkuser;user.keytab=/opt/client/user.keytab”。

  3. 加载Hive JDBC驱动。

    Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();

  4. 获取JDBC连接,执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。

    连接字符串中的“zk.quorum”也可以使用配置文件中的配置项“spark.deploy.zookeeper.url”来代替。

    在网络拥塞的情况下,您还可以设置客户端与JDBCServer连接的超时时间,可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下:

    在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。

    static void executeSql(String url, ArrayList<String> sqls) throws ClassNotFoundException, SQLException {
            try {
                Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
            } catch (Exception e) {
                e.printStackTrace();
            }
            Connection connection = null;
            PreparedStatement statement = null;
    
            try {
                connection = DriverManager.getConnection(url);
                for (int i =0 ; i < sqls.size(); i++) {
                    String sql = sqls.get(i);
                    System.out.println("---- Begin executing sql: " + sql +  " ----");
                    statement = connection.prepareStatement(sql);
                    ResultSet result = statement.executeQuery();
                    ResultSetMetaData resultMetaData = result.getMetaData();
                    Integer colNum = resultMetaData.getColumnCount();
                    for (int j =1; j <= colNum; j++) {
                        System.out.println(resultMetaData.getColumnLabel(j) + "\t");
                    }
                    System.out.println();
    
                    while (result.next()) {
                        for (int j =1; j <= colNum; j++){
                            System.out.println(result.getString(j) + "\t");
                        }
                        System.out.println();
                    }
                    System.out.println("---- Done executing sql: " + sql +  " ----");
                }
    
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (null != statement) {
                    statement.close();
                }
                if (null != connection) {
                    connection.close();
                }
            }
        }

相关文档