更新时间:2022-07-19 GMT+08:00
分享

规则

继承Mapper抽象类实现

在MapReduce任务的Map阶段,会执行map()及setup()方法。

正确示例:

public static class MapperClass extends

Mapper<Object, Text, Text, IntWritable> {
/**
* map的输入,key为原文件位置偏移量,value为原文件的一行字符数据。
* 其map的输入key,value为文件分割方法InputFormat提供,用户不设置,默认 * 使用TextInputFormat。
*/
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//自定义的实现
}
/**
* setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次
*/
public void setup(Context context) throws IOException,
InterruptedException {
//自定义的实现
}
}

继承Reducer抽象类实现。

在MapReduce任务的Reduce阶段,会执行reduce()及setup()方法。

正确示例:

public static class ReducerClass extends

Reducer<Text, IntWritable, Text, IntWritable> {

/**
* @param 输入为一个key和value值集合迭代器。
* 由各个map汇总相同的key而来。reduce方法汇总相同key的个数。
* 并调用context.write(key, value)输出到指定目录。
* 其reduce的输出的key,value由Outputformat写入文件系统。
* 默认使用TextOutputFormat写入HDFS。
*/

public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//自定义实现
}

/**
* setup()方法只在进入map任务的map()方法之前或者reduce任务的reduce()方法之前调用一次。
*/

public void setup(Context context) throws IOException,
InterruptedException {

// 自定义实现,Context可以获得配置信息。

}
}

提交一个MapReduce任务

main()方法创建一个job,指定参数,提交作业到hadoop集群。

正确示例:

public static void main(String[] args) throws Exception {
Configuration conf = getConfiguration();
// main方法输入参数:args[0]为样例MR作业输入路径,args[1]为样例MR作业输出路径
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "job name");
// 设置找到主任务所在的jar包。
job.setJar("D:\\job-examples.jar");
// job.setJarByClass(TestWordCount.class);
// 设置运行时执行map,reduce的类,也可以通过配置文件指定。
job.setMapperClass(TokenizerMapperV1.class);
job.setReducerClass(IntSumReducerV1.class);
// 设置combiner类,默认不使用,使用时通常使用和reduce一样的类,Combiner类需要谨慎使用,也可以通过配置文件指定。
job.setCombinerClass(IntSumReducerV1.class);
// 设置作业的输出类型,也可以通过配置文件指定。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置该job的输入输出路径,也可以通过配置文件指定。
Path outputPath = new Path(otherArgs[1]);
FileSystem fs = outputPath.getFileSystem(conf);
// 如果输出路径已存在,删除该路径。
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}

对资源消耗较大的操作不要放到map或reduce函数中

对资源消耗较大的操作比如创建数据库链接,打开关闭文件等,不要放到map或reduce函数中。

多线程安全登录方式

如果有多线程进行login的操作,当应用程序第一次登录成功后,所有线程再次登录时应该使用relogin的方式。

login的代码样例

  private Boolean login(Configuration conf){
    boolean flag = false;
    UserGroupInformation.setConfiguration(conf);
    
    try {
      UserGroupInformation.loginUserFromKeytab(conf.get(PRINCIPAL), conf.get(KEYTAB));
      System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased());
      flag = true;
    } catch (IOException e) {
      e.printStackTrace();
    }
    return flag;
    
  }

relogin的代码样例

public Boolean relogin(){
        boolean flag = false;
        try {
            
          UserGroupInformation.getLoginUser().reloginFromKeytab();
          System.out.println("UserGroupInformation.isLoginKeytabBased(): " +UserGroupInformation.isLoginKeytabBased());
          flag = true;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return flag;
    }
分享:

    相关文档

    相关产品