Example Code
Function
Collect statistics on female netizens who spend more than 2 hours online shopping on a weekend.
The operation is performed in three steps:
- Filter the online time records of female netizens from the original files using the CollectionMapper class, which inherits from the Mapper abstract class.
- Sum the online time of each female netizen, and output information about those who stay online for more than 2 hours, using the CollectionReducer class, which inherits from the Reducer abstract class.
- The main() method creates a MapReduce job and submits it to the Hadoop cluster.
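The steps above assume log records of the form name&lt;delimiter&gt;sex&lt;delimiter&gt;dwell-time, where the default delimiter is a comma and the time is in minutes. A hypothetical input file might look like this (names and values are illustrative only):

```text
LiuYang,female,20
YuanJing,male,10
CaiXuyu,female,50
Liyuan,male,20
CaiXuyu,female,80
LiuYang,female,120
```

With the default threshold of 120 minutes, only netizens whose summed female-record times reach 120 would appear in the output.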
Example Code
The following code snippets are used as an example. For the complete code, see the com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector class.
Example 1: The CollectionMapper class defines the map() and setup() methods of the Mapper abstract class.
  public static class CollectionMapper extends
          Mapper<Object, Text, Text, IntWritable> {
      // Delimiter.
      String delim;

      // Sex filter.
      String sexFilter;

      // Name.
      private Text nameInfo = new Text();

      // The output <key,value> must be serializable.
      private IntWritable timeInfo = new IntWritable(1);

      /**
       * Distributed computing.
       *
       * @param key Object: location offset in the source file.
       * @param value Text: a row of characters in the source file.
       * @param context Context: output parameter.
       * @throws IOException, InterruptedException
       */
      public void map(Object key, Text value, Context context)
              throws IOException, InterruptedException {
          String line = value.toString();
          if (line.contains(sexFilter)) {
              // The character string that has been read.
              String name = line.substring(0, line.indexOf(delim));
              nameInfo.set(name);
              // Obtain the dwell duration.
              String time = line.substring(line.lastIndexOf(delim) + 1,
                      line.length());
              timeInfo.set(Integer.parseInt(time));
              // The map task outputs a key-value pair.
              context.write(nameInfo, timeInfo);
          }
      }

      /**
       * Initialization, invoked once before the map() method.
       *
       * @param context Context.
       */
      public void setup(Context context) throws IOException,
              InterruptedException {
          // Obtain configuration information using Context.
          delim = context.getConfiguration().get("log.delimiter", ",");
          sexFilter = delim
                  + context.getConfiguration()
                          .get("log.sex.filter", "female") + delim;
      }
  }
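The string handling in map() can be sketched without any Hadoop classes. The following standalone class mirrors the substring logic above; the log line, delimiter, and filter value are hypothetical:

```java
// A minimal sketch of the string handling in map() above.
public class MapParseDemo {

    // Extract "name=time" from a record, or return null if the sex filter misses.
    static String parse(String line, String delim, String sex) {
        String sexFilter = delim + sex + delim;
        if (!line.contains(sexFilter)) {
            return null;
        }
        // Name is everything before the first delimiter.
        String name = line.substring(0, line.indexOf(delim));
        // Dwell time is everything after the last delimiter.
        String time = line.substring(line.lastIndexOf(delim) + 1);
        return name + "=" + Integer.parseInt(time);
    }

    public static void main(String[] args) {
        System.out.println(parse("LiuYang,female,20", ",", "female")); // LiuYang=20
        System.out.println(parse("YuanJing,male,10", ",", "female"));  // null
    }
}
```

Note that the filter string is the sex value wrapped in delimiters (",female," by default), which prevents a bare substring match against a name field.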
  Example 2: The CollectionReducer class defines the reduce() method of the Reducer abstract class.
  public static class CollectionReducer extends
          Reducer<Text, IntWritable, Text, IntWritable> {
      // Statistical result.
      private IntWritable result = new IntWritable();

      // Total time threshold.
      private int timeThreshold;

      /**
       * @param key Text: key output by the Mapper.
       * @param values Iterable: all statistical results with the same key.
       * @param context Context
       * @throws IOException, InterruptedException
       */
      public void reduce(Text key, Iterable<IntWritable> values,
              Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
              sum += val.get();
          }
          // No result is output if the total time is less than the threshold.
          if (sum < timeThreshold) {
              return;
          }
          result.set(sum);
          // In the output, the key is the netizen information and the value is
          // the netizen's total online time.
          context.write(key, result);
      }

      /**
       * The setup() method is invoked only once, before the reduce() method.
       *
       * @param context Context
       * @throws IOException, InterruptedException
       */
      public void setup(Context context) throws IOException,
              InterruptedException {
          // Obtain configuration information using Context.
          timeThreshold = context.getConfiguration().getInt(
                  "log.time.threshold", 120);
      }
  }
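The aggregation in reduce() can likewise be sketched in isolation: sum the per-record times for one key, and suppress totals below the threshold. The values below are hypothetical and no Hadoop classes are needed:

```java
// A minimal sketch of the aggregation and threshold check in reduce() above.
public class ReduceSumDemo {

    // Returns the total if it reaches the threshold, otherwise null (no output).
    static Integer total(int[] times, int threshold) {
        int sum = 0;
        for (int t : times) {
            sum += t;
        }
        return (sum < threshold) ? null : sum;
    }

    public static void main(String[] args) {
        System.out.println(total(new int[] {60, 70}, 120)); // 130: output
        System.out.println(total(new int[] {50, 60}, 120)); // null: suppressed
    }
}
```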
  Example 3: Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.
  public static void main(String[] args) throws Exception {
    // Initialize environment variables.
    Configuration conf = new Configuration();
    // Security login.
    LoginUtil.login(PRINCIPAL, KEYTAB, KRB, conf);
    // Obtain input parameters.
    String[] otherArgs = new GenericOptionsParser(conf, args)
        .getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: collect female info <in> <out>");
      System.exit(2);
    }
    // Initialize the job object.
    @SuppressWarnings("deprecation")
    Job job = new Job(conf, "Collect Female Info");
    job.setJarByClass(FemaleInfoCollector.class);
    // Set map and reduce classes to be executed, or specify the map and reduce classes using configuration files.
    job.setMapperClass(CollectionMapper.class);
    job.setReducerClass(CollectionReducer.class);
    // Set the Combiner class. No combiner is used by default. Exercise caution
    // when reusing the Reducer as the combiner: CollectionReducer applies the
    // time threshold, which would discard partial sums on the map side. Use
    // CollectionCombiner, which only aggregates. The combiner can also be
    // specified using configuration files.
    job.setCombinerClass(CollectionCombiner.class);
    // Set the output type of the job.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    // Submit the job to a remote environment for execution.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
  Example 4: The CollectionCombiner class combines the mapped data on the map side to reduce the amount of data transmitted from map to reduce.
  /**
   * Combiner class.
   */
  public static class CollectionCombiner extends
          Reducer<Text, IntWritable, Text, IntWritable> {
      // Intermediate statistical result.
      private IntWritable intermediateResult = new IntWritable();

      /**
       * @param key Text: key output by the Mapper.
       * @param values Iterable: all results with the same key in this map task.
       * @param context Context
       * @throws IOException, InterruptedException
       */
      public void reduce(Text key, Iterable<IntWritable> values,
              Context context) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
              sum += val.get();
          }
          intermediateResult.set(sum);
          // In the output, the key is the netizen information and the value is
          // the netizen's total online time in this map task.
          context.write(key, intermediateResult);
      }
  }
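The reason the combiner only sums, and leaves the threshold check to the reducer, can be sketched as follows. The split of one netizen's records across two map tasks is hypothetical:

```java
// Why the combiner must not apply the reduce() threshold.
public class CombinerSafetyDemo {

    static int sum(int[] xs) {
        int s = 0;
        for (int x : xs) {
            s += x;
        }
        return s;
    }

    public static void main(String[] args) {
        int threshold = 120;
        // One netizen's dwell times, split across two map tasks.
        int[] mapTask1 = {60};
        int[] mapTask2 = {70};

        // Safe combiner (as in CollectionCombiner): sum only. The reducer then
        // sees 60 + 70 = 130 >= 120 and outputs the record.
        int combined = sum(mapTask1) + sum(mapTask2);
        System.out.println(combined >= threshold); // true

        // If the combiner also applied the threshold (as reduce() does), each
        // partial sum (60 and 70, both < 120) would be dropped on the map side
        // and the record would never reach the reducer.
        boolean droppedByUnsafeCombiner =
                sum(mapTask1) < threshold && sum(mapTask2) < threshold;
        System.out.println(droppedByUnsafeCombiner); // true: data loss
    }
}
```

In general, a combiner must produce output the reducer can still aggregate correctly; filtering belongs only in the final reduce step.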