更新时间:2024-10-31 GMT+08:00

MapReduce统计样例代码

功能介绍

统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。

主要分为三个部分:

  • 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。
  • 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。
  • main方法提供建立一个MapReduce job,并提交MapReduce作业到Hadoop集群。

代码样例

下面代码片段仅为演示,具体代码参见com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector类:

样例1:类CollectionMapper定义Mapper抽象类的map()方法和setup()方法。

    public static class CollectionMapper extends
            Mapper<Object, Text, Text, IntWritable> {

    // 分隔符。
    String delim;
    // 性别筛选。
    String sexFilter;

    // 姓名信息。
    private Text nameInfo = new Text();

    // 输出的key,value要求是序列化的。
    private IntWritable timeInfo = new IntWritable(1);

     /**
     * 分布式计算
     * 
     * @param key Object : 原文件位置偏移量。
     * @param value Text : 原文件的一行字符数据。
     * @param context Context : 出参。
     * @throws IOException , InterruptedException
     */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
        {

            String line = value.toString();

            if (line.contains(sexFilter))
            {

                     // 读取的一行字符串数据。
                       String name = line.substring(0, line.indexOf(delim));
                       nameInfo.set(name);
                     // 获取上网停留时间。
                       String time = line.substring(line.lastIndexOf(delim) + 1,
                                     line.length());
                        timeInfo.set(Integer.parseInt(time));

                     // map输出key,value键值对。
                context.write(nameInfo, timeInfo);
            }
        }

        /**
      * map调用,做一些初始工作。
      * 
      * @param context Context
      */
        public void setup(Context context) throws IOException,
                InterruptedException 
    {

                      // 通过Context可以获得配置信息。
       delim = context.getConfiguration().get("log.delimiter", ",");

            sexFilter = delim
                                 + context.getConfiguration()
                                         .get("log.sex.filter", "female") + delim;
        }
    }

样例2:类CollectionReducer定义Reducer抽象类的reduce()方法。

    public static class CollectionReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> 
{

    // 统计结果。
        private IntWritable result = new IntWritable();

        // 总时间门槛。
        private int timeThreshold;

     /**
      * @param key Text : Mapper后的key项。
      * @param values Iterable : 相同key项的所有统计结果。
      * @param context Context
      * @throws IOException , InterruptedException
      */
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException
 {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            // 如果时间小于门槛时间,不输出结果。
            if (sum < timeThreshold) 
       {
                return;
            }
            result.set(sum);

            // reduce输出为key:网民的信息,value:该网民上网总时间。
            context.write(key, result);
        }

        /**
         * setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次。
     * 
     * @param context Context
     * @throws IOException , InterruptedException
     */
        public void setup(Context context) throws IOException,
                InterruptedException 
    {

            // Context可以获得配置信息。
            timeThreshold = context.getConfiguration().getInt(
                    "log.time.threshold", 120);
        }
    }

样例3:main()方法创建一个job,指定参数,提交作业到hadoop集群。

  public static void main(String[] args) throws Exception {
    // 初始化环境变量。
    Configuration conf = new Configuration();

    // 安全登录。
    LoginUtil.login(PRINCIPAL, KEYTAB, KRB, conf);

    // 获取入参。
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: collect female info <in> <out>");
      System.exit(2);
    }

    // 初始化Job任务对象。
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "Collect Female Info");
    job.setJarByClass(FemaleInfoCollector.class);

    // 设置运行时执行map,reduce的类,也可以通过配置文件指定。
    job.setMapperClass(CollectionMapper.class);
    job.setReducerClass(CollectionReducer.class);

    // 设置combiner类,默认不使用,使用时通常使用和reduce一样的类。
    // Combiner类需要谨慎使用,也可以通过配置文件指定。
    job.setCombinerClass(CollectionReducer.class);

    // 设置作业的输出类型。
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    // 提交任务交到远程环境上执行。
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

样例4:类CollectionCombiner实现了在map端先合并一下map输出的数据,减少map和reduce之间传输的数据量。

  /**
   * Combiner class
   */
  public static class CollectionCombiner extends
  Reducer<Text, IntWritable, Text, IntWritable> {
  
  // Intermediate statistical results
  private IntWritable intermediateResult = new IntWritable();
  
  /**
   * @param key     Text : key after Mapper
   * @param values  Iterable : all results with the same key in this map task
   * @param context Context
   * @throws IOException , InterruptedException
   */
  public void reduce(Text key, Iterable<IntWritable> values,
  Context context) throws IOException, InterruptedException {
  int sum = 0;
  for (IntWritable val : values) {
  sum += val.get();
  }
  
  intermediateResult.set(sum);
  
  // In the output information, key indicates netizen information, 
  // and value indicates the total online time of the netizen in this map task.
  context.write(key, intermediateResult);
  }
  
  }