更新时间:2024-06-27 GMT+08:00
分享

通过Java API提交Oozie作业

功能简介

Oozie通过org.apache.oozie.client.OozieClient的run方法提交作业,通过getJobInfo获取作业信息。

代码样例

代码示例中请根据实际情况,修改“OOZIE_URL_DEFALUT”为实际的任意Oozie的主机名,例如“https://10-1-131-131:21003/oozie/”。

    public void test(String jobFilePath) {
        try {
            runJob(jobFilePath);
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    private void runJob(String jobFilePath) throws OozieClientException, InterruptedException {

        Properties conf = getJobProperties(jobFilePath);
        String user = PropertiesCache.getInstance().getProperty("submit_user");
        conf.setProperty("user.name", user);

        // submit and start the workflow job
        String jobId = oozieClient.run(conf);

        logger.info("Workflow job submitted: {}" , jobId);

        // wait until the workflow job finishes printing the status every 10 seconds
        while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            logger.info("Workflow job running ... {}" , jobId);
            Thread.sleep(10 * 1000);
        }

        // print the final status of the workflow job
        logger.info("Workflow job completed ... {}" , jobId);
        logger.info(String.valueOf(oozieClient.getJobInfo(jobId)));
    }

    /**
     * Get job.properties File in filePath
     *
     * @param filePath file path
     * @return job.properties
     * @since 2020-09-30
     */
    public Properties getJobProperties(String filePath) {
        File configFile = new File(filePath);
        if (!configFile.exists()) {
            logger.info(filePath , "{} is not exist.");
        }

        InputStream inputStream = null;

        // create a workflow job configuration
        Properties properties = oozieClient.createConfiguration();
        try {
            inputStream = new FileInputStream(filePath);
            properties.load(inputStream);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }

        return properties;
    }

相关文档