Updated: 2025-12-16 GMT+08:00

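Connecting Spring Boot to HDFS

This section applies only to MRS 3.6.0 and later versions.

Overview

A Spring Boot application connects to an HDFS cluster to read data from and write data to HDFS.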

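Configuration Files

Logging in to HDFS uses the configuration files listed in Table 1. These files have already been imported into the "src/main/resources" directory of the "hdfs-examples" project.

Table 1 Configuration files

File name                 Purpose
core-site.xml             Configures detailed HDFS parameters.
hdfs-site.xml             Configures detailed HDFS parameters.
application.properties    Spring Boot configuration. The following items currently need to be set:
                            • The absolute path of the "src/main/resources" directory of the "hdfs-examples" project.
                            • The user name used to log in to HDFS.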
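
Before starting the application, it can help to confirm that the files listed in Table 1 are actually present in the configuration directory. The following is a minimal sketch that uses only the JDK. The class name ConfigDirCheck and the method missingFiles are hypothetical and are not part of the sample project.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of the hdfs-examples project.
class ConfigDirCheck {
    // File names taken from Table 1.
    static final List<String> REQUIRED =
            Arrays.asList("core-site.xml", "hdfs-site.xml", "application.properties");

    // Returns the required file names that do not exist as regular files in configDir.
    static List<String> missingFiles(Path configDir) {
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (!Files.isRegularFile(configDir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }
}
```

An empty result means all three files were found. A non-empty result lists the files to copy into the directory before the application is started.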

Sample Code

The core sample code for connecting Spring Boot to an HDFS cluster and reading from and writing to HDFS is as follows:

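// The imports below assume SLF4J logging and the javax annotation namespace.
// On Spring Boot 3.x, use jakarta.annotation.PostConstruct instead.
// LoginUtil is the Kerberos login helper used by the sample; import it from
// the package where it resides in your project.
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import javax.annotation.PostConstruct;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Repository;

@Repository
public class CustomerHDFSTemplate {
    protected static final Logger logger = LoggerFactory.getLogger(CustomerHDFSTemplate.class);

    // Directory that holds hdfs-site.xml and core-site.xml (plus krb5.conf and user.keytab in security mode).
    @Value("${spring.hdfs.config.dir}")
    private String configDir;

    // Service user that accesses HDFS.
    @Value("${hdfs.user}")
    private String user;

    private FileSystem fSystem = null;

    // Loads the cluster configuration, logs in through Kerberos if required, and opens the FileSystem.
    @PostConstruct
    private void init() throws IOException {
        Configuration conf = new Configuration();
        // Load the HDFS client configuration files.
        conf.addResource(new Path(configDir + "/hdfs-site.xml"));
        conf.addResource(new Path(configDir + "/core-site.xml"));
        // Kerberos login is needed only when the cluster has Kerberos authentication enabled.
        if ("kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) {
            String krb5Conf = configDir + File.separator + "krb5.conf";
            String keytab = configDir + File.separator + "user.keytab";
            System.setProperty("java.security.krb5.conf", krb5Conf);
            LoginUtil.login(user, keytab, krb5Conf, conf);
        }
        fSystem = FileSystem.get(conf);
    }

    // Creates the directory if it does not exist yet.
    public void mkdir(String path) throws IOException {
        Path filePath = new Path(path);
        if (!fSystem.exists(filePath)) {
            fSystem.mkdirs(filePath);
            logger.info("mkdir path:{} success", path);
            return;
        }
        logger.info("path:{} already exists", path);
    }

    // Recursively deletes the path if it exists.
    public void remove(String path) throws IOException {
        Path filePath = new Path(path);
        if (fSystem.exists(filePath)) {
            fSystem.delete(filePath, true);
            logger.info("remove path:{} success", path);
            return;
        }
        logger.info("path:{} does not exist", path);
    }

    // Writes a fixed test string to destPath/fileName.
    public void write(String destPath, String fileName) throws IOException {
        final String content = "hi, I am bigdata. It is successful if you can see me.";
        // HDFS paths always use "/", so join with Path.SEPARATOR rather than File.separator.
        // try-with-resources closes the stream even if the write fails.
        try (FSDataOutputStream out = fSystem.create(new Path(destPath + Path.SEPARATOR + fileName))) {
            // Use an explicit charset instead of the platform default.
            out.write(content.getBytes(StandardCharsets.UTF_8));
            out.hsync();
            logger.info("success to write.");
        }
    }

    // Reads destPath/fileName and returns its lines concatenated without line terminators.
    public String read(String destPath, String fileName) throws IOException {
        Path path = new Path(destPath + Path.SEPARATOR + fileName);
        StringBuilder strBuffer = new StringBuilder();
        try (FSDataInputStream in = fSystem.open(path);
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String sTempOneLine;
            // Read the file line by line; readLine() strips the line terminator.
            while ((sTempOneLine = reader.readLine()) != null) {
                strBuffer.append(sTempOneLine);
            }
            logger.info("result is : {}", strBuffer);
            logger.info("success to read.");
        }
        return strBuffer.toString();
    }
}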
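
Note that the read method in the sample appends each line returned by readLine() without its line terminator, so a multi-line file comes back as one concatenated string. The JDK-only sketch below reproduces that loop on an in-memory string to show the effect, and shows one possible variant that keeps line breaks. The class ReadLineDemo and its method names are hypothetical and are not part of the sample project.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical demo class; it uses an in-memory string instead of an HDFS stream.
class ReadLineDemo {
    // Same loop shape as the sample's read(): line terminators are dropped.
    static String concatLines(String text) {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }

    // Variant that puts a single '\n' between consecutive lines.
    static String joinLines(String text) {
        StringBuilder sb = new StringBuilder();
        boolean first = true;
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!first) {
                    sb.append('\n');
                }
                sb.append(line);
                first = false;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sb.toString();
    }
}
```

For the single-line test string written by the sample, both approaches return the same result. The difference matters only when the file being read contains more than one line.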

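The variables used in the sample code above can be set in "springboot > hdfs-examples > src > main > resources > application.properties", or in an application.properties file written manually in the environment where the sample runs. Modify the values to match your environment. The variables are described as follows:

  • spring.hdfs.config.dir: the directory that contains the configuration files for accessing HDFS, including hdfs-site.xml and core-site.xml.
  • hdfs.user: the name of the service user that accesses HDFS.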
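
For illustration, an application.properties file with these two keys might look like the text embedded in the sketch below. The directory /opt/client/conf and the user name hdfsuser are placeholder values, not defaults, and the class PropertiesCheck is hypothetical. The sketch parses the text with java.util.Properties and reports any required key that is missing or blank.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the hdfs-examples project.
class PropertiesCheck {
    // Placeholder values only; replace them with the values for your environment.
    static final String SAMPLE =
            "spring.hdfs.config.dir=/opt/client/conf\n"
            + "hdfs.user=hdfsuser\n";

    // Parses properties-format text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns the required keys that are absent or have a blank value.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : new String[] {"spring.hdfs.config.dir", "hdfs.user"}) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

In the properties format a backslash is an escape character, so a Windows directory needs either forward slashes or doubled backslashes.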

Related Documents