Updated on 2022-06-01 GMT+08:00

Examples

Collect Statistics on Female Netizens Who Dwell on Online Shopping for More Than 2 Hours on the Weekend

The operation is performed in three steps.

  1. Filter the dwell duration of female netizens in log files using the MapperClass inherited from the Mapper abstract class.
  2. Calculate the dwell duration of each female netizen and output information about the female netizens who dwell online for more than 2 hours using the ReducerClass inherited from the Reducer abstract class.
  3. Use the main method to create a MapReduce job and then submit the MapReduce job to the Hadoop cluster.

Example 1: Use MapperClass to define the map() and setup() methods of the Mapper abstract class.

public static class MapperClass extends

Mapper<Object, Text, Text, IntWritable> {
// Delimiter
String delim;
// Gender screening
String sexFilter;
private final static IntWritable timeInfo = new IntWritable(1);
private Text nameInfo = new Text();
/**
* map input. The key indicates the offset of the original file, and the value is a row of characters in the original file.
* The map input key and value are provided by InputFormat. You do not need to set them. By default, TextInputFormat is used.
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
// A line of character string data has been read.
String line = value.toString();
if (line.contains(sexFilter)) {
// Obtain the name.
String name = line.substring(0, line.indexOf(delim));
nameInfo.set(name);
// Obtain the dwell duration.
String time = line.substring(line.lastIndexOf(delim),
line.length());
timeInfo.set(Integer.parseInt(time));
// The Map task outputs a key-value pair.
context.write(nameInfo, timeInfo);
}
}
/**
* The setup() method is called only once before the map() method of a map task or the reduce() method of a reduce task is called.
*/
public void setup(Context context) throws IOException,
InterruptedException {
// Obtain configuration information using Context.
sexFilter = delim + context.getConfiguration().get("log.sex.filter", "female") + delim;
}
}

Example 2: Use ReducerClass to define the reduce() method of the Reducer abstract class.

public static class ReducerClass extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
// Total time threshold
private int timeThreshold;
/**
* @param The input is a collection iterator consisting of key-value pairs
* Each map puts together all the pairs with the same key. The reduce method sums the number of the same keys.
* Call context.write(key, value) to write the output to the specified directory. 
* Outputformat writes the (key, value) pairs outputted by reduce to the file system. 
* By default, TextOutputFormat is used to write the reduce output to HDFS.
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// No results are outputted if the time is less than the threshold.
if (sum < timeThreshold) {
return;
}
result.set(sum);
// In the reduce output, key indicates netizen information, and value indicates the total dwell duration of the netizen.
context.write(key, result);
}
/**
* The setup() method is called only once before the map() method of a map task or the reduce() method of a reduce task is called.
*/
public void setup(Context context) throws IOException,
InterruptedException {
// Obtain configuration information using Context.
timeThreshold = context.getConfiguration().getInt(
"log.time.threshold", 120);
}
}

Example 3: Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.

public static void main(String[] args) throws Exception {
Configuration conf = getConfiguration();
// Input parameters for the main method: args[0] indicates the input path of the MapReduce job. args[1] indicates the output path of the MapReduce job.
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "Collect Female Info");
// Set the JAR file where the main task is located.
job.setJar("D:\\mapreduce-examples\\hadoop-mapreduce-examples\\mapreduce-examples.jar");
// job.setJarByClass(TestWordCount.class);
// Set map and reduce classes to be executed, or specify the map and reduce classes using configuration files.
job.setMapperClass(TokenizerMapperV1.class);
job.setReducerClass(IntSumReducerV1.class);
// Set the Combiner class. By default, it is not used. If it is used, it runs the same classes as reduce. Exercise caution when using the Combiner class. You can also specify the combiner class in the configuration file. 
job.setCombinerClass(IntSumReducerV1.class);
// Set the output type of the job. You can also specify it in the configuration file. 
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Set the input and output paths for the job. You can also specify them in the configuration file.
Path outputPath = new Path(otherArgs[1]);
FileSystem fs = outputPath.getFileSystem(conf);
// If the output path already exists, delete it.
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)   0 : 1);
}