更新时间:2024-11-06 GMT+08:00
Flink Jar作业是否支持上传配置文件,要如何操作?
Flink Jar作业上传配置文件操作流程
自定义(JAR)作业支持上传配置文件。
- 将配置文件通过程序包管理上传到DLI;
- 在Flink jar作业的其他依赖文件参数中,选择创建的DLI程序包;
- 在代码中通过ClassName.class.getClassLoader().getResource("userData/fileName")加载该文件。
- ClassName”为需要访问该文件的类名。
- userData为固定文件路径名,不支持修改或自定义其他路径名。
- fileName为需要访问的文件名。
本节示例适用于Flink 1.12版本。Flink 1.15版本的Jar作业开发指导请参考Flink Jar写入数据到OBS开发指南。
配置文件使用方法
- 方案一:直接在main函数里面加载文件内容到内存,然后广播到各个taskmanager,这种方式适合那种需要提前加载的少量变量。
- 方案二:在open里面初始化算子的时候加载文件,可以使用相对路径/绝对路径的方式
以kafka sink为例:需要加载两个文件(userData/kafka-sink.conf,userData/client.truststore.jks)
- 使用相对路径的配置示例:
使用相对路径:confPath = userData/kafka-sink.conf @Override public void open(Configuration parameters) throws Exception { super.open(parameters); initConf(); producer = new KafkaProducer<>(props); } private void initConf() { try { URL url = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource(confPath); if (url != null) { LOGGER.info("kafka main-url: " + url.getFile()); } else { LOGGER.info("kafka url error......"); } InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(url.getFile()).getAbsolutePath())); props.load(new InputStreamReader(inputStream, "UTF-8")); topic = props.getProperty("topic"); partition = Integer.parseInt(props.getProperty("partition")); vaildProps(); } catch (Exception e) { LOGGER.info("load kafka conf failed"); e.printStackTrace(); } }
图1 相对路径配置示例
- 使用绝对路径的配置示例:
使用绝对路径:confPath = userData/kafka-sink.conf / path = /opt/data1/hadoop/tmp/usercache/omm/appcache/application_xxx_0015/container_xxx_0015_01_000002/userData/client.truststore.jks @Override public void open(Configuration parameters) throws Exception { super.open(parameters); initConf(); String path = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource("userData/client.truststore.jks").getPath(); LOGGER.info("kafka abs path " + path); props.setProperty("ssl.truststore.location", path); producer = new KafkaProducer<>(props); } private void initConf() { try { URL url = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource(confPath); if (url != null) { LOGGER.info("kafka main-url: " + url.getFile()); } else { LOGGER.info("kafka url error......"); } InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(url.getFile()).getAbsolutePath())); props.load(new InputStreamReader(inputStream, "UTF-8")); topic = props.getProperty("topic"); partition = Integer.parseInt(props.getProperty("partition")); vaildProps(); } catch (Exception e) { LOGGER.info("load kafka conf failed"); e.printStackTrace(); } }
图2 绝对路径配置示例
- 使用相对路径的配置示例:
父主题: Flink Jar作业类