Example Codes
Function
Collect statistics on female netizens who spend more than 2 hours online shopping on a weekend.
The operation is performed in three steps:
- Use the CollectionMapper class, which extends the Mapper abstract class, to filter the online-time records of female netizens from the original files.
- Use the CollectionReducer class, which extends the Reducer abstract class, to sum the online time of each female netizen and output those who stay online for more than 2 hours.
- The main() method creates a MapReduce job and submits it to the Hadoop cluster.
Example Codes
The following code snippets serve as an example. For the complete code, see the com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector class.
Example 1: The CollectionMapper class overrides the map() and setup() methods of the Mapper abstract class.
public static class CollectionMapper extends
        Mapper<Object, Text, Text, IntWritable> {
    // Delimiter.
    String delim;

    // Sex filter.
    String sexFilter;

    // Name.
    private Text nameInfo = new Text();

    // The output <key, value> pair must be serializable.
    private IntWritable timeInfo = new IntWritable(1);

    /**
     * Distributed computing.
     *
     * @param key Object: byte offset of the current line in the source file.
     * @param value Text: a line of text in the source file.
     * @param context Context: output parameter.
     * @throws IOException, InterruptedException
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.contains(sexFilter)) {
            // Obtain the name from the line that has been read.
            String name = line.substring(0, line.indexOf(delim));
            nameInfo.set(name);
            // Obtain the dwell duration.
            String time = line.substring(line.lastIndexOf(delim) + 1,
                    line.length());
            timeInfo.set(Integer.parseInt(time));
            // The map task outputs a key-value pair.
            context.write(nameInfo, timeInfo);
        }
    }

    /**
     * The setup() method is invoked once for initialization before map().
     *
     * @param context Context.
     */
    public void setup(Context context) throws IOException,
            InterruptedException {
        // Obtain configuration information using Context.
        delim = context.getConfiguration().get("log.delimiter", ",");
        sexFilter = delim
                + context.getConfiguration()
                        .get("log.sex.filter", "female") + delim;
    }
}
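For illustration only, the substring logic in map() can be exercised outside Hadoop. This sketch assumes a hypothetical log record of the form name,sex,time (such as "LiuYang,female,176") with the default "," delimiter; the record format and sample value are assumptions, not taken from the product code.

```java
public class MapParseSketch {
    static final String DELIM = ","; // default log.delimiter

    // Same substring logic as CollectionMapper.map(): the name is everything
    // before the first delimiter.
    static String parseName(String line) {
        return line.substring(0, line.indexOf(DELIM));
    }

    // The dwell duration is everything after the last delimiter.
    static int parseTime(String line) {
        return Integer.parseInt(line.substring(line.lastIndexOf(DELIM) + 1));
    }

    public static void main(String[] args) {
        String sexFilter = DELIM + "female" + DELIM; // default log.sex.filter
        String line = "LiuYang,female,176";          // hypothetical log record
        if (line.contains(sexFilter)) {
            // Emits: LiuYang	176
            System.out.println(parseName(line) + "\t" + parseTime(line));
        }
    }
}
```

Note that matching ",female," rather than "female" prevents a name such as "female" itself from passing the filter by accident.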
Example 2: The CollectionReducer class overrides the reduce() method of the Reducer abstract class.
public static class CollectionReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    // Statistical result.
    private IntWritable result = new IntWritable();

    // Total time threshold.
    private int timeThreshold;

    /**
     * @param key Text: key output by the Mapper.
     * @param values Iterable: all statistical results with the same key.
     * @param context Context
     * @throws IOException, InterruptedException
     */
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // No result is output if the total time is less than the threshold.
        if (sum < timeThreshold) {
            return;
        }
        result.set(sum);
        // In the output, key indicates netizen information, and value
        // indicates the total online time of the netizen.
        context.write(key, result);
    }

    /**
     * The setup() method is invoked only once, before the map() or
     * reduce() method.
     *
     * @param context Context
     * @throws IOException, InterruptedException
     */
    public void setup(Context context) throws IOException,
            InterruptedException {
        // Obtain configuration information using Context.
        timeThreshold = context.getConfiguration().getInt(
                "log.time.threshold", 120);
    }
}
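The reduce() logic above is just a sum followed by a threshold check, so it can be sketched as a plain Java method. This is a minimal sketch assuming the default threshold of 120 minutes; the per-session durations are invented figures, and -1 stands in for the "no output" early return.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // Mirrors CollectionReducer.reduce(): sum all values for one key and
    // return -1 (meaning "emit nothing") when the total is below the threshold.
    static int reduce(List<Integer> values, int timeThreshold) {
        int sum = 0;
        for (int val : values) {
            sum += val;
        }
        return sum < timeThreshold ? -1 : sum;
    }

    public static void main(String[] args) {
        int threshold = 120; // default log.time.threshold

        // Hypothetical per-session durations for two netizens.
        System.out.println(reduce(Arrays.asList(40, 50, 60), threshold)); // 150, emitted
        System.out.println(reduce(Arrays.asList(30, 40), threshold));     // -1, suppressed
    }
}
```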
Example 3: Use the main() method to create a job, set parameters, and submit the job to the Hadoop cluster.
public static void main(String[] args) throws Exception {
    // Initialize environment variables.
    Configuration conf = new Configuration();

    // Obtain input parameters.
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: collect female info <in> <out>");
        System.exit(2);
    }

    // Initialize the job object.
    @SuppressWarnings("deprecation")
    Job job = Job.getInstance(conf, "Collect Female Info");
    job.setJarByClass(FemaleInfoCollector.class);

    // Set the map and reduce classes to be executed, or specify them
    // in the configuration files.
    job.setMapperClass(CollectionMapper.class);
    job.setReducerClass(CollectionReducer.class);

    // Set the combiner class. By default, no combiner is used. The reducer
    // class is often reused as the combiner, but exercise caution: the
    // combiner must produce the same key and value types that the reducer
    // consumes. It can also be specified in the configuration files.
    job.setCombinerClass(CollectionCombiner.class);

    // Set the output types of the job.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    // Submit the job to a remote environment for execution.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
Example 4: The CollectionCombiner class combines the mapped data on the map side to reduce the amount of data transferred from map to reduce.
/**
 * Combiner class
 */
public static class CollectionCombiner extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    // Intermediate statistical result.
    private IntWritable intermediateResult = new IntWritable();

    /**
     * @param key Text: key output by the Mapper.
     * @param values Iterable: all results with the same key in this map task.
     * @param context Context
     * @throws IOException, InterruptedException
     */
    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        intermediateResult.set(sum);
        // In the output, key indicates netizen information, and value
        // indicates the total online time of the netizen in this map task.
        context.write(key, intermediateResult);
    }
}
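The combiner is safe here because addition is associative: pre-summing each map task's values and then summing the partial sums gives the same total the reducer would compute from the raw values. The following standalone sketch (with invented durations for a single key split across two map tasks) illustrates this equivalence.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSketch {
    // The same summation used by both CollectionCombiner and CollectionReducer.
    static int sum(List<Integer> values) {
        int s = 0;
        for (int v : values) {
            s += v;
        }
        return s;
    }

    public static void main(String[] args) {
        // Hypothetical durations for one key, split across two map tasks.
        List<Integer> mapTask1 = Arrays.asList(40, 50);
        List<Integer> mapTask2 = Arrays.asList(60);

        // Without a combiner: the reducer sees every individual value.
        int direct = sum(Arrays.asList(40, 50, 60));

        // With the combiner: each map task ships a single partial sum instead.
        int combined = sum(Arrays.asList(sum(mapTask1), sum(mapTask2)));

        System.out.println(direct + " == " + combined); // 150 == 150
    }
}
```

Only the count of transmitted records changes (two partial sums instead of three raw values); the final result is identical.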