
Connecting Flink to OBS

Overview

Flink is a distributed data processing engine for bounded and unbounded data streams. Flink defines a set of file system APIs, and OBS implements them, so OBS can be used as the Flink StateBackend and as the storage for reading and writing data.
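
Because the integration works through Flink's file system abstraction, an obs:// URI can be used wherever Flink accepts a file system path once the plugin and credentials are configured (see the procedure below). The following minimal sketch assumes the program is submitted with bin/flink run on a cluster where flink-obs-fs-hadoop is installed and the fs.obs.* options are set, so that the obs:// scheme is available; the bucket and object names are placeholders. It simply checks whether an OBS path exists through Flink's FileSystem API:

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ObsPathCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder bucket and object name; replace them with your own.
        Path path = new Path("obs://obs-bucket/test/data");
        // Resolves to the OBS implementation provided by flink-obs-fs-hadoop.
        FileSystem fs = path.getFileSystem();
        System.out.println("exists: " + fs.exists(path));
    }
}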

Precautions

  • flink-obs-fs-hadoop currently supports only OBS parallel file systems.
  • To reduce log output, add the following configuration to the /opt/flink-1.12.1/conf/log4j.properties file:
    logger.obs.name=com.obs
    logger.obs.level=ERROR
  • flink-obs-fs-hadoop is implemented based on the plug-in loading mechanism of Flink (introduced in Flink 1.9). It must be loaded through this mechanism, that is, by placing the flink-obs-fs-hadoop JAR package in the /opt/flink-1.12.1/plugins/obs-fs-hadoop directory.

Procedure

The following uses flink-1.12.1 as an example.

  1. Download flink-1.12.1-bin-scala_2.11.tgz and decompress it to the /opt/flink-1.12.1 directory.
  2. Add the following content to the /etc/profile file:

    export FLINK_HOME=/opt/flink-1.12.1
    export PATH=$FLINK_HOME/bin:$PATH

  3. Install flink-obs-fs-hadoop.

    1. Download it from GitHub.
      • In flink-obs-fs-hadoop-${flinkversion}-hw-${version}.jar, flinkversion indicates the Flink version number, and version indicates the version number of flink-obs-fs-hadoop.
      • If no JAR package of the required version is available, modify the Flink version in the POM file under the flink-obs-fs-hadoop directory and recompile the project.
      • When compiling flink-obs-fs-hadoop, you are advised to use hadoop-huaweicloud version 53.8 or later.
    2. Create the obs-fs-hadoop directory under the /opt/flink-1.12.1/plugins directory and save the JAR package above to obs-fs-hadoop.

  4. Configure Flink.

    Configure the following parameters in the /opt/flink-1.12.1/conf/flink-conf.yaml file or in the code:
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: xxx
    fs.obs.secret.key: xxx
    fs.obs.endpoint: xxx
    fs.obs.buffer.dir: /data/buf # Local temporary directory used to buffer data written to OBS. Flink must have read and write permissions on this directory.
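
    These options can also be supplied in the code instead of flink-conf.yaml. A minimal sketch, assuming the options are applied by re-initializing Flink's file systems before the first obs:// path is used (the class name and credential values are placeholders):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.plugin.PluginUtils;

    public class ObsConfigInCode {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Same options as in flink-conf.yaml; replace the placeholder values.
            conf.setString("fs.obs.impl", "org.apache.hadoop.fs.obs.OBSFileSystem");
            conf.setString("fs.obs.access.key", "xxx");
            conf.setString("fs.obs.secret.key", "xxx");
            conf.setString("fs.obs.endpoint", "xxx");
            conf.setString("fs.obs.buffer.dir", "/data/buf");

            // Reload Flink's file systems so that the obs:// scheme picks up the
            // options above; the plugin manager loads flink-obs-fs-hadoop from the
            // plugins directory of the Flink installation.
            FileSystem.initialize(conf, PluginUtils.createPluginManagerFromRootFolder(conf));
        }
    }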

  5. Compile the Flink application.

    1. Set StateBackend to a path in OBS.
      Example:
      env.setStateBackend(new FsStateBackend("obs://obs-bucket/test/checkpoint"));
      
    2. Set StreamingFileSink to a path in OBS.
      Example:
      final StreamingFileSink<String> sink = StreamingFileSink
          .forRowFormat(new Path("obs://obs-bucket/test/data"),
              new SimpleStringEncoder<String>("UTF-8"))
          .withBucketAssigner(new BasePathBucketAssigner<>())
          // rollingPolicy is a RollingPolicy instance defined elsewhere,
          // for example DefaultRollingPolicy.builder().build().
          .withRollingPolicy(rollingPolicy)
          .withBucketCheckInterval(1000L)
          .build();
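
    Putting both settings together, a minimal end-to-end sketch might look as follows. The socket source, bucket names, checkpoint interval, and rolling policy are illustrative placeholders, not part of the original example:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    public class ObsSinkJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoints are written to OBS through flink-obs-fs-hadoop.
            env.setStateBackend(new FsStateBackend("obs://obs-bucket/test/checkpoint"));
            // StreamingFileSink finalizes in-progress files on checkpoints.
            env.enableCheckpointing(60_000L);

            // Placeholder source; replace it with your real input.
            DataStream<String> input = env.socketTextStream("localhost", 9999);

            final StreamingFileSink<String> sink = StreamingFileSink
                    .forRowFormat(new Path("obs://obs-bucket/test/data"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .withBucketAssigner(new BasePathBucketAssigner<>())
                    .withRollingPolicy(DefaultRollingPolicy.builder().build())
                    .withBucketCheckInterval(1000L)
                    .build();

            input.addSink(sink);
            env.execute("obs-streaming-file-sink");
        }
    }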