Connecting Flink to OBS
Overview
Flink is a distributed engine for processing bounded and unbounded data streams. Flink defines file system APIs, and OBS implements them, so OBS can serve as the storage for the Flink StateBackend and as a source and destination for reading and writing data.
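Flink resolves a path to a file system implementation by the path's URI scheme, so it is the obs:// prefix that routes reads and writes to OBS. The following Java sketch uses only java.net.URI to show how a path such as obs://obs-bucket/test/checkpoint breaks down. The class and method names are hypothetical, and reading the URI authority as the bucket name is an assumption of this illustration, not a description of flink-obs-fs-hadoop internals.

```java
import java.net.URI;

// Illustrative sketch (not Flink or flink-obs-fs-hadoop source code):
// splits an obs:// path into scheme, bucket, and object key prefix.
public class ObsPathParts {

    // The URI scheme ("obs") is what selects the registered file system implementation.
    static String scheme(String path) {
        return URI.create(path).getScheme();
    }

    // Assumption for this illustration: the authority component names the OBS bucket.
    static String bucket(String path) {
        return URI.create(path).getAuthority();
    }

    // The remaining path is the object key prefix inside the bucket.
    static String keyPrefix(String path) {
        return URI.create(path).getPath();
    }

    public static void main(String[] args) {
        String checkpointDir = "obs://obs-bucket/test/checkpoint";
        System.out.println(scheme(checkpointDir));    // obs
        System.out.println(bucket(checkpointDir));    // obs-bucket
        System.out.println(keyPrefix(checkpointDir)); // /test/checkpoint
    }
}
```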
Precautions
- flink-obs-fs-hadoop currently supports only OBS parallel file systems.
- You are advised not to store stateful data on OBS.
- To reduce log output, add the following configuration to the /opt/flink-1.12.1/conf/log4j.properties file:
logger.obs.name=com.obs
logger.obs.level=ERROR
- flink-obs-fs-hadoop is built on the Flink plugin loading mechanism (introduced in Flink 1.9) and must be loaded through it, that is, by placing flink-obs-fs-hadoop in the /opt/flink-1.12.1/plugins/obs-fs-hadoop directory.
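Because the plugin is only picked up from its own subdirectory under plugins, a quick preflight check can catch a misplaced JAR. The following Java sketch uses only the standard library, assumes the /opt/flink-1.12.1 layout used in this guide, and treats any JAR whose name starts with flink-obs-fs-hadoop- as the plugin. ObsPluginCheck and pluginInstalled are hypothetical names, not part of Flink.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Hypothetical preflight helper: checks that a flink-obs-fs-hadoop JAR sits in
// <FLINK_HOME>/plugins/obs-fs-hadoop, where the plugin mechanism loads it from.
public class ObsPluginCheck {

    // Returns true if <flinkHome>/plugins/obs-fs-hadoop contains a flink-obs-fs-hadoop JAR.
    static boolean pluginInstalled(Path flinkHome) {
        Path pluginDir = flinkHome.resolve("plugins").resolve("obs-fs-hadoop");
        if (!Files.isDirectory(pluginDir)) {
            return false; // the plugin subdirectory does not exist
        }
        try (Stream<Path> files = Files.list(pluginDir)) {
            return files.map(p -> p.getFileName().toString())
                    .anyMatch(name -> name.startsWith("flink-obs-fs-hadoop-") && name.endsWith(".jar"));
        } catch (IOException e) {
            return false; // treat an unreadable directory as "not installed"
        }
    }

    public static void main(String[] args) {
        // Defaults to the installation directory used in this guide.
        Path flinkHome = Paths.get(args.length > 0 ? args[0] : "/opt/flink-1.12.1");
        System.out.println(pluginInstalled(flinkHome)
                ? "flink-obs-fs-hadoop plugin found"
                : "flink-obs-fs-hadoop plugin missing from plugins/obs-fs-hadoop");
    }
}
```

On Java 11 or later it can be run directly from source, for example java ObsPluginCheck.java /opt/flink-1.12.1.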
Procedure
The following uses flink-1.12.1 as an example.
- Download flink-1.12.1-bin-scala_2.11.tgz and decompress it to the /opt/flink-1.12.1 directory.
- Add the following content to the /etc/profile file:
export FLINK_HOME=/opt/flink-1.12.1
export PATH=$FLINK_HOME/bin:$PATH
- Install flink-obs-fs-hadoop.
- Download it from GitHub.
- In flink-obs-fs-hadoop-${flinkversion}-hw-${version}.jar, flinkversion indicates the Flink version number, and version indicates the version number of flink-obs-fs-hadoop.
- If no JAR package is available for the required Flink version, change the Flink version in the POM file under the flink-obs-fs-hadoop directory and rebuild the JAR package.
- Create the obs-fs-hadoop directory under /opt/flink-1.12.1/plugins and save the JAR package in it.
- Configure Flink.
Configure the following parameters in the /opt/flink-1.12.1/conf/flink-conf.yaml file or in the code:
fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
fs.obs.access.key: xxx
fs.obs.secret.key: xxx
fs.obs.endpoint: xxx
fs.obs.buffer.dir: /data/buf # Local temporary directory used when writing data to OBS. Flink must have read and write permissions on this directory.
- Compile the Flink application.
- Set StateBackend to a path in OBS.
Example:
env.setStateBackend(new FsStateBackend("obs://obs-bucket/test/checkpoint"));
- Set StreamingFileSink to a path in OBS.
Example:
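// Example rolling policy (illustrative values): roll a part file every 15 minutes,
// after 5 minutes without new records, or once it reaches 128 MB.
final DefaultRollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.builder()
        .withRolloverInterval(15 * 60 * 1000L)
        .withInactivityInterval(5 * 60 * 1000L)
        .withMaxPartSize(128L * 1024 * 1024)
        .build();
final StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("obs://obs-bucket/test/data"), new SimpleStringEncoder<String>("UTF-8"))
        .withBucketAssigner(new BasePathBucketAssigner<>())
        .withRollingPolicy(rollingPolicy)
        .withBucketCheckInterval(1000L)
        .build();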
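A missing or misspelled fs.obs.* key usually only surfaces when the job first touches an obs:// path. The Java sketch below is a hypothetical preflight check, not the Flink configuration loader: it reads flink-conf.yaml-style "key: value" lines with a simplified parser and reports which of the keys from the Configure Flink step are missing or empty. The class name, method names, and parsing rules are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical preflight check for the OBS settings in flink-conf.yaml.
// Simplified parser: text after '#' is treated as a comment, and each
// remaining line is split at the first ": " into key and value.
public class ObsConfigCheck {

    static final List<String> REQUIRED_KEYS = List.of(
            "fs.obs.impl",
            "fs.obs.access.key",
            "fs.obs.secret.key",
            "fs.obs.endpoint",
            "fs.obs.buffer.dir");

    // Parses lines such as "fs.obs.buffer.dir: /data/buf # comment".
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            String content = line.split("#", 2)[0].trim(); // drop the trailing comment
            int sep = content.indexOf(": ");
            if (sep > 0) {
                conf.put(content.substring(0, sep).trim(), content.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    // Returns the required keys that are absent or have an empty value, in REQUIRED_KEYS order.
    static List<String> missingKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = conf.get(key);
            if (value == null || value.isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem",
                "fs.obs.endpoint: xxx",
                "fs.obs.buffer.dir: /data/buf # local temporary directory");
        // prints "missing: [fs.obs.access.key, fs.obs.secret.key]"
        System.out.println("missing: " + missingKeys(parse(lines)));
    }
}
```

Because the sketch treats everything after # as a comment, a credential that contains # would be truncated; a real check should read the file the same way the deployed Flink version does.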