Updated on: 2024-08-05 GMT+08:00

Sample Program for Using HCatalog to Access Hive

Function

This section describes how to use HCatalog in a MapReduce job to analyze Hive table data. The job reads the int-type data in the first column of the input table, performs a count(distinct XX) operation, and writes the result to the output table.

Sample Code

The sample program is in "HCatalogExample.java" under "hive-examples/hcatalog-example". The modules that implement this function are as follows:

  1. Implement the Mapper class: obtain the int-type data in the first column from the HCatRecord and emit it with a count of 1.
    public static class Map extends
            Mapper<LongWritable, HCatRecord, IntWritable, IntWritable> {
        @Override
        protected void map(
                LongWritable key,
                HCatRecord value,
                Mapper<LongWritable, HCatRecord,
                        IntWritable, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Emit (age, 1) only when the first column holds an int;
            // records with other types are skipped.
            if (value.get(0) instanceof Integer) {
                int age = (Integer) value.get(0);
                context.write(new IntWritable(age), new IntWritable(1));
            }
        }
    }
  2. Implement the Reducer class: aggregate the map output by counting how many times each distinct value appears, and write the result through an HCatRecord.
    public static class Reduce extends Reducer<IntWritable, IntWritable,
            IntWritable, HCatRecord> {
        @Override
        protected void reduce(
                IntWritable key,
                Iterable<IntWritable> values,
                Reducer<IntWritable, IntWritable,
                        IntWritable, HCatRecord>.Context context)
                throws IOException, InterruptedException {
            // Count how many times this key appears in the input.
            int sum = 0;
            for (IntWritable ignored : values) {
                sum++;
            }
            // Output record: (value, occurrence count).
            HCatRecord record = new DefaultHCatRecord(2);
            record.set(0, key.get());
            record.set(1, sum);
            context.write(null, record);
        }
    }
  3. Define the MapReduce job: specify the input/output format classes, the Mapper/Reducer classes, and the input/output key-value pair types.
    // Job.getInstance is preferred over the deprecated Job constructor.
    Job job = Job.getInstance(conf, "GroupByDemo");
    // Read from the Hive input table through HCatalog.
    HCatInputFormat.setInput(job, dbName, inputTableName);
    job.setInputFormatClass(HCatInputFormat.class);
    job.setJarByClass(HCatalogExample.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(WritableComparable.class);
    job.setOutputValueClass(DefaultHCatRecord.class);
    // Write the results to the Hive output table through HCatalog.
    String outputTableName = otherArgs[1];
    OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, outputTableName, null);
    HCatOutputFormat.setOutput(job, outputJobInfo);
    HCatSchema schema = outputJobInfo.getOutputSchema();
    HCatOutputFormat.setSchema(job, schema);
    job.setOutputFormatClass(HCatOutputFormat.class);
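What the job computes can be sketched without a Hadoop cluster as a plain-Java simulation of the map/shuffle/reduce pipeline. The class name and sample data below are illustrative only and not part of the sample program; the simulation assumes, as the Mapper does, that only Integer cells in the first column are counted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch: map emits (value, 1) for each Integer cell,
// the shuffle groups the pairs by key, and reduce counts how many
// pairs were collected for each distinct value.
public class GroupByDemoSketch {
    public static Map<Integer, Integer> run(List<Object> firstColumn) {
        // "Map" + "shuffle": group the (value, 1) pairs by key.
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (Object cell : firstColumn) {
            if (cell instanceof Integer) {          // the Mapper's type check
                int age = (Integer) cell;
                grouped.computeIfAbsent(age, k -> new ArrayList<>()).add(1);
            }
        }
        // "Reduce": count the values collected for each key.
        Map<Integer, Integer> counts = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> e : grouped.entrySet()) {
            counts.put(e.getKey(), e.getValue().size());
        }
        return counts;
    }

    public static void main(String[] args) {
        // Non-Integer cells (here "bad") are skipped, as in the Mapper.
        List<Object> column = List.of(25, 30, 25, "bad", 30, 25);
        System.out.println(run(column));   // {25=3, 30=2}
    }
}
```

Each entry in the returned map corresponds to one HCatRecord written by the reducer: the distinct value and its occurrence count.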