更新时间:2024-03-25 GMT+08:00
分享

Flink对接OBS

概述

Flink是一个分布式的数据处理引擎,用于处理有界和无界流式数据。Flink定义了文件系统抽象,OBS服务实现了Flink的文件系统抽象,使得OBS可以作为flink StateBackend和数据读写的载体。

注意事项

  • flink-obs-fs-hadoop目前仅支持OBS并行文件系统。
  • 不推荐state状态数据存储在OBS上。
  • 为了减少日志输出,在/opt/flink-1.12.1/conf/log4j.properties文件中增加配置:
    logger.obs.name=com.obs
    logger.obs.level=ERROR
  • flink-obs-fs-hadoop的实现基于flink的plugin加载机制(flink从1.9开始引入),flink-obs-fs-hadoop必须通过flink的plugin机制进行加载,即将flink-obs-fs-hadoop放入/opt/flink-1.12.1/plugins/obs-fs-hadoop目录下。

对接步骤

以flink-1.12.1为例。

  1. 下载flink-1.12.1-bin-scala_2.11.tgz,并解压到/opt/flink-1.12.1目录。
  2. 在/etc/profile文件中增加配置:

    export FLINK_HOME=/opt/flink-1.12.1
    export PATH=$FLINK_HOME/bin:$PATH

  3. 安装flink-obs-fs-hadoop。

    1. 在Github下载flink-obs-fs-hadoop:下载地址
      • flink-obs-fs-hadoop-${flinkversion}-hw-${version}.jar版本规则:flinkversion为对应的flink版本号,version为flink-obs-fs-hadoop版本号。
      • 如果没有匹配版本的jar包,可自行修改flink-obs-fs-hadoop目录下pom文件中的flink版本重新编译生成。详情见编译指南
      • 自行编译flink-obs-fs-hadoop时,推荐编译依赖的hadoop.huaweicloud版本(hadoop.huaweicloud.version)不低于53.8版本。
    2. 在/opt/flink-1.12.1/plugins目录下创建obs-fs-hadoop目录,并将上述jar放入此目录。

  4. 配置flink。

    在/opt/flink-1.12.1/conf/flink-conf.yaml文件中或在代码中设置如下参数:
    fs.obs.impl: org.apache.hadoop.fs.obs.OBSFileSystem
    fs.obs.access.key: xxx
    fs.obs.secret.key: xxx
    fs.obs.endpoint: xxx
    fs.obs.buffer.dir: /data/buf #写数据到OBS时需要的本地临时目录,flink程序需具备此目录读写权限

  5. 编写flink应用程序。

    1. StateBackend设置为OBS中的路径。
      示例:
      1
      env.setStateBackend(new FsStateBackend("obs://obs-bucket/test/checkpoint"));
      
    2. StreamingFileSink设置为OBS中的路径。
      示例:
      1
      2
      3
      4
      5
      6
      7
      final StreamingFileSink<String> sink = StreamingFileSink
      .forRowFormat(new Path("obs://obs-bucket/test/data"),
      new SimpleStringEncoder<String>("UTF-8"))
      .withBucketAssigner(new BasePathBucketAssigner())
      .withRollingPolicy(rollingPolicy)
      .withBucketCheckInterval(1000L)
      .build();
      

分享:

    相关文档

    相关产品