通过JDBC访问Spark SQL样例程序(Scala)
功能简介
使用自定义客户端的JDBC接口提交数据分析任务,并返回结果。
样例代码
- 定义SQL语句。SQL语句必须为单条语句,注意其中不能包含“;”。示例:
val sqlList = new ArrayBuffer[String] sqlList += "CREATE TABLE CHILD (NAME STRING, AGE INT) " + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" sqlList += "LOAD DATA LOCAL INPATH '/home/data' INTO TABLE CHILD" sqlList += "SELECT * FROM child" sqlList += "DROP TABLE child"
样例工程中的data文件需要放到JDBCServer所在机器的home目录下。
- 拼接JDBC URL。
val config: Configuration = new Configuration() config.addResource(new Path(args(0))) val zkUrl = config.get("spark.deploy.zookeeper.url") var zkNamespace: String = null zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace") //从配置项中删除冗余字符 if (zkNamespace != null) zkNamespace = zkNamespace.substring(1) val sb = new StringBuilder("jdbc:hive2://" + zkUrl + "/;serviceDiscoveryMode=zooKeeper;" + "zooKeeperNamespace=" + zkNamespace + ";"); val url = sb.toString()
- 加载Hive JDBC驱动。获取JDBC连接,执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。
连接字符串中的“zk.quorum”也可以使用配置文件中的配置项“spark.deploy.zookeeper.url”来代替。
在网络拥塞的情况下,您还可以设置客户端与JDBCServer连接的超时时间,可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下:
在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。
def executeSql(url: String, sqls: Array[String]): Unit = { //加载Hive JDBC驱动。 Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance() var connection: Connection = null var statement: PreparedStatement = null try { connection = DriverManager.getConnection(url) for (sql <- sqls) { println(s"---- Begin executing sql: $sql ----") statement = connection.prepareStatement(sql) val result = statement.executeQuery() val resultMetaData = result.getMetaData val colNum = resultMetaData.getColumnCount for (i <- 1 to colNum) { print(resultMetaData.getColumnLabel(i) + "\t") } println() while (result.next()) { for (i <- 1 to colNum) { print(result.getString(i) + "\t") } println() } println(s"---- Done executing sql: $sql ----") } } finally { if (null != statement) { statement.close() } if (null != connection) { connection.close() } } }