更新时间:2024-11-26 GMT+08:00
Flink对接OBS
概述
Flink是一个分布式的数据处理引擎,用于处理有界和无界流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。
注意事项
- flink-obs-fs-hadoop目前仅支持OBS并行文件系统。
- 为了减少日志输出,在/opt/flink-1.12.1/conf/log4j.properties文件中增加配置:
logger.obs.name=com.obs logger.obs.level=ERROR
- flink-obs-fs-hadoop的实现基于flink的plugin加载机制(flink从1.9开始引入),flink-obs-fs-hadoop必须通过flink的plugin机制进行加载,即将flink-obs-fs-hadoop放入/opt/flink-1.12.1/plugins/obs-fs-hadoop目录下。
对接步骤
以flink-1.12.1为例。
- 下载flink-1.12.1-bin-scala_2.11.tgz,并解压到/opt/flink-1.12.1目录。
- 在/etc/profile文件中增加配置:
export FLINK_HOME=/opt/flink-1.12.1 export PATH=$FLINK_HOME/bin:$PATH
- 安装flink-obs-fs-hadoop。
- 在Github下载flink-obs-fs-hadoop:下载地址。
- flink-obs-fs-hadoop-${flinkversion}-hw-${version}.jar版本规则:flinkversion为对应的flink版本号,version为flink-obs-fs-hadoop版本号。
- 如果没有匹配版本的jar包,可自行修改flink-obs-fs-hadoop目录下pom文件中的flink版本重新编译生成。详情见编译指南。
- 自行编译flink-obs-fs-hadoop时,推荐编译依赖的hadoop.huaweicloud版本(hadoop.huaweicloud.version)不低于53.8版本。
- 在/opt/flink-1.12.1/plugins目录下创建obs-fs-hadoop目录,并将上述jar放入此目录。
- 在Github下载flink-obs-fs-hadoop:下载地址。
- 配置flink。
在/opt/flink-1.12.1/conf/flink-conf.yaml文件中或在代码中设置如下参数:
fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem fs.obs.access.key: xxx fs.obs.secret.key: xxx fs.obs.endpoint: xxx fs.obs.buffer.dir: /data/buf #写数据到OBS时需要的本地临时目录,flink程序需具备此目录读写权限
- 编写flink应用程序。
- StateBackend设置为OBS中的路径。
- StreamingFileSink设置为OBS中的路径。