分析Hive数据
功能介绍
本小节介绍了如何使用样例程序完成分析任务。样例程序主要有以下方式。
- 使用JDBC接口提交数据分析任务。
- 使用HCatalog接口提交数据分析任务。
样例代码
- 使用Hive JDBC接口提交数据分析任务,参考样例程序中的JDBCExample.java。
- 定义HiveQL。HiveQL必须为单条语句,注意HiveQL不能包含“;”。
// 定义HQL,不能包含“;” String[] sqls = {"CREATE TABLE IF NOT EXISTS employees_info(id INT,name STRING)", "SELECT COUNT(*) FROM employees_info", "DROP TABLE employees_info"};
- 拼接JDBC URL。
// 拼接JDBC URL StringBuilder sBuilder = new StringBuilder( "jdbc:hive2://").append(clientInfo.getZkQuorum()).append("/"); if (isSecurityMode) { // 安全模式 // ZooKeeper登录认证 sBuilder.append(";serviceDiscoveryMode=") .append(clientInfo.getServiceDiscoveryMode()) .append(";zooKeeperNamespace=") .append(clientInfo.getZooKeeperNamespace()) .append(";sasl.qop=") .append(clientInfo.getSaslQop()) .append(";auth=") .append(clientInfo.getAuth()) .append(";principal=") .append(clientInfo.getPrincipal()) .append(";"); } else { // 普通模式 sBuilder.append(";serviceDiscoveryMode=") .append(clientInfo.getServiceDiscoveryMode()) .append(";zooKeeperNamespace=") .append(clientInfo.getZooKeeperNamespace()) .append(";auth=none"); } String url = sBuilder.toString();
以上是通过ZooKeeper的方式访问Hive。若直连HiveServer的方式访问Hive,需按如下方式拼接JDBC URL,并将hiveclient.properties文件中的zk.quorum配置项的端口改为10000。
// 拼接JDBC URL StringBuilder sBuilder = new StringBuilder( "jdbc:hive2://").append(clientInfo.getZkQuorum()).append("/"); if (isSecurityMode) { // 安全模式 // ZooKeeper登录认证 sBuilder.append(";sasl.qop=") .append(clientInfo.getSaslQop()) .append(";auth=") .append(clientInfo.getAuth()) .append(";principal=") .append(clientInfo.getPrincipal()) .append(";"); } else { // 普通模式 sBuilder.append(";auth=none"); } String url = sBuilder.toString();
注:直连HiveServer时,若当前连接的HiveServer故障则会导致访问Hive失败;若使用ZooKeeper的访问Hive,只要有任一个HiveServer实例可正常提供服务即可。因此使用JDBC时建议通过ZooKeeper的方式访问Hive。
- 加载Hive JDBC驱动。
// 加载Hive JDBC驱动 Class.forName(HIVE_DRIVER);
- 填写正确的用户名,获取JDBC连接,确认HQL的类型(DDL/DML),调用对应的接口执行HiveQL,输出查询的列名和结果到控制台,关闭JDBC连接。
Connection connection = null; try { // 获取JDBC连接 // 第二个参数需要填写正确的用户名,否则会以匿名用户(anonymous)登录 connection = DriverManager.getConnection(url, "userName", ""); // 建表 // 表建完之后,如果要往表中导数据,可以使用LOAD语句将数据导入表中,比如从HDFS上将数据导入表: //load data inpath '/tmp/employees.txt' overwrite into table employees_info; execDDL(connection,sqls[0]); System.out.println("Create table success!"); // 查询 execDML(connection,sqls[1]); // 删表 execDDL(connection,sqls[2]); System.out.println("Delete table success!"); } finally { // 关闭JDBC连接 if (null != connection) { connection.close(); } public static void execDDL(Connection connection, String sql) throws SQLException { PreparedStatement statement = null; try { statement = connection.prepareStatement(sql); statement.execute(); } finally { if (null != statement) { statement.close(); } } } public static void execDML(Connection connection, String sql) throws SQLException { PreparedStatement statement = null; ResultSet resultSet = null; ResultSetMetaData resultMetaData = null; try { // 执行HQL statement = connection.prepareStatement(sql); resultSet = statement.executeQuery(); // 输出查询的列名到控制台 resultMetaData = resultSet.getMetaData(); int columnCount = resultMetaData.getColumnCount(); for (int i = 1; i <= columnCount; i++) { System.out.print(resultMetaData.getColumnLabel(i) + '\t'); } System.out.println(); // 输出查询结果到控制台 while (resultSet.next()) { for (int i = 1; i <= columnCount; i++) { System.out.print(resultSet.getString(i) + '\t'); } System.out.println(); } } finally { if (null != resultSet) { resultSet.close(); } if (null != statement) { statement.close(); } } }
- 定义HiveQL。HiveQL必须为单条语句,注意HiveQL不能包含“;”。
- 使用HCatalog接口提交数据分析任务,参考样例程序中的HCatalogExample.java。
- 编写Map类,从Hive的表中获取数据。
public static class Map extends Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> { int age; @Override protected void map( LongWritable key, HCatRecord value, Context context) throws IOException, InterruptedException { age = (Integer) value.get(0); context.write(new IntWritable(age), new IntWritable(1)); } }
- 编写Reduce类,对从Hive表中读取到的数据进行统计。
public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, HCatRecord> { @Override protected void reduce( IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; Iterator<IntWritable> iter = values.iterator(); while (iter.hasNext()) { sum++; iter.next(); } HCatRecord record = new DefaultHCatRecord(2); record.set(0, key.get()); record.set(1, sum); context.write(null, record); } }
- 在run()方法中配置job后,执行main()方法,提交任务。
public int run(String[] args) throws Exception { HiveConf.setLoadMetastoreConfig(true); Configuration conf = getConf(); String[] otherArgs = args; String inputTableName = otherArgs[0]; String outputTableName = otherArgs[1]; String dbName = "default"; @SuppressWarnings("deprecation") Job job = new Job(conf, "GroupByDemo"); HCatInputFormat.setInput(job, dbName, inputTableName); job.setInputFormatClass(HCatInputFormat.class); job.setJarByClass(HCatalogExample.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(WritableComparable.class); job.setOutputValueClass(DefaultHCatRecord.class); OutputJobInfo outputjobInfo = OutputJobInfo.create(dbName,outputTableName, null); HCatOutputFormat.setOutput(job, outputjobInfo); HCatSchema schema = outputjobInfo.getOutputSchema(); HCatOutputFormat.setSchema(job, schema); job.setOutputFormatClass(HCatOutputFormat.class); return (job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new HCatalogExample(), args); System.exit(exitCode); }
- 编写Map类,从Hive的表中获取数据。