Example Code
Function
Collect statistics on female netizens who spend more than 2 hours shopping online over a weekend.
The operation is performed in three steps:
- Filter the online time of female netizens in the original files using the CollectionMapper class, which extends the Mapper class.
- Sum the online time of each female netizen and output information about those who stay online for more than 2 hours using the CollectionReducer class, which extends the Reducer class.
- Create a MapReduce job in the main method and submit it to the Hadoop cluster.
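The three steps above can be sketched in plain Java without any Hadoop classes. This is only an illustration of the data flow, not the sample's implementation: the class name FemaleInfoPipelineSketch, the row layout name,sex,minutes, and the sample rows are all assumptions made for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the filter / sum / threshold flow (hypothetical class, no Hadoop).
class FemaleInfoPipelineSketch {

    // Keeps female rows, sums minutes per name, then drops totals below the threshold.
    static Map<String, Integer> collect(List<String> lines, int thresholdMinutes) {
        Map<String, Integer> totals = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            // Map step: skip malformed rows and rows whose sex field is not "female".
            if (fields.length != 3 || !fields[1].equals("female")) {
                continue;
            }
            // Grouping by name stands in for the shuffle between map and reduce.
            totals.merge(fields[0], Integer.parseInt(fields[2]), Integer::sum);
        }
        // Reduce step: totals below the threshold are not output.
        totals.values().removeIf(total -> total < thresholdMinutes);
        return totals;
    }

    public static void main(String[] args) {
        // Assumed sample rows: name,sex,minutes online.
        List<String> lines = Arrays.asList(
                "Alice,female,50",
                "Bob,male,200",
                "Alice,female,80",
                "Carol,female,60");
        System.out.println(collect(lines, 120)); // prints {Alice=130}
    }
}
```

In the real job, the grouping by name is done by the MapReduce shuffle, and the map and reduce steps run as separate tasks on the cluster.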
Example Code
The following code snippets are used as an example. For complete code, see the com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector class.
Example 1: The CollectionMapper class overrides the map() and setup() methods of the Mapper class.
public static class CollectionMapper extends
        Mapper<Object, Text, Text, IntWritable> {

    // Delimiter.
    String delim;

    // Sex filter, wrapped in delimiters so that only a whole field matches.
    String sexFilter;

    // Name.
    private Text nameInfo = new Text();

    // Output <key,value> must be serialized.
    private IntWritable timeInfo = new IntWritable(1);

    /**
     * Distributed computing
     *
     * @param key Object: location offset of the source file.
     * @param value Text: a row of characters in the source file.
     * @param context Context: output parameter.
     * @throws IOException , InterruptedException
     */
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {

        String line = value.toString();

        if (line.contains(sexFilter)) {

            // Obtain the name, which is the first field of the row.
            String name = line.substring(0, line.indexOf(delim));
            nameInfo.set(name);

            // Obtain the dwell duration, which is the last field of the row.
            String time = line.substring(line.lastIndexOf(delim) + 1);
            timeInfo.set(Integer.parseInt(time));

            // The Map task outputs a key-value pair.
            context.write(nameInfo, timeInfo);
        }
    }

    /**
     * Initializes the mapper before map() is called.
     *
     * @param context Context.
     */
    @Override
    public void setup(Context context)
            throws IOException, InterruptedException {

        // Obtain configuration information using Context.
        delim = context.getConfiguration().get("log.delimiter", ",");
        sexFilter = delim
                + context.getConfiguration().get("log.sex.filter", "female")
                + delim;
    }
}
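The string handling in map() can be tried outside Hadoop. The sketch below copies only the substring logic into a hypothetical helper, MapperParsingSketch.parse(), and assumes rows of the form name,sex,minutes. It also shows why the filter is wrapped in delimiters: a bare contains("male") would match "female", whereas ",male," does not.

```java
import java.util.Arrays;

// Hypothetical stand-alone copy of the field extraction used in map().
class MapperParsingSketch {

    // Returns {name, minutes} for a matching row, or null when the row is filtered out.
    static String[] parse(String line, String delim, String sex) {
        // Same construction as in setup(): delimiter + value + delimiter.
        String sexFilter = delim + sex + delim;
        if (!line.contains(sexFilter)) {
            return null;
        }
        // First field: everything before the first delimiter.
        String name = line.substring(0, line.indexOf(delim));
        // Last field: everything after the last delimiter.
        String minutes = line.substring(line.lastIndexOf(delim) + 1);
        return new String[] {name, minutes};
    }

    public static void main(String[] args) {
        // Assumed sample rows; the real input files may differ.
        System.out.println(Arrays.toString(parse("Alice,female,50", ",", "female"))); // [Alice, 50]
        System.out.println(parse("Bob,male,200", ",", "female") == null);             // true
        // ",male," does not occur inside "Alice,female,50", so the row is skipped.
        System.out.println(parse("Alice,female,50", ",", "male") == null);            // true
    }
}
```

Note that Integer.parseInt() in the original map() throws NumberFormatException if the last field is not a number, so malformed rows would need separate handling.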
Example 2: The CollectionReducer class overrides the reduce() and setup() methods of the Reducer class.
public static class CollectionReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    // Statistical results.
    private IntWritable result = new IntWritable();

    // Total time threshold.
    private int timeThreshold;

    /**
     * @param key Text : key after Mapper.
     * @param values Iterable : all statistical results with the same key.
     * @param context Context
     * @throws IOException , InterruptedException
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        // No results are output if the time is less than the threshold.
        if (sum < timeThreshold) {
            return;
        }
        result.set(sum);

        // In the output information, key indicates netizen information,
        // and value indicates the total online time of the netizen.
        context.write(key, result);
    }

    /**
     * The setup() method is invoked only once, before the first call to reduce().
     *
     * @param context Context
     * @throws IOException , InterruptedException
     */
    @Override
    public void setup(Context context) throws IOException,
            InterruptedException {

        // Obtain configuration information using Context.
        timeThreshold = context.getConfiguration().getInt(
                "log.time.threshold", 120);
    }
}
Example 3: Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.
public static void main(String[] args) throws Exception {
    // Initialize environment variables.
    Configuration conf = new Configuration();

    // Security login.
    LoginUtil.login(PRINCIPAL, KEYTAB, KRB, conf);

    // Obtain input parameters.
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: collect female info <in> <out>");
        System.exit(2);
    }

    // Initialize the job object.
    Job job = Job.getInstance(conf, "Collect Female Info");
    job.setJarByClass(FemaleInfoCollector.class);

    // Set map and reduce classes to be executed, or specify the map and reduce classes using configuration files.
    job.setMapperClass(CollectionMapper.class);
    job.setReducerClass(CollectionReducer.class);

    // Set the Combiner class. No combiner is used by default.
    // CollectionReducer drops totals below the threshold, so reusing it here would discard
    // partial sums; the sum-only CollectionCombiner is used instead.
    // Exercise caution when using the Combiner class. You can specify it using configuration files.
    job.setCombinerClass(CollectionCombiner.class);

    // Set the output type of the job.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    // Submit the job to a remote environment for execution.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Example 4: The CollectionCombiner class combines the mapped data on the map side to reduce the amount of data transmitted from map to reduce.
/**
 * Combiner class
 */
public static class CollectionCombiner extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    // Intermediate statistical results
    private IntWritable intermediateResult = new IntWritable();

    /**
     * @param key Text : key after Mapper
     * @param values Iterable : all results with the same key in this map task
     * @param context Context
     * @throws IOException , InterruptedException
     */
    @Override
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }

        intermediateResult.set(sum);

        // In the output information, key indicates netizen information,
        // and value indicates the total online time of the netizen in this map task.
        context.write(key, intermediateResult);
    }
}
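Unlike CollectionReducer in Example 2, the combiner only sums and applies no threshold. The sketch below, written in plain Java with a hypothetical class CombinerThresholdSketch and assumed sample values, suggests why that matters: a combiner sees only the records of one map task, so filtering partial sums against the threshold can discard a netizen whose overall total would have qualified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of one key whose values are spread over several map tasks.
class CombinerThresholdSketch {

    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    // Returns the total emitted by the final reduce, or -1 when nothing is emitted.
    static int reduce(List<List<Integer>> perTaskValues, boolean combinerFilters, int threshold) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> taskValues : perTaskValues) {
            // Combiner step: one partial sum per map task.
            int partial = sum(taskValues);
            // A combiner that also applies the threshold drops small partial sums.
            if (combinerFilters && partial < threshold) {
                continue;
            }
            partials.add(partial);
        }
        if (partials.isEmpty()) {
            return -1; // the reducer never sees this key
        }
        // Reducer step: sum the partial sums, then apply the threshold once.
        int total = sum(partials);
        return total < threshold ? -1 : total;
    }

    public static void main(String[] args) {
        // Assumed data: one netizen with 70 minutes in task 1 and 60 minutes in task 2.
        List<List<Integer>> perTask = Arrays.asList(Arrays.asList(30, 40), Arrays.asList(60));
        System.out.println(reduce(perTask, false, 120)); // prints 130
        System.out.println(reduce(perTask, true, 120));  // prints -1
    }
}
```

With the sum-only combiner the total of 130 minutes reaches the reducer and passes the 120-minute threshold; with a filtering combiner both partial sums (70 and 60) are dropped before the reducer runs.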