Updated on: 2022-07-19 GMT+08:00

Examples

Sample Code 1 for JDBC Secondary Development

The main functions of the following sample code are as follows:

  1. Provides the username and key file path in the JDBC URL so that the program automatically completes the security login and sets up the Hive connection (a sample URL is shown after this list).

    For MRS 1.9.2 and earlier versions, the default ZooKeeper port number is 24002. For details, see the ZooKeeper configuration on MRS Manager.

  2. Executes three types of HQL statements: creating a table, querying data, and dropping the table.
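
For reference, the secure-mode JDBC URL assembled by the code below takes roughly the following form; the ZooKeeper addresses and the serviceDiscoveryMode, zooKeeperNamespace, sasl.qop, and principal values shown here are only illustrative, since the real values are read from hiveclient.properties:

jdbc:hive2://xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;sasl.qop=auth-conf;auth=KERBEROS;principal=hive/hadoop.hadoop.com@HADOOP.COM;

The complete sample code is as follows.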
package com.huawei.bigdata.hive.example;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;


import org.apache.hadoop.conf.Configuration;
import com.huawei.bigdata.security.LoginUtil;

public class JDBCExample {
  private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
  
  private static final String ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME = "Client";
  private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";    
  private static final String ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL = "zookeeper/hadoop";
  
  private static Configuration CONF = null; 
  private static String KRB5_FILE = null;
  private static String USER_NAME = null;
  private static String USER_KEYTAB_FILE = null;
  
  private static String zkQuorum = null; // List of ZooKeeper node IP addresses and ports
  private static String auth = null;
  private static String sasl_qop = null;
  private static String zooKeeperNamespace = null;
  private static String serviceDiscoveryMode = null;
  private static String principal = null;
  private static void init() throws IOException{
    CONF = new Configuration();

    Properties clientInfo = null;
    String userdir = System.getProperty("user.dir") + File.separator
        + "conf" + File.separator;
    System.out.println(userdir);
    InputStream fileInputStream = null;
    try{
      clientInfo = new Properties();
      // "hiveclient.properties" is the client configuration file. If the multi-instance feature is used,
      // replace it with the "hiveclient.properties" file of the corresponding instance client.
      // The file is located in the "config" directory of the decompressed client installation package of that instance.
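      // Illustrative hiveclient.properties content; the keys below are the ones read by this example,
      // while the values are placeholders that depend on your cluster:
      //   zk.quorum=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181
      //   auth=KERBEROS
      //   sasl.qop=auth-conf
      //   zooKeeperNamespace=hiveserver2
      //   serviceDiscoveryMode=zooKeeper
      //   principal=hive/hadoop.hadoop.com@HADOOP.COM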
      String hiveclientProp = userdir + "hiveclient.properties" ;
      File propertiesFile = new File(hiveclientProp);
      fileInputStream = new FileInputStream(propertiesFile);
      clientInfo.load(fileInputStream);
    }catch (Exception e) {
      throw new IOException(e);
    }finally{
      if(fileInputStream != null){
        fileInputStream.close();
        fileInputStream = null;
      }
    }
    // The zkQuorum obtained here is in the format "xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181".
    // "xxx.xxx.xxx.xxx" is the service IP address of a node where ZooKeeper is deployed; the default port is 2181.
    zkQuorum =  clientInfo.getProperty("zk.quorum");
    auth = clientInfo.getProperty("auth");
    sasl_qop = clientInfo.getProperty("sasl.qop");
    zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
    serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
    principal = clientInfo.getProperty("principal"); 
    // Set USER_NAME to the user created earlier, where "userx" stands for that username. For example, if the created user is "user", USER_NAME is "user".
    USER_NAME = "userx";

    if ("KERBEROS".equalsIgnoreCase(auth)) {
      // Set the paths of the client keytab and krb5 files
      USER_KEYTAB_FILE = userdir + "user.keytab";
      KRB5_FILE = userdir + "krb5.conf";

      LoginUtil.setJaasConf(ZOOKEEPER_DEFAULT_LOGIN_CONTEXT_NAME, USER_NAME, USER_KEYTAB_FILE);
      LoginUtil.setZookeeperServerPrincipal(ZOOKEEPER_SERVER_PRINCIPAL_KEY, ZOOKEEPER_DEFAULT_SERVER_PRINCIPAL);

      // Security mode
      // ZooKeeper login authentication
      LoginUtil.login(USER_NAME, USER_KEYTAB_FILE, KRB5_FILE, CONF);
    }
  }
  
  /**
   * This example shows how to use the Hive JDBC API to run HQL statements.<br>
   * <br>
   * 
   * @throws ClassNotFoundException
   * @throws IllegalAccessException
   * @throws InstantiationException
   * @throws SQLException
   * @throws IOException 
   */
  public static void main(String[] args) throws InstantiationException,
      IllegalAccessException, ClassNotFoundException, SQLException, IOException{
    // Initialize parameters
    init();

    // Define the HQL statements. Each must be a single statement and cannot contain a semicolon (";").
    String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)",
        "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"};

    // Assemble the JDBC URL
    StringBuilder sBuilder = new StringBuilder(
        "jdbc:hive2://").append(zkQuorum).append("/");

    if ("KERBEROS".equalsIgnoreCase(auth)) {
      sBuilder.append(";serviceDiscoveryMode=") 
              .append(serviceDiscoveryMode)
              .append(";zooKeeperNamespace=")
              .append(zooKeeperNamespace)
              .append(";sasl.qop=")
              .append(sasl_qop)
              .append(";auth=")
              .append(auth)
              .append(";principal=")
              .append(principal)
              .append(";");
    } else {
      // Normal mode
      sBuilder.append(";serviceDiscoveryMode=") 
              .append(serviceDiscoveryMode)
              .append(";zooKeeperNamespace=")
              .append(zooKeeperNamespace)
              .append(";auth=none");
    }
    String url = sBuilder.toString();
    
    // Load the Hive JDBC driver
    Class.forName(HIVE_DRIVER);

    Connection connection = null;
    try {
      System.out.println(url);
      // Obtain the JDBC connection
      // In normal mode, the second parameter must be a valid username; otherwise the connection is made as the anonymous user.
      connection = DriverManager.getConnection(url, "", "");
        
      // Create the table
      // After the table is created, you can use a LOAD statement to import data into it, for example from HDFS:
      // load data inpath '/tmp/employees.txt' overwrite into table employees_info;
      execDDL(connection,sqls[0]);
      System.out.println("Create table success!");
       
      // Query data
      execDML(connection,sqls[1]);
        
      // Drop the table
      execDDL(connection,sqls[2]);
      System.out.println("Delete table success!");
    }
    finally {
      // Close the JDBC connection
      if (null != connection) {
        connection.close();
      }
    }
  }
  
  public static void execDDL(Connection connection, String sql)
  throws SQLException {
    PreparedStatement statement = null;
    try {
      statement = connection.prepareStatement(sql);
      statement.execute();
    }
    finally {
      if (null != statement) {
        statement.close();
      }
    }
  }


  public static void execDML(Connection connection, String sql) throws SQLException {
    PreparedStatement statement = null;
    ResultSet resultSet = null;
    ResultSetMetaData resultMetaData = null;
    
    try {
      // Execute the HQL statement
      statement = connection.prepareStatement(sql);
      resultSet = statement.executeQuery();
      
      // Print the column names of the query result to the console
      resultMetaData = resultSet.getMetaData();
      int columnCount = resultMetaData.getColumnCount();
      for (int i = 1; i <= columnCount; i++) {
        System.out.print(resultMetaData.getColumnLabel(i) + '\t');
      }
      System.out.println();
      
      // Print the query results to the console
      while (resultSet.next()) {
        for (int i = 1; i <= columnCount; i++) {
          System.out.print(resultSet.getString(i) + '\t');
        }
        System.out.println();
      }
    }
    finally {
      if (null != resultSet) {
        resultSet.close();
      }
      
      if (null != statement) {
        statement.close();
      }
    }
  }
}

Sample Code 2 for JDBC Secondary Development

The main functions of the following sample code are as follows:

  1. Performs the security login itself instead of providing the username and key file path in the JDBC URL, and then sets up the Hive connection.
  2. Executes three types of HQL statements: creating a table, querying data, and dropping the table.

When accessing ZooKeeper, the program uses a JAAS configuration file, for example user.hive.jaas.conf, whose content is as follows:

Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="D:\\workspace\\jdbc-examples\\conf\\user.keytab"
principal="xxx@HADOOP.COM"
useTicketCache=false
storeKey=true
debug=true;
};

Modify the keyTab path (an absolute path) and the principal in the configuration file above to match your environment, and set the Java system property java.security.auth.login.config to the path of this file.
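
A minimal sketch of setting this property in code, assuming the JAAS file sits at the absolute path used in the example above (adapt the path to your environment):

System.setProperty("java.security.auth.login.config", "D:\\workspace\\jdbc-examples\\conf\\user.hive.jaas.conf");

Alternatively, pass it to the JVM as -Djava.security.auth.login.config=/path/to/user.hive.jaas.conf. The sample code below sets the same property with the relative path "conf/user.hive.jaas.conf".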

package com.huawei.bigdata.hive.example;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;

public class JDBCExamplePreLogin {
  private static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
  
  /**
   * This example shows how to use the Hive JDBC API to run HQL statements.<br>
   * <br>
   * 
   * @throws ClassNotFoundException
   * @throws IllegalAccessException
   * @throws InstantiationException
   * @throws SQLException
   */
  public static void main(String[] args) throws InstantiationException,
      IllegalAccessException, ClassNotFoundException, SQLException, IOException {
    
    Properties clientInfo = null;
    String userdir = System.getProperty("user.dir") + File.separator
        + "conf" + File.separator;
    InputStream fileInputStream = null;
    try{
      clientInfo = new Properties();
      // "hiveclient.properties" is the client configuration file. If the multi-instance feature is used,
      // replace it with the "hiveclient.properties" file of the corresponding instance client.
      // The file is located in the "config" directory of the decompressed client installation package of that instance.
      String hiveclientProp = userdir + "hiveclient.properties" ;
      File propertiesFile = new File(hiveclientProp);
      fileInputStream = new FileInputStream(propertiesFile);
      clientInfo.load(fileInputStream);
    }catch (Exception e) {
      throw new IOException(e);
    }finally{
      if(fileInputStream != null){
        fileInputStream.close();
        fileInputStream = null;
      }
    }
    // The zkQuorum obtained here is in the format "xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181".
    // "xxx.xxx.xxx.xxx" is the service IP address of a node where ZooKeeper is deployed; the default port is 2181.
    String zkQuorum =  clientInfo.getProperty("zk.quorum");
    String auth = clientInfo.getProperty("auth");
    String sasl_qop = clientInfo.getProperty("sasl.qop");
    String zooKeeperNamespace = clientInfo.getProperty("zooKeeperNamespace");
    String serviceDiscoveryMode = clientInfo.getProperty("serviceDiscoveryMode");
    String principal = clientInfo.getProperty("principal"); 
    
    // Define the HQL statements. Each must be a single statement and cannot contain a semicolon (";").
    String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)",
        "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"};

    // Assemble the JDBC URL
    StringBuilder sBuilder = new StringBuilder(
        "jdbc:hive2://").append(zkQuorum).append("/");
    
    if ("KERBEROS".equalsIgnoreCase(auth)) {

      // Set the java.security.krb5.conf property to specify the security service information to be accessed.
      System.setProperty("java.security.krb5.conf", "conf/krb5.conf");
      // Set the JAAS configuration file to use. Modify the keyTab and principal in user.hive.jaas.conf to match your environment.
      System.setProperty("java.security.auth.login.config",
          "conf/user.hive.jaas.conf");
      
      Configuration conf = new Configuration();
      conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
      conf.set(HADOOP_SECURITY_AUTHORIZATION, "true");
      String PRINCIPAL = "username.client.kerberos.principal";
      String KEYTAB = "username.client.keytab.file";
      // Set the path of the client keytab file
      conf.set(KEYTAB, "conf/user.keytab");
      // Set the userPrincipal of the created user, that is, the username with its realm. For example, if the created user is "user" and the realm is HADOOP.COM, the userPrincipal is user@HADOOP.COM.
      conf.set(PRINCIPAL, "xxx@xxx");
      
      // Perform login authentication
      UserGroupInformation.setConfiguration(conf);
      SecurityUtil.login(conf, KEYTAB, PRINCIPAL);
  
      // Security mode
      sBuilder.append(";serviceDiscoveryMode=") 
              .append(serviceDiscoveryMode)
              .append(";zooKeeperNamespace=")
              .append(zooKeeperNamespace)
              .append(";sasl.qop=")
              .append(sasl_qop)
              .append(";auth=")
              .append(auth)
              .append(";principal=")
              .append(principal)
              .append(";");
    } else {
      // Normal mode
      sBuilder.append(";serviceDiscoveryMode=") 
              .append(serviceDiscoveryMode)
              .append(";zooKeeperNamespace=")
              .append(zooKeeperNamespace)
              .append(";auth=none");
    }
    String url = sBuilder.toString();
    
    // Load the Hive JDBC driver
    Class.forName(HIVE_DRIVER);

    Connection connection = null;
    try {
      // Obtain the JDBC connection
      // In normal mode, the second parameter must be a valid username; otherwise the connection is made as the anonymous user.
      connection = DriverManager.getConnection(url, "", "");

      // Create the table
      // After the table is created, you can use a LOAD statement to import data into it, for example from HDFS:
      // load data inpath '/tmp/employees.txt' overwrite into table employees_info;
      execDDL(connection,sqls[0]);
      System.out.println("Create table success!");
       
      // Query data
      execDML(connection,sqls[1]);
        
      // Drop the table
      execDDL(connection,sqls[2]);
      System.out.println("Delete table success!");
    }
    finally {
      // Close the JDBC connection
      if (null != connection) {
        connection.close();
      }
    }
  }
  
  public static void execDDL(Connection connection, String sql)
  throws SQLException {
    PreparedStatement statement = null;
    try {
      statement = connection.prepareStatement(sql);
      statement.execute();
    }
    finally {
      if (null != statement) {
        statement.close();
      }
    }
  }


  public static void execDML(Connection connection, String sql) throws SQLException {
    PreparedStatement statement = null;
    ResultSet resultSet = null;
    ResultSetMetaData resultMetaData = null;
    
    try {
      // Execute the HQL statement
      statement = connection.prepareStatement(sql);
      resultSet = statement.executeQuery();
      
      // Print the column names of the query result to the console
      resultMetaData = resultSet.getMetaData();
      int columnCount = resultMetaData.getColumnCount();
      for (int i = 1; i <= columnCount; i++) {
        System.out.print(resultMetaData.getColumnLabel(i) + '\t');
      }
      System.out.println();
      
      // Print the query results to the console
      while (resultSet.next()) {
        for (int i = 1; i <= columnCount; i++) {
          System.out.print(resultSet.getString(i) + '\t');
        }
        System.out.println();
      }
    }
    finally {
      if (null != resultSet) {
        resultSet.close();
      }
      
      if (null != statement) {
        statement.close();
      }
    }
  }

}

Sample Code for HCatalog Secondary Development

The following sample code shows how to use the HCatInputFormat and HCatOutputFormat interfaces provided by HCatalog to submit a MapReduce job (see the note after the sample code for the table layout it expects).

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

public class HCatalogExample extends Configured implements Tool {

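    // Mapper: for each input row (an HCatRecord), reads the age from the first column and emits the pair (age, 1).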
    public static class Map extends
            Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> {
        int age;
        @Override
        protected void map(
                LongWritable key,
                HCatRecord value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, HCatRecord,
                        IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            age = (Integer) value.get(0);
            context.write(new IntWritable(age), new IntWritable(1));
        }
    }
    
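    // Reducer: counts how many records share each age and writes an (age, count) HCatRecord to the output table.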
    public static class Reduce extends Reducer<IntWritable, IntWritable,
    IntWritable, HCatRecord> {
      @Override
      protected void reduce(
              IntWritable key,
              java.lang.Iterable<IntWritable> values,
              org.apache.hadoop.mapreduce.Reducer<IntWritable, IntWritable,
              IntWritable, HCatRecord>.Context context)
              throws IOException, InterruptedException {
          int sum = 0;
          Iterator<IntWritable> iter = values.iterator();
          while (iter.hasNext()) {
              sum++;
              iter.next();
          }
          HCatRecord record = new DefaultHCatRecord(2);
          record.set(0, key.get());
          record.set(1, sum);

          context.write(null, record);
        }
    }

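    // Configures and submits the MapReduce job: the input and output are Hive tables accessed through HCatalog.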
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String[] otherArgs = args;
        
        String inputTableName = otherArgs[0];
        String outputTableName = otherArgs[1];
        String dbName = "default";

        @SuppressWarnings("deprecation")
        Job job = new Job(conf, "GroupByDemo");
        
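        // Read the rows of the input Hive table through HCatInputFormat.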
        HCatInputFormat.setInput(job, dbName, inputTableName);
        job.setInputFormatClass(HCatInputFormat.class);
        job.setJarByClass(HCatalogExample.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(WritableComparable.class);
        job.setOutputValueClass(DefaultHCatRecord.class);
        
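        // Bind the job output to the target Hive table and reuse that table's schema for the records written by the reducer.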
        OutputJobInfo outputjobInfo = OutputJobInfo.create(dbName,outputTableName, null);
        HCatOutputFormat.setOutput(job, outputjobInfo);
        HCatSchema schema = outputjobInfo.getOutputSchema();
        HCatOutputFormat.setSchema(job, schema);
        job.setOutputFormatClass(HCatOutputFormat.class);
        
        return (job.waitForCompletion(true) ? 0 : 1);
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HCatalogExample(), args);
        System.exit(exitCode);
    }
}
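
The job expects two arguments: the name of the input table and the name of the output table, both in the default database. As a minimal sketch (the table names t1 and t2 and the storage format are only illustrative assumptions), the input table must expose the age as its first column, because the mapper reads value.get(0) as an Integer, and the output table must already exist with a two-column INT schema matching the (age, count) records written by the reducer:

CREATE TABLE t1(age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
CREATE TABLE t2(age INT, cnt INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;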