MapReduce统计样例代码
功能介绍
统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。
主要分为三个部分:
- 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。
- 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。
- main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector类:
样例1:类CollectionMapper定义Mapper抽象类的map()方法和setup()方法。
public static class CollectionMapper extends
Mapper<Object, Text, Text, IntWritable> {
// 分隔符。
String delim;
// 性别筛选。
String sexFilter;
// 姓名信息。
private Text nameInfo = new Text();
// 输出的key,value要求是序列化的。
private IntWritable timeInfo = new IntWritable(1);
/**
* 分布式计算
*
* @param key Object : 原文件位置偏移量。
* @param value Text : 原文件的一行字符数据。
* @param context Context : 出参。
* @throws IOException , InterruptedException
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException
{
String line = value.toString();
if (line.contains(sexFilter))
{
// 读取的一行字符串数据。
String name = line.substring(0, line.indexOf(delim));
nameInfo.set(name);
// 获取上网停留时间。
String time = line.substring(line.lastIndexOf(delim) + 1,
line.length());
timeInfo.set(Integer.parseInt(time));
// map输出key,value键值对。
context.write(nameInfo, timeInfo);
}
}
/**
* map调用,做一些初始工作。
*
* @param context Context
*/
public void setup(Context context) throws IOException,
InterruptedException
{
// 通过Context可以获得配置信息。
delim = context.getConfiguration().get("log.delimiter", ",");
sexFilter = delim + context.getConfiguration().get("log.sex.filter", "female") + delim;
}
}
样例2:类CollectionReducer定义Reducer抽象类的reduce()方法。
public static class CollectionReducer extends
Reducer<Text, IntWritable, Text, IntWritable>
{
// 统计结果。
private IntWritable result = new IntWritable();
// 总时间门槛。
private int timeThreshold;
/**
* @param key Text : Mapper后的key项。
* @param values Iterable : 相同key项的所有统计结果。
* @param context Context
* @throws IOException , InterruptedException
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
// 如果时间小于门槛时间,不输出结果。
if (sum < timeThreshold)
{
return;
}
result.set(sum);
// reduce输出为key:网民的信息,value:该网民上网总时间。
context.write(key, result);
}
/**
* setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次。
*
* @param context Context
* @throws IOException , InterruptedException
*/
public void setup(Context context) throws IOException,
InterruptedException
{
// Context可以获得配置信息。
timeThreshold = context.getConfiguration().getInt(
"log.time.threshold", 120);
}
}
样例3:main()方法创建一个job,指定参数,提交作业到hadoop集群。
public static void main(String[] args) throws Exception {
// 初始化环境变量。
Configuration conf = new Configuration();
// 获取入参。
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: collect female info <in> <out>");
System.exit(2);
}
// 初始化Job任务对象。
Job job = Job.getInstance(conf, "Collect Female Info");
job.setJarByClass(FemaleInfoCollector.class);
// 设置运行时执行map,reduce的类,也可以通过配置文件指定。
job.setMapperClass(CollectionMapper.class);
job.setReducerClass(CollectionReducer.class);
// 设置combiner类,默认不使用,使用时通常使用和reduce一样的类。
// Combiner类需要谨慎使用,也可以通过配置文件指定。
job.setCombinerClass(CollectionCombiner.class);
// 设置作业的输出类型。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
// 提交任务交到远程环境上执行。
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
样例4:类CollectionCombiner实现了在map端先合并一下map输出的数据,减少map和reduce之间传输的数据量。
/**
* Combiner class
*/
public static class CollectionCombiner extends
Reducer<Text, IntWritable, Text, IntWritable> {
// Intermediate statistical results
private IntWritable intermediateResult = new IntWritable();
/**
* @param key Text : key after Mapper
* @param values Iterable : all results with the same key in this map task
* @param context Context
* @throws IOException , InterruptedException
*/
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
intermediateResult.set(sum);
// In the output information, key indicates netizen information,
// and value indicates the total online time of the netizen in this map task.
context.write(key, intermediateResult);
}
}