文档首页/
MapReduce服务 MRS/
开发指南(普通版_3.x)/
Spark2x开发指南(安全模式)/
开发Spark应用/
通过JDBC访问Spark SQL样例程序/
通过JDBC访问Spark SQL样例程序(Java)
更新时间:2024-08-05 GMT+08:00
通过JDBC访问Spark SQL样例程序(Java)
功能简介
使用自定义客户端的JDBC接口提交数据分析任务,并返回结果。
样例代码
- 定义SQL语句。SQL语句必须为单条语句,注意其中不能包含“;”。示例:
ArrayList<String> sqlList = new ArrayList<String>(); sqlList.add("CREATE TABLE CHILD (NAME STRING, AGE INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"); sqlList.add("LOAD DATA INPATH 'hdfs://hacluster/home/data' INTO TABLE CHILD"); sqlList.add("SELECT * FROM child"); sqlList.add("DROP TABLE child"); executeSql(url, sqlList);
- 拼接JDBC URL。
String securityConfig = ";saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.<系统域名>@<系统域名>;user.principal=sparkuser;user.keytab=/opt/FIclient/user.keytab;"; Configuration config = new Configuration(); config.addResource(new Path(args[0])); String zkUrl = config.get("spark.deploy.zookeeper.url"); String zkNamespace = null; zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace"); if (zkNamespace != null) { //从配置项中删除冗余字符 zkNamespace = zkNamespace.substring(1); } StringBuilder sb = new StringBuilder("jdbc:hive2://" + zkUrl + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=" + zkNamespace + securityConfig); String url = sb.toString();
由于KERBEROS认证成功后,默认有效期为1天,超过有效期后,如果客户端需要和JDBCServer新建连接则需要重新认证,否则就会执行失败。因此,若长期执行应用过程中需要新建连接,用户需要在“url”中添加user.principal和user.keytab认证信息,以保证每次建立连接时认证成功,例如,“url”中需要加上“user.principal=sparkuser;user.keytab=/opt/client/user.keytab”。
- 加载Hive JDBC驱动。
Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance();
- 获取JDBC连接,执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。
连接字符串中的“zk.quorum”也可以使用配置文件中的配置项“spark.deploy.zookeeper.url”来代替。
在网络拥塞的情况下,您还可以设置客户端与JDBCServer连接的超时时间,可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下:
在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。
static void executeSql(String url, ArrayList<String> sqls) throws ClassNotFoundException, SQLException { try { Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance(); } catch (Exception e) { e.printStackTrace(); } Connection connection = null; PreparedStatement statement = null; try { connection = DriverManager.getConnection(url); for (int i =0 ; i < sqls.size(); i++) { String sql = sqls.get(i); System.out.println("---- Begin executing sql: " + sql + " ----"); statement = connection.prepareStatement(sql); ResultSet result = statement.executeQuery(); ResultSetMetaData resultMetaData = result.getMetaData(); Integer colNum = resultMetaData.getColumnCount(); for (int j =1; j <= colNum; j++) { System.out.println(resultMetaData.getColumnLabel(j) + "\t"); } System.out.println(); while (result.next()) { for (int j =1; j <= colNum; j++){ System.out.println(result.getString(j) + "\t"); } System.out.println(); } System.out.println("---- Done executing sql: " + sql + " ----"); } } catch (Exception e) { e.printStackTrace(); } finally { if (null != statement) { statement.close(); } if (null != connection) { connection.close(); } } }