Updated on 2022-06-01 GMT+08:00

Rules

Inherit the Mapper Abstract Class

The map() and setup() methods are called during the Map procedure of a MapReduce job.

Correct example:

public static class MapperClass extends Mapper<Object, Text, Text, IntWritable> {
    /**
     * Map input. The key is the byte offset of the line within the input file, and the value is one line of text from that file.
     * The input key and value are provided by the InputFormat, so you do not need to set them. By default, TextInputFormat is used.
     */
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Custom implementation
    }

    /**
     * The setup() method is called only once, before the map() method of a map task or the reduce() method of a reduce task is called.
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        // Custom implementation
    }
}

Inherit the Reducer Abstract Class

The reduce() and setup() methods are called during the Reduce procedure of a MapReduce job.

Correct example:

public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable> {
    /**
     * The input is a key and an iterator over all the values that share that key.
     * The framework groups the map output pairs by key before reduce() is called. For example, a word-count reduce() sums the values of each key.
     * Call context.write(key, value) to write the output to the specified directory.
     * OutputFormat writes the (key, value) pairs output by reduce() to the file system.
     * By default, TextOutputFormat is used to write the reduce output to HDFS.
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Custom implementation
    }

    /**
     * The setup() method is called only once, before the map() method of a map task or the reduce() method of a reduce task is called.
     */
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        // Custom implementation. The configuration information can be obtained from context.
    }
}
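
To make the key-value flow described in the comments above more concrete, the following is a minimal plain-Java sketch of a word count. It does not use the Hadoop API: the class name WordCountFlowSketch and its methods are hypothetical stand-ins, and the group() step only imitates the grouping that the framework performs between map and reduce in a real job.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the map -> group -> reduce flow. Not Hadoop code.
class WordCountFlowSketch {
    // Map step: emit a (word, 1) pair for every whitespace-separated token in a line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            if (!token.isEmpty()) {
                pairs.add(new AbstractMap.SimpleEntry<>(token, 1));
            }
        }
        return pairs;
    }

    // Grouping step: collect all values that share a key.
    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce step: sum the values collected for one key.
    static int reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Drives the three steps over a list of input lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            pairs.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> entry : group(pairs).entrySet()) {
            result.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
        }
        return result;
    }
}
```

In a real job, the map and reduce steps would live in the Mapper and Reducer subclasses shown above, and the grouping would be handled by the framework.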

Submit a MapReduce Job

Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.

Correct example:

public static void main(String[] args) throws Exception {
    Configuration conf = getConfiguration();
    // Input parameters for the main method: args[0] is the input path of the MapReduce job, and args[1] is the output path.
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: <in> <out>");
        System.exit(2);
    }
    // Job.getInstance() replaces the deprecated Job constructor.
    Job job = Job.getInstance(conf, "job name");
    // Set the JAR file where the main task is located.
    job.setJar("D:\\job-examples.jar");
    // job.setJarByClass(TestWordCount.class);
    // Set the map and reduce classes to be executed, or specify them using configuration files.
    job.setMapperClass(TokenizerMapperV1.class);
    job.setReducerClass(IntSumReducerV1.class);
    // Set the Combiner class. By default, no combiner is used. When one is used, it is usually the same class as the reducer.
    // Exercise caution when using a combiner. You can also specify the combiner class in the configuration file.
    job.setCombinerClass(IntSumReducerV1.class);
    // Set the output types of the job. You can also specify them in the configuration file.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    // Set the input and output paths for the job. You can also specify them in the configuration file.
    Path outputPath = new Path(otherArgs[1]);
    FileSystem fs = outputPath.getFileSystem(conf);
    // If the output path already exists, delete it.
    if (fs.exists(outputPath)) {
        fs.delete(outputPath, true);
    }
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, outputPath);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

Do Not Include Resource-Consuming Operations in the map or reduce Function

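The map and reduce functions are called repeatedly, once for each input record or key, so any expensive operation inside them is repeated on every call. Do not include resource-consuming operations, such as creating database links or opening and closing files, in the map or reduce function. Perform such operations once per task, for example in the setup() method.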
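
The rule above can be illustrated with a minimal plain-Java sketch. It does not use the Hadoop API: ExpensiveResource and LifecycleMapperSketch are hypothetical stand-ins that only model the task lifecycle, in which setup() runs once, map() runs once per record, and cleanup() runs once at the end. In a real job, the same pattern would be applied by overriding setup() and cleanup() in the Mapper or Reducer subclass.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an expensive resource such as a database link.
class ExpensiveResource implements AutoCloseable {
    static int opened = 0; // counts how many times the resource was created

    ExpensiveResource() {
        opened++;
    }

    String lookup(String key) {
        return key.toUpperCase();
    }

    @Override
    public void close() {
        // A real resource would release its connection or file handle here.
    }
}

// Hypothetical model of a map task: setup() once, map() per record, cleanup() once.
class LifecycleMapperSketch {
    private ExpensiveResource resource;
    private final List<String> output = new ArrayList<>();

    void setup() {
        resource = new ExpensiveResource(); // created once per task
    }

    void map(String record) {
        output.add(resource.lookup(record)); // reuses the shared resource
    }

    void cleanup() {
        resource.close(); // released once per task
    }

    // Imitates how the framework drives a task over its input records.
    static List<String> run(List<String> records) {
        LifecycleMapperSketch task = new LifecycleMapperSketch();
        task.setup();
        try {
            for (String record : records) {
                task.map(record);
            }
        } finally {
            task.cleanup();
        }
        return task.output;
    }
}
```

With this structure, the resource is created once no matter how many records the task processes. Creating it inside map() would instead create it once per record.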

Multi-Thread Secure Login Mode

If multiple threads perform login operations, then after the first successful login of the application, all threads must use the relogin mode for their subsequent logins.

Login sample code

  private Boolean login(Configuration conf){
    boolean flag = false;
    UserGroupInformation.setConfiguration(conf);
    
    try {
      UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB));
      System.out.println("UserGroupInformation.isLoginKeytabBased(): " + UserGroupInformation.isLoginKeytabBased());
      flag = true;
    } catch (IOException e) {
      e.printStackTrace();
    }
    return flag;
  }

Relogin sample code

  public Boolean relogin() {
    boolean flag = false;
    try {
      UserGroupInformation.getLoginUser().reloginFromKeytab();
      System.out.println("UserGroupInformation.isLoginKeytabBased(): " + UserGroupInformation.isLoginKeytabBased());
      flag = true;
    } catch (IOException e) {
      e.printStackTrace();
    }
    return flag;
  }
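
One possible way to enforce the "log in once, then relogin" rule across threads is sketched below. This is only an illustration under stated assumptions: LoginGuardSketch and ensureLogin() are hypothetical names that do not belong to Hadoop, and the two actions are passed in as plain BooleanSupplier callbacks so that the sketch stays independent of UserGroupInformation.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: the first successful call performs the full login,
// and every later call, from any thread, performs a relogin instead.
class LoginGuardSketch {
    private final BooleanSupplier firstLogin;
    private final BooleanSupplier relogin;
    private boolean loggedIn = false; // guarded by the lock on "this"

    LoginGuardSketch(BooleanSupplier firstLogin, BooleanSupplier relogin) {
        this.firstLogin = firstLogin;
        this.relogin = relogin;
    }

    // synchronized so that two threads cannot both run the first login.
    synchronized boolean ensureLogin() {
        if (!loggedIn) {
            // Stays false if the login fails, so the next caller retries the full login.
            loggedIn = firstLogin.getAsBoolean();
            return loggedIn;
        }
        return relogin.getAsBoolean();
    }
}
```

With the sample methods above, the guard could be created as new LoginGuardSketch(() -> login(conf), () -> relogin()) and shared by all threads, each of which calls ensureLogin() instead of calling login() directly.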