MapReduce统计样例程序开发思路
场景说明
假定用户有某个周末网民网购停留时间的日志文本,基于某些业务要求,要求开发MapReduce应用程序实现如下功能。
- 统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。
- 周末两天的日志文件第一列为姓名,第二列为性别,第三列为本次停留时间,单位为分钟,分隔符为“,”。
log1.txt:周六网民停留日志
LiuYang,female,20 YuanJing,male,10 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
log2.txt:周日网民停留日志
LiuYang,female,20 YuanJing,male,10 CaiXuyu,female,50 FangBo,female,50 GuoYijun,male,5 CaiXuyu,female,50 Liyuan,male,20 CaiXuyu,female,50 FangBo,female,50 LiuYang,female,20 YuanJing,male,10 FangBo,female,50 GuoYijun,male,50 CaiXuyu,female,50 FangBo,female,60
数据规划
首先需要把原日志文件放置在HDFS系统里。
- 本地新建两个文本文件,将log1.txt中的内容复制保存到input_data1.txt,将log2.txt中的内容复制保存到input_data2.txt。
- 在HDFS上建立一个文件夹,“/tmp/input”,并上传input_data1.txt,input_data2.txt到此目录,命令如下。
- 在Linux系统HDFS客户端使用命令hdfs dfs -mkdir /tmp/input
- 在Linux系统HDFS客户端使用命令hdfs dfs -put local_filepath /tmp/input
开发思路
统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。
主要分为四个部分。
- 读取原文件数据。
- 筛选女性网民上网时间数据信息。
- 汇总每个女性上网总时间。
- 筛选出停留总时间大于两个小时的女性网民信息。
功能介绍
统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。
主要分为三个部分。
- 从原文件中筛选女性网民上网时间数据信息,通过类CollectionMapper继承Mapper抽象类实现。
- 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类CollectionReducer继承Reducer抽象类实现。
- main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群。
代码样例
下面代码片段仅为演示,具体代码参见com.huawei.bigdata.mapreduce.examples.FemaleInfoCollector类
样例1:类CollectionMapper定义Mapper抽象类的map()方法和setup()方法。
public static class CollectionMapper extends Mapper<Object, Text, Text, IntWritable> { // 分隔符。 String delim; // 性别筛选。 String sexFilter; // 姓名信息。 private Text nameInfo = new Text(); // 输出的key,value要求是序列化的。 private IntWritable timeInfo = new IntWritable(1); /** * 分布式计算 * * @param key Object : 原文件位置偏移量。 * @param value Text : 原文件的一行字符数据。 * @param context Context : 出参。 * @throws IOException , InterruptedException */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.contains(sexFilter)) { // 读取的一行字符串数据。 String name = line.substring(0, line.indexOf(delim)); nameInfo.set(name); // 获取上网停留时间。 String time = line.substring(line.lastIndexOf(delim) + 1, line.length()); timeInfo.set(Integer.parseInt(time)); // map输出key,value键值对。 context.write(nameInfo, timeInfo); } } /** * map调用,做一些初始工作。 * * @param context Context */ public void setup(Context context) throws IOException, InterruptedException { // 通过Context可以获得配置信息。 delim = context.getConfiguration().get("log.delimiter", ","); sexFilter = delim + context.getConfiguration() .get("log.sex.filter", "female") + delim; } }
样例2:类CollectionReducer定义Reducer抽象类的reduce()方法。
public static class CollectionReducer extends Reducer<Text, IntWritable, Text, IntWritable> { // 统计结果。 private IntWritable result = new IntWritable(); // 总时间门槛。 private int timeThreshold; /** * @param key Text : Mapper后的key项。 * @param values Iterable : 相同key项的所有统计结果。 * @param context Context * @throws IOException , InterruptedException */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } // 如果时间小于门槛时间,不输出结果。 if (sum < timeThreshold) { return; } result.set(sum); // reduce输出为key:网民的信息,value:该网民上网总时间。 context.write(key, result); } /** * setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次。 * * @param context Context * @throws IOException , InterruptedException */ public void setup(Context context) throws IOException, InterruptedException { // Context可以获得配置信息。 timeThreshold = context.getConfiguration().getInt( "log.time.threshold", 120); } }
样例3:main()方法创建一个job,指定参数,提交作业到hadoop集群。
public static void main(String[] args) throws Exception { // 初始化环境变量。 Configuration conf = new Configuration(); // 获取入参。 String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: collect female info <in> <out>"); System.exit(2); } // 判断是否为安全模式 if("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))){ //security mode System.setProperty("java.security.krb5.conf", KRB); LoginUtil.login(PRINCIPAL, KEYTAB, KRB, conf); } // 初始化Job任务对象。 Job job = Job.getInstance(conf, "Collect Female Info"); job.setJarByClass(FemaleInfoCollector.class); // 设置运行时执行map,reduce的类,也可以通过配置文件指定。 job.setMapperClass(CollectionMapper.class); job.setReducerClass(CollectionReducer.class); // 设置combiner类,默认不使用,使用时通常使用和reduce一样的类。 // Combiner类需要谨慎使用,也可以通过配置文件指定。 job.setCombinerClass(CollectionCombiner.class); // 设置作业的输出类型。 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 提交任务交到远程环境上执行。 System.exit(job.waitForCompletion(true) ? 0 : 1); }
样例4:类CollectionCombiner实现了在map端先合并一下map输出的数据,减少map和reduce之间传输的数据量。
/** * Combiner class */ public static class CollectionCombiner extends Reducer<Text, IntWritable, Text, IntWritable> { // Intermediate statistical results private IntWritable intermediateResult = new IntWritable(); /** * @param key Text : key after Mapper * @param values Iterable : all results with the same key in this map task * @param context Context * @throws IOException , InterruptedException */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } intermediateResult.set(sum); // In the output information, key indicates netizen information, // and value indicates the total online time of the netizen in this map task. context.write(key, intermediateResult); } }