文档首页/ 数据湖探索 DLI/ 常见问题/ Flink作业类/ Flink Jar作业类/ Flink Jar作业是否支持上传配置文件,要如何操作?
更新时间:2024-11-08 GMT+08:00

Flink Jar作业是否支持上传配置文件,要如何操作?

Flink Jar作业上传配置文件操作流程

自定义(JAR)作业支持上传配置文件。

  1. 将配置文件通过程序包管理上传到DLI;
  2. 在Flink jar作业的其他依赖文件参数中,选择创建的DLI程序包;
  3. 在代码中通过ClassName.class.getClassLoader().getResource("userData/fileName")加载该文件。
    • ClassName”为需要访问该文件的类名。
    • userData为固定文件路径名,不支持修改或自定义其他路径名。
    • fileName为需要访问的文件名。

本节示例适用于Flink 1.12版本。Flink 1.15版本的Jar作业开发指导请参考Flink Jar写入数据到OBS开发指南

配置文件使用方法

  • 方案一:直接在main函数里面加载文件内容到内存,然后广播到各个taskmanager,这种方式适合那种需要提前加载的少量变量。
  • 方案二:在open里面初始化算子的时候加载文件,可以使用相对路径/绝对路径的方式

    以kafka sink为例:需要加载两个文件(userData/kafka-sink.conf,userData/client.truststore.jks)

    • 使用相对路径的配置示例:
      使用相对路径:confPath = userData/kafka-sink.conf
      @Override
      public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          initConf();
          producer = new KafkaProducer<>(props);
      }
      private void initConf() {
          try {
              URL url = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource(confPath);
              if (url != null) {
                  LOGGER.info("kafka main-url: " + url.getFile());
              } else {
                  LOGGER.info("kafka url error......");
              }
              InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(url.getFile()).getAbsolutePath()));
              props.load(new InputStreamReader(inputStream, "UTF-8"));
              topic = props.getProperty("topic");
              partition = Integer.parseInt(props.getProperty("partition"));
              vaildProps();
          } catch (Exception e) {
              LOGGER.info("load kafka conf failed");
              e.printStackTrace();
          }
      }
      图1 相对路径配置示例
    • 使用绝对路径的配置示例:
      使用绝对路径:confPath = userData/kafka-sink.conf / path = /opt/data1/hadoop/tmp/usercache/omm/appcache/application_xxx_0015/container_xxx_0015_01_000002/userData/client.truststore.jks
      @Override
      public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          initConf();
      	String path = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource("userData/client.truststore.jks").getPath();
      	LOGGER.info("kafka abs path " + path);
      	props.setProperty("ssl.truststore.location", path);
          producer = new KafkaProducer<>(props);
      }
      private void initConf() {
          try {
              URL url = DliFlinkDemoDis2Kafka.class.getClassLoader().getResource(confPath);
              if (url != null) {
                  LOGGER.info("kafka main-url: " + url.getFile());
              } else {
                  LOGGER.info("kafka url error......");
              }
              InputStream inputStream = new BufferedInputStream(new FileInputStream(new File(url.getFile()).getAbsolutePath()));
              props.load(new InputStreamReader(inputStream, "UTF-8"));
              topic = props.getProperty("topic");
              partition = Integer.parseInt(props.getProperty("partition"));
              vaildProps();
          } catch (Exception e) {
              LOGGER.info("load kafka conf failed");
              e.printStackTrace();
          }
      }
      图2 绝对路径配置示例