更新时间:2024-06-27 GMT+08:00
分享

通过Java API提交Oozie作业

功能简介

Oozie通过org.apache.oozie.client.OozieClient的run方法提交作业,通过getJobInfo获取作业信息。

代码样例

代码示例中请根据实际情况,修改“OOZIE_URL_DEFALUT”为实际的任意Oozie节点的主机名,例如“https://10-1-131-131:21003/oozie/”。

    public void test(String jobFilePath) {
        try {
            UserGroupInformation.getLoginUser()
                    .doAs(
                            new PrivilegedExceptionAction<Void>() {
                                /**
                                 * run job
                                 *
                                 * @return null
                                 * @throws Exception exception
                                 */
                                public Void run() throws Exception {
                                    runJob(jobFilePath);
                                    return null;
                                }
                            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void runJob(String jobFilePath) throws OozieClientException, InterruptedException {

        Properties conf = getJobProperties(jobFilePath);

        // submit and start the workflow job
        String jobId = wc.run(conf);

        logger.info("Workflow job submitted : {}" , jobId);

        // wait until the workflow job finishes printing the status every 10 seconds
        while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            logger.info("Workflow job running ... {}" , jobId);
            Thread.sleep(10 * 1000);
        }

        // print the final status of the workflow job
        logger.info("Workflow job completed ... {}" , jobId);
        logger.info(String.valueOf(wc.getJobInfo(jobId)));
    }

    /**
     * Get job.properties File in filePath
     *
     * @param filePath file path
     * @return job.properties
     * @since 2020-09-30
     */
    public Properties getJobProperties(String filePath) {
        File configFile = new File(filePath);
        if (!configFile.exists()) {
            logger.info(filePath , "{} is not exist.");
        }

        InputStream inputStream = null;

        // create a workflow job configuration
        Properties properties = wc.createConfiguration();
        try {
            inputStream = new FileInputStream(filePath);
            properties.load(inputStream);

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        return properties;
    }

注意事项

通过Java API访问Oozie需要先参考环境准备章节进行安全认证,并将依赖的配置文件(配置文件Workflow.xml的开发参见workflow.xml)与jar包上传到HDFS,并确保进行过安全认证的用户有权限访问HDFS上对应的目录(目录的属主是该用户,或与该用户属于同一个用户组)。

相关文档