更新时间:2024-08-03 GMT+08:00

通过JDBC访问Spark SQL样例程序(Scala)

功能简介

使用自定义客户端的JDBC接口提交数据分析任务,并返回结果。

样例代码

  1. 定义SQL语句。SQL语句必须为单条语句,注意其中不能包含“;”。示例:

    val sqlList = new ArrayBuffer[String]
    sqlList += "CREATE TABLE CHILD (NAME STRING, AGE INT) " +
    "ROW FORMAT DELIMITED FIELDS TERMINATED BY ','"
    sqlList += "LOAD DATA LOCAL INPATH '/home/data' INTO TABLE CHILD"
    sqlList += "SELECT * FROM child"
    sqlList += "DROP TABLE child"

    样例工程中的data文件需要放到JDBCServer所在机器的home目录下。

  2. 拼接JDBC URL。

    val securityConfig = ";saslQop=auth-conf;auth=KERBEROS;principal=spark2x/hadoop.<系统域名>@<系统域名>;user.principal=sparkuser;user.keytab=/opt/FIclient/user.keytab;"
    val config: Configuration = new Configuration()
    config.addResource(new Path(args(0)))
    val zkUrl = config.get("spark.deploy.zookeeper.url")
    
    var zkNamespace: String = null
    zkNamespace = fileInfo.getProperty("spark.thriftserver.zookeeper.namespace")
    //从配置项中删除冗余字符
    if (zkNamespace != null) zkNamespace = zkNamespace.substring(1)
    val sb = new StringBuilder("jdbc:hive2://"
      + zkUrl
      + ";serviceDiscoveryMode=zooKeeper;zooKeeperNamespace="
      + zkNamespace
      + securityConfig)
    val url = sb.toString()

    由于KERBEROS认证成功后,默认有效期为1天,超过有效期后,如果客户端需要和JDBCServer新建连接则需要重新认证,否则就会执行失败。因此,若长期执行应用过程中需要新建连接,用户需要在“url”中添加user.principal和user.keytab认证信息,以保证每次建立连接时认证成功,例如,“url”中需要加上“user.principal=sparkuser;user.keytab=/opt/client/user.keytab”。

  3. 加载Hive JDBC驱动,获取JDBC连接,执行HQL,输出查询的列名和结果到控制台,关闭JDBC连接。

    连接字符串中的“zk.quorum”也可以使用配置文件中的配置项“spark.deploy.zookeeper.url”来代替。

    在网络拥塞的情况下,您还可以设置客户端与JDBCServer连接的超时时间,可以避免客户端由于无限等待服务端的返回而挂起。使用方式如下:

    在执行“DriverManager.getConnection”方法获取JDBC连接前,添加“DriverManager.setLoginTimeout(n)”方法来设置超时时长,其中n表示等待服务返回的超时时长,单位为秒,类型为Int,默认为“0”(表示永不超时)。

    def executeSql(url: String, sqls: Array[String]): Unit = {
    //加载Hive JDBC驱动。
    Class.forName("org.apache.hive.jdbc.HiveDriver").newInstance()
    
    var connection: Connection = null
    var statement: PreparedStatement = null
    try {
      connection = DriverManager.getConnection(url)
      for (sql <- sqls) {
        println(s"---- Begin executing sql: $sql ----")
        statement = connection.prepareStatement(sql)
    
        val result = statement.executeQuery()
    
        val resultMetaData = result.getMetaData
        val colNum = resultMetaData.getColumnCount
        for (i <- 1 to colNum) {
          print(resultMetaData.getColumnLabel(i) + "\t")
        }
        println()
    
        while (result.next()) {
          for (i <- 1 to colNum) {
            print(result.getString(i) + "\t")
          }
          println()
        }
        println(s"---- Done executing sql: $sql ----")
      }
    } finally {
      if (null != statement) {
        statement.close()
      }
    
      if (null != connection) {
        connection.close()
      }
    }
    }