Updated on 2022-08-16 GMT+08:00

Example Codes

Function

Collect statistics on female netizens who spend more than 2 hours shopping online over a weekend.

The operation is performed in three steps:

  • Filter the online time of female netizens in original files using the CollectionMapper class inherited from the Mapper abstract class.
  • Sum the online time of each female netizen, and output the netizens whose total online time reaches the 2-hour threshold, using the CollectionReducer class, which extends the Reducer abstract class.
  • The main method creates a MapReduce job and submits the MapReduce job to the Hadoop cluster.

Example Codes

The following code snippets are used as an example. For the complete code, see the com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector class.

Example 1: The CollectionMapper class overrides the map() and setup() methods of the Mapper abstract class.

  /**
   * Mapper that keeps only the input rows matching the configured sex filter and
   * emits one {@code (name, dwell time)} pair per matching row.
   *
   * <p>The name is the text before the first delimiter of a row and the dwell
   * time is the text after the last delimiter.
   */
  public static class CollectionMapper extends
          Mapper<Object, Text, Text, IntWritable> {

    // Field delimiter, read from "log.delimiter" in setup().
    String delim;
    // Sex value wrapped in delimiters (for example ",female,"), built in setup().
    String sexFilter;

    // Reusable output key holding the netizen name.
    private Text nameInfo = new Text();

    // Reusable output value holding the dwell time; output <key,value> must be serialized.
    private IntWritable timeInfo = new IntWritable(1);

    /**
     * Reads the delimiter and the sex filter from the job configuration.
     *
     * @param context Context: gives access to the job configuration.
     * @throws IOException declared by the overridden method.
     * @throws InterruptedException declared by the overridden method.
     */
    public void setup(Context context) throws IOException,
            InterruptedException {
      // Obtain configuration information using Context.
      delim = context.getConfiguration().get("log.delimiter", ",");

      // Surround the sex value with delimiters so that only a whole field matches.
      sexFilter = delim
          + context.getConfiguration().get("log.sex.filter", "female")
          + delim;
    }

    /**
     * Processes one row of the source file.
     *
     * @param key Object: location offset of the source file.
     * @param value Text: a row of characters in the source file.
     * @param context Context: output parameter.
     * @throws IOException if writing the output pair fails.
     * @throws InterruptedException if writing the output pair is interrupted.
     * @throws NumberFormatException if the text after the last delimiter is not an integer.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
      String line = value.toString();

      // Rows that do not contain <delim><sex><delim> are skipped.
      if (!line.contains(sexFilter)) {
        return;
      }

      // sexFilter begins and ends with delim, so a matching row always contains
      // the delimiter and indexOf/lastIndexOf cannot return -1 here.
      String name = line.substring(0, line.indexOf(delim));
      nameInfo.set(name);

      // Skip the whole delimiter, not just one character: "log.delimiter" is
      // configurable and may be longer than a single character.
      String time = line.substring(line.lastIndexOf(delim) + delim.length()).trim();
      timeInfo.set(Integer.parseInt(time));

      // The Map task outputs a key-value pair.
      context.write(nameInfo, timeInfo);
    }
  }

Example 2: The CollectionReducer class overrides the reduce() and setup() methods of the Reducer abstract class.

    /**
     * Reducer that totals the online time received for each key and writes only
     * the keys whose total is not below the configured threshold.
     */
    public static class CollectionReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        // Total time threshold, read from "log.time.threshold" in setup().
        private int timeThreshold;

        // Reusable holder for the value written to the output.
        private IntWritable totalTime = new IntWritable();

        /**
         * Loads the time threshold from the job configuration (default 120).
         *
         * @param context Context: gives access to the job configuration.
         * @throws IOException declared by the overridden method.
         * @throws InterruptedException declared by the overridden method.
         */
        public void setup(Context context) throws IOException,
                InterruptedException {
            timeThreshold = context.getConfiguration()
                    .getInt("log.time.threshold", 120);
        }

        /**
         * Adds up all values of one key and emits the total when it reaches the threshold.
         *
         * @param key Text: key after Mapper.
         * @param values Iterable: all statistical results with the same key.
         * @param context Context: output parameter.
         * @throws IOException if writing the output pair fails.
         * @throws InterruptedException if writing the output pair is interrupted.
         */
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable duration : values) {
                total += duration.get();
            }

            // Totals below the threshold produce no output; a total equal to
            // the threshold is still written.
            if (total >= timeThreshold) {
                totalTime.set(total);

                // Key is the netizen information, value is the netizen's total online time.
                context.write(key, totalTime);
            }
        }
    }

Example 3: Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.

  /**
   * Creates the "Collect Female Info" job, wires the mapper, combiner and
   * reducer, and submits it. The process exits with 0 when the job succeeds,
   * 1 when it fails, and 2 when the arguments are wrong.
   *
   * @param args command-line arguments; after option parsing exactly two must
   *     remain: the input path and the output path.
   * @throws Exception if the job cannot be created or submitted.
   */
  public static void main(String[] args) throws Exception {
    // Initialize environment variables.
    Configuration conf = new Configuration();

    // Obtain input parameters: whatever GenericOptionsParser leaves over must be <in> <out>.
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: collect female info <in> <out>");
      System.exit(2);
    }

    // Initialize the job object. Job.getInstance(conf, name) is not a
    // deprecated API, so no warning suppression is needed here.
    Job job = Job.getInstance(conf, "Collect Female Info");
    job.setJarByClass(FemaleInfoCollector.class);

    // Set map and reduce classes to be executed, or specify the map and reduce classes using configuration files.
    job.setMapperClass(CollectionMapper.class);
    job.setReducerClass(CollectionReducer.class);

    // Set the Combiner class; no combiner runs unless one is set. It can also
    // be specified using configuration files.
    // Exercise caution when choosing the combiner: CollectionReducer must not
    // be reused for this purpose because it drops keys whose sum is below the
    // threshold, which would discard partial sums. CollectionCombiner only adds
    // values up and never filters.
    job.setCombinerClass(CollectionCombiner.class);

    // Set the output type of the job.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    // Submit the job to a remote environment for execution.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

Example 4: The CollectionCombiner class combines the mapped data on the map side to reduce the amount of data transmitted from map to reduce.

  /**
   * Combiner class: adds up the values received for a key and writes the sum
   * back under the same key, without filtering anything.
   */
  public static class CollectionCombiner extends
      Reducer<Text, IntWritable, Text, IntWritable> {

    // Reusable holder for the intermediate sum written for each key.
    private IntWritable partialResult = new IntWritable();

    /**
     * Sums all values of one key and emits the intermediate total.
     *
     * @param key     Text: key after Mapper.
     * @param values  Iterable: all results with the same key in this map task.
     * @param context Context: output parameter.
     * @throws IOException if writing the output pair fails.
     * @throws InterruptedException if writing the output pair is interrupted.
     */
    public void reduce(Text key, Iterable<IntWritable> values,
        Context context) throws IOException, InterruptedException {
      int partialSum = 0;
      for (IntWritable duration : values) {
        partialSum += duration.get();
      }
      partialResult.set(partialSum);

      // Key is the netizen information, value is the netizen's total online
      // time in this map task.
      context.write(key, partialResult);
    }
  }