Updated on 2022-08-16 GMT+08:00

Examples

Collect information about female netizens who spend more than 2 hours shopping online over a weekend.

The operation involves three steps:

  1. Filter the online time of female netizens in log files using the MapperClass inherited from the Mapper abstract class.
  2. Calculate the online time of each female netizen and output information about the female netizens who dwell online for more than 2 hours using the ReducerClass inherited from the Reducer abstract class.
  3. Use the main method to create a MapReduce job and then submit the MapReduce job to the Hadoop cluster.

Step 1: Define MapperClass, which overrides the map() and setup() methods of the Mapper abstract class.

public static class MapperClass extends

Mapper<Object, Text, Text, IntWritable> {
// Separator
String delim;
// Filter the sex.
String sexFilter;
private final static IntWritable timeInfo = new IntWritable(1);
private Text nameInfo = new Text();
/**
* map input. The key indicates the offset of the original file, and the value is a row of characters in the original file.
* The map input key and value are provided by InputFormat. You do not need to set them. By default, TextInputFormat is used.
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// A row of characters read.
String line = value.toString();
if (line.contains(sexFilter)) {
// Obtain the names.
String name = line.substring(0, line.indexOf(delim));
nameInfo.set(name);
// Obtain information about the online time. 
String time = line.substring(line.lastIndexOf(delim),
line.length());
timeInfo.set(Integer.parseInt(time));
// map outputs (key, value) pairs.
context.write(nameInfo, timeInfo);
}
}
/**
* The setup() method is called only once before the map() method of a map task or the reduce() method of a reduce task is called.
*/
public void setup(Context context) throws IOException,
InterruptedException {
// Obtain configuration information using Context.
sexFilter = delim + context.getConfiguration().get("log.sex.filter", "female") + delim;
}
}

Step 2: Define ReducerClass, which overrides the reduce() and setup() methods of the Reducer abstract class.

public static class ReducerClass extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
// Total time limit.
private int timeThreshold;
/**
* @param The input is a collection iterator consisting of (key, value) pairs.
* Each map puts together all the pairs with the same key. The reduce method sums the number of the same keys.
* Call context.write(key, value) to write the output to the specified directory.
*  Outputformat writes the (key, value) pairs output by reduce to the file system.
* By default, TextOutputFormat is used to write the reduce output to the HDFS.
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// If the time is smaller than the time limit, no information will be returned.
if (sum < timeThreshold) {
return;
}
result.set(sum);
// reduce output: key: female netizen information, value: online time
context.write(key, result);
}
/**
* The setup() method is called only once before the map() method of a map task or the reduce() method of a reduce task is called.
*/
public void setup(Context context) throws IOException,
InterruptedException {
// Obtain configuration information using Context.
timeThreshold = context.getConfiguration().getInt(
"log.time.threshold", 120);
}
}

Step 3: Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.

public static void main(String[] args) throws Exception {
Configuration conf = getConfiguration();
// Input parameters for the main method: args[0] indicates the input path of the MR job. args[1] indicates the output path of the MR job.
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Collect Female Info");
// Locate the jar package of the major task.
job.setJar("D:\\mapreduce-examples\\hadoop-mapreduce-examples\\mapreduce-examples.jar");
// job.setJarByClass(TestWordCount.class);
// Set the map and reduce classes to be executed. You can also specify them in the configuration file.
job.setMapperClass(TokenizerMapperV1.class);
job.setReducerClass(IntSumReducerV1.class);
// Set the combiner class. By default, it is not used. If it is used, it runs the same classes as reduce. Exercise care when using the Combiner class. You can also specify the combiner class in the configuration file.
job.setCombinerClass(IntSumReducerV1.class);
// Set the output type of the job. You can also specify it in the configuration file.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set the input and output paths for the job. You can also specify them in the configuration file.
Path outputPath = new Path(otherArgs[1]);
FileSystem fs = outputPath.getFileSystem(conf);
// If the output path already exists, delete it.
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)   0 : 1);
}