更新时间:2022-07-19 GMT+08:00
示例
统计日志文件中本周末网购停留总时间超过2个小时的女性网民信息。
主要分为三个部分。
- 从原文件中筛选女性网民上网时间数据信息,通过类MapperClass继承Mapper抽象类实现。
- 汇总每个女性上网时间,并输出时间大于两个小时的女性网民信息,通过类ReducerClass继承Reducer抽象类实现。
- main方法提供建立一个MapReduce job,并提交MapReduce作业到hadoop集群
样例1:类MapperClass定义Mapper抽象类的map()方法和setup()方法。
public static class MapperClass extends Mapper<Object, Text, Text, IntWritable> { // 分隔符。 String delim; // 性别筛选。 String sexFilter; private final static IntWritable timeInfo = new IntWritable(1); private Text nameInfo = new Text(); /** * map的输入,key为原文件位置偏移量,value为原文件的一行字符数据。 * 其map的输入key,value为文件分割方法InputFormat提供,用户不设置,默认使用TextInputFormat。 */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 读取的一行字符串数据 String line = value.toString(); if (line.contains(sexFilter)) { // 获取姓名 String name = line.substring(0, line.indexOf(delim)); nameInfo.set(name); // 获取上网停留时间 String time = line.substring(line.lastIndexOf(delim), line.length()); timeInfo.set(Integer.parseInt(time)); // map输出key,value键值对 context.write(nameInfo, timeInfo); } } /** * setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次 */ public void setup(Context context) throws IOException, InterruptedException { // 通过Context可以获得配置信息。 sexFilter = delim + context.getConfiguration().get("log.sex.filter", "female") + delim; } }
样例2:类ReducerClass定义Reducer抽象类的reduce()方法。
public static class ReducerClass extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); // 总时间门槛。 private int timeThreshold; /** * @param 输入为一个key和value值集合迭代器。 * 由各个map汇总相同的key而来。reduce方法汇总相同key的个数。 * 并调用context.write(key, value)输出到指定目录。 * 其reduce的输出的key,value由Outputformat写入文件系统。 * 默认使用TextOutputFormat写入HDFS。 */ public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } // 如果时间小于门槛时间,不输出结果。 if (sum < timeThreshold) { return; } result.set(sum); // reduce输出为key:网民的信息,value:该网民上网总时间 context.write(key, result); } /** * setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次。 */ public void setup(Context context) throws IOException, InterruptedException { // Context可以获得配置信息。 timeThreshold = context.getConfiguration().getInt( "log.time.threshold", 120); } }
样例3:main()方法创建一个job,指定参数,提交作业到hadoop集群。
public static void main(String[] args) throws Exception { Configuration conf = getConfiguration(); // main方法输入参数:args[0]为样例MR作业输入路径,args[1]为样例MR作业输出路径 String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: <in> <out>"); System.exit(2); } Job job = new Job(conf, "Collect Female Info"); // 设置找到主任务所在的jar包。 job.setJar("D:\\mapreduce-examples\\hadoop-mapreduce-examples\\mapreduce-examples.jar"); // job.setJarByClass(TestWordCount.class); // 设置运行时执行map,reduce的类,也可以通过配置文件指定。 job.setMapperClass(TokenizerMapperV1.class); job.setReducerClass(IntSumReducerV1.class); // 设置combiner类,默认不使用,使用时通常使用和reduce一样的类,Combiner类需要谨慎使用,也可以通过配置文件指定。 job.setCombinerClass(IntSumReducerV1.class); // 设置作业的输出类型,也可以通过配置文件指定。 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 设置该job的输入输出路径,也可以通过配置文件指定。 Path outputPath = new Path(otherArgs[1]); FileSystem fs = outputPath.getFileSystem(conf); // 如果输出路径已存在,删除该路径。 if (fs.exists(outputPath)) { fs.delete(outputPath, true); } FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) 0 : 1); }
父主题: 开发规范