Example Code
Function
Collect statistics on female netizens who dwell on online shopping for more than 2 hours at a weekend.
The operation is performed in three steps:
- Filter the online time of female netizens in original files using the CollectionMapper class inherited from the Mapper abstract class.
- Count the online time of each female netizen, and output information about female netizens who dwell online for more than 2 hours using the CollectionReducer class inherited from the Reducer abstract class.
- The main method creates a MapReduce job and submits the MapReduce job to the Hadoop cluster.
Example Code
The following code snippets are used as an example. For complete code, see the com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector class.
Example 1: The CollectionMapper class defines the map() and setup() methods of the Mapper abstract class.
public static class CollectionMapper extends Mapper<Object, Text, Text, IntWritable> { // Delimiter. String delim; // Filter sex. String sexFilter; // Name. private Text nameInfo = new Text(); // Output <key,value> must be serialized. private IntWritable timeInfo = new IntWritable(1); /** * Distributed computing * * @param key Object: location offset of the source file. * @param value Text: a row of characters in the source file. * @param context Context: output parameter. * @throws IOException , InterruptedException */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.contains(sexFilter)) { // A character string that has been read. String name = line.substring(0, line.indexOf(delim)); nameInfo.set(name); // Obtain the dwell duration. String time = line.substring(line.lastIndexOf(delim) + 1, line.length()); timeInfo.set(Integer.parseInt(time)); // The Map task outputs a key-value pair. context.write(nameInfo, timeInfo); } } /** * map use to init. * * @param context Context. */ public void setup(Context context) throws IOException, InterruptedException { // Obtain configuration information using Context. delim = context.getConfiguration().get("log.delimiter", ","); sexFilter = delim + context.getConfiguration() .get("log.sex.filter", "female") + delim; } }
Example 2: The CollectionReducer class defines the reduce() method of the Reducer abstract class.
public static class CollectionReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // Statistical results. private IntWritable result = new IntWritable(); // Total time threshold. private int timeThreshold; /** * @param key Text : key after Mapper. * @param values Iterable : all statistical results with the same key. * @param context Context * @throws IOException , InterruptedException */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } // No results are output if the time is less than the threshold. if (sum < timeThreshold) { return; } result.set(sum); // In the output information, key indicates netizen information, and value indicates the total online time of the netizen. context.write(key, result); } /** * The setup() method is invoked for only once before the map() method or reduce() method. * * @param context Context * @throws IOException , InterruptedException */ public void setup(Context context) throws IOException, InterruptedException { // Context obtains configuration information. timeThreshold = context.getConfiguration().getInt( "log.time.threshold", 120); } }
Example 3: Use the main() method to create a job, set parameters, and submit the job to the hadoop cluster.
public static void main(String[] args) throws Exception { // Initialize environment variables. Configuration conf = new Configuration(); // Security login. LoginUtil.login(PRINCIPAL, KEYTAB, KRB, conf); // Obtain input parameters. String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: collect female info <in> <out>"); System.exit(2); } // Initialize the job object. @SuppressWarnings("deprecation") Job job = new Job(conf, "Collect Female Info"); job.setJarByClass(FemaleInfoCollector.class); // Set map and reduce classes to be executed, or specify the map and reduce classes using configuration files. job.setMapperClass(CollectionMapper.class); job.setReducerClass(CollectionReducer.class); // Set the Combiner class. The combiner class is not used by default. Classes same as the reduce class are used. // Exercise caution when using the Combiner class. You can specify it using configuration files. job.setCombinerClass(CollectionReducer.class); // Set the output type of the job. job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // Submit the job to a remote environment for execution. System.exit(job.waitForCompletion(true) ? 0 : 1); }
Example 4: CollectionCombiner class combines the mapped data on the map side to reduce the amount of data transmitted from map to reduce.
/** * Combiner class */ public static class CollectionCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { // Intermediate statistical results private IntWritable intermediateResult = new IntWritable(); /** * @param key Text : key after Mapper * @param values Iterable : all results with the same key in this map task * @param context Context * @throws IOException , InterruptedException */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } intermediateResult.set(sum); // In the output information, key indicates netizen information, // and value indicates the total online time of the netizen in this map task. context.write(key, intermediateResult); } }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.