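Submitting an Oozie Job Using the Java API
Function Description
Oozie jobs are submitted with the run method of org.apache.oozie.client.OozieClient, and job information is queried with its getJobInfo method.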
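The run/getJobInfo pattern amounts to this: submit once, keep the returned job ID, and poll the status until the job leaves RUNNING. The sketch below shows only that control flow, so it runs without an Oozie server. JobClient, FakeJobClient, JobStatus and SubmitAndPoll are hypothetical names and are not part of the Oozie client library. getStatus(jobId) stands in for the real getJobInfo(jobId).getStatus() call, and the checked OozieClientException is left out.

```java
import java.util.Properties;

// Hypothetical subset of job states; the real enum is WorkflowJob.Status.
enum JobStatus { PREP, RUNNING, SUCCEEDED, KILLED, FAILED }

// Hypothetical stand-in for the two OozieClient calls used in this section.
interface JobClient {
    String run(Properties conf);        // submit and start; returns the job ID
    JobStatus getStatus(String jobId);  // current status of the job
}

// Fake client: reports RUNNING for a fixed number of polls, then SUCCEEDED.
class FakeJobClient implements JobClient {
    private int runningPollsLeft;
    int polls = 0;

    FakeJobClient(int runningPolls) {
        this.runningPollsLeft = runningPolls;
    }

    @Override
    public String run(Properties conf) {
        return "fake-job-id";
    }

    @Override
    public JobStatus getStatus(String jobId) {
        polls++;
        if (runningPollsLeft > 0) {
            runningPollsLeft--;
            return JobStatus.RUNNING;
        }
        return JobStatus.SUCCEEDED;
    }
}

class SubmitAndPoll {
    // Submits once, then polls every intervalMillis while the job is RUNNING.
    // Returns the first non-RUNNING status seen, or the last status if interrupted.
    static JobStatus submitAndWait(JobClient client, Properties conf, long intervalMillis) {
        String jobId = client.run(conf);
        JobStatus status = client.getStatus(jobId);
        while (status == JobStatus.RUNNING) {
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag and stop waiting
                break;
            }
            status = client.getStatus(jobId);
        }
        return status;
    }
}
```

Restoring the interrupt flag instead of swallowing InterruptedException lets a caller that cancels the wait still see that the thread was interrupted.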
Sample Code
In the sample code, change the hostname in OOZIE_URL_DEFALUT to the actual hostname of any Oozie node in your cluster, for example "https://10-1-131-131:21003/oozie/".
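If the URL is assembled from a node hostname, a small helper can build it and reject obviously invalid input before it reaches the client. This is a sketch that assumes the HTTPS scheme, port 21003 and the /oozie/ path copied from the example URL above; OozieUrls and fromHost are hypothetical names, and the port in your cluster may differ.

```java
import java.net.URI;
import java.net.URISyntaxException;

class OozieUrls {
    // Assumed defaults taken from the example URL; adjust them for your cluster.
    static final String SCHEME = "https";
    static final int DEFAULT_PORT = 21003;
    static final String PATH = "/oozie/";

    // Builds a URL such as "https://<host>:21003/oozie/" from a node hostname.
    static String fromHost(String host, int port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("host must not be empty");
        }
        try {
            // The multi-argument URI constructor parses and validates the result.
            return new URI(SCHEME, null, host, port, PATH, null, null).toString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid Oozie host: " + host, e);
        }
    }

    static String fromHost(String host) {
        return fromHost(host, DEFAULT_PORT);
    }
}
```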
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.Properties;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

// The enclosing sample class is assumed to declare two fields:
//   wc:     the OozieClient connected to OOZIE_URL_DEFALUT
//   logger: a logger that supports "{}" placeholders (for example SLF4J)

public void test(String jobFilePath) {
    try {
        // Run the job as the user that completed security authentication
        UserGroupInformation.getLoginUser()
                .doAs(new PrivilegedExceptionAction<Void>() {
                    /**
                     * run job
                     *
                     * @return null
                     * @throws Exception exception
                     */
                    @Override
                    public Void run() throws Exception {
                        runJob(jobFilePath);
                        return null;
                    }
                });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private void runJob(String jobFilePath) throws OozieClientException, InterruptedException {
    Properties conf = getJobProperties(jobFilePath);

    // submit and start the workflow job
    String jobId = wc.run(conf);
    logger.info("Workflow job submitted : {}", jobId);

    // wait until the workflow job finishes, printing the status every 10 seconds
    while (wc.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
        logger.info("Workflow job running ... {}", jobId);
        Thread.sleep(10 * 1000);
    }

    // print the final status of the workflow job
    logger.info("Workflow job completed ... {}", jobId);
    logger.info(String.valueOf(wc.getJobInfo(jobId)));
}

/**
 * Get job.properties File in filePath
 *
 * @param filePath file path
 * @return job.properties
 * @since 2020-09-30
 */
public Properties getJobProperties(String filePath) {
    File configFile = new File(filePath);
    if (!configFile.exists()) {
        // The format string comes first, followed by its argument
        logger.error("{} does not exist.", filePath);
        throw new IllegalArgumentException(filePath + " does not exist.");
    }

    // create a workflow job configuration and load job.properties into it
    Properties properties = wc.createConfiguration();
    // try-with-resources closes the stream even if loading fails
    try (InputStream inputStream = new FileInputStream(configFile)) {
        properties.load(inputStream);
    } catch (IOException e) {
        e.printStackTrace();
    }
    return properties;
}
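getJobProperties only loads the file, so a job.properties without an application path fails later, at submission time. The standard-library sketch below fails fast instead. oozie.wf.application.path is the standard Oozie property naming the HDFS directory of the workflow application; the class name JobPropertiesLoader is hypothetical, and in the sample above the base Properties would come from wc.createConfiguration().

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class JobPropertiesLoader {
    // Standard Oozie property naming the workflow application directory on HDFS.
    static final String APP_PATH_KEY = "oozie.wf.application.path";

    // Loads key=value pairs into base and fails fast if the application path is missing.
    static Properties load(Reader reader, Properties base) {
        try {
            base.load(reader);
        } catch (IOException e) {
            throw new UncheckedIOException("cannot parse job properties", e);
        }
        String appPath = base.getProperty(APP_PATH_KEY);
        if (appPath == null || appPath.trim().isEmpty()) {
            throw new IllegalArgumentException(APP_PATH_KEY + " is not set");
        }
        return base;
    }

    // Reads a job.properties file as UTF-8; try-with-resources closes the reader.
    static Properties fromFile(Path path, Properties base) {
        try (Reader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
            return load(reader, base);
        } catch (IOException e) {
            throw new UncheckedIOException("cannot read " + path, e);
        }
    }

    // Convenience for examples: parse properties held in a string.
    static Properties fromString(String text) {
        return load(new StringReader(text), new Properties());
    }
}
```

Values such as ${nameNode} are kept literally by Properties; Oozie resolves them after submission. Reading through a Reader also avoids the ISO-8859-1 decoding that Properties.load(InputStream) applies.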
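Precautions
Before accessing Oozie through the Java API, complete security authentication as described in the environment preparation section. Then upload the required configuration files (for how to develop the workflow.xml configuration file, see workflow.xml) and JAR packages to HDFS, and make sure the authenticated user has permission to access the corresponding HDFS directories: the user must own the directories or belong to the same user group as their owner.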
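The ownership requirement follows from how HDFS checks access. If the user is the owner, the owner permission bits are tested; otherwise, if the user belongs to the path's group, the group bits are tested; otherwise the "other" bits apply. The predicate below is a simplified, hypothetical model of that rule. It ignores ACLs, the superuser, and the execute permission needed on every parent directory. HdfsAccessModel and its parameters are illustrative only and are not part of the Hadoop API.

```java
import java.util.Set;

class HdfsAccessModel {
    // Permission bits within one octal digit (rwx).
    static final int READ = 4, WRITE = 2, EXECUTE = 1;

    // mode is a POSIX-style octal mode such as 0750; wanted is a mask of READ/WRITE/EXECUTE.
    static boolean canAccess(String user, Set<String> userGroups,
                             String owner, String group, int mode, int wanted) {
        int bits;
        if (user.equals(owner)) {
            bits = (mode >> 6) & 7;   // owner class
        } else if (userGroups.contains(group)) {
            bits = (mode >> 3) & 7;   // group class
        } else {
            bits = mode & 7;          // other class
        }
        return (bits & wanted) == wanted;
    }
}
```

With a mode of 0750, the owner and members of the directory's group can read the workflow files, while any other user cannot. This explains why the authenticated user should be the owner or a member of the owning group.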