更新时间:2024-11-29 GMT+08:00

配置依赖管理

Flink支持通过第三方依赖包来运行自定义Flink作业。可以在Flink WebUI界面中上传并管理依赖jar包,然后在运行作业时调用对应依赖。依赖管理暂不支持“语义”校验功能,依赖jar包名称需以字母、数字或下划线开头,且不超过32个字符。支持如下两种第三方依赖:

  • 自定义connector依赖:用户自定义connector jar包,上传后在Flink WebUI界面中“依赖类型”显示为“connector”。
  • 非自定义connector依赖:非用户自定义connector jar包,如作业依赖包,上传后在Flink WebUI界面中“依赖类型”显示为“normal”。

前提条件

准备依赖文件。若通过“指定路径”方式将依赖上传到集群,需提前创建HDFS路径,并将jar包上传至HDFS中。

上传依赖包

  1. 登录FusionInsight Manager,访问Flink WebUI,请参考访问Flink WebUI
  2. 单击“依赖管理”进入依赖管理页面。
  3. 单击“添加依赖”,可参考如下添加依赖。

    表1 添加依赖

    参数

    描述

    示例

    是否自定义connector

    是否自定义connector,根据实际需求选择:

    • 是:文件为自定义connector依赖包。
    • 否:文件为非自定义connector依赖包。

    名称

    添加的依赖名称,需与上传的依赖包中connector的连接名一致。不支持上传同名依赖包。

    kafka

    注册jar

    jar包的上传方式:

    • 上传文件:添加本地的jar包
    • 指定路径:已准备好的依赖文件的HDFS路径

    上传文件

    上传文件

    注册jar选择为“上传文件”时,需通过该项上传本地jar文件。

    -

    指定路径

    注册jar选择为“指定路径”时,需通过该项输入依赖文件的HDFS路径(需提前准备好jar包上传至HDFS)。

    /flink_upload_test/flink-connector-kafka-customization.jar

    描述信息

    添加的依赖的描述信息。

    -

  4. 单击“确定”。

使用示例

  • 自定义connector依赖
    1. 参考上传依赖包上传自定义connector依赖。

      如上传依赖名称为“kafka”,自定义connector jar包名称为“flink-connector-kafka-customization.jar”。

    2. 参考创建作业新建SQL作业,该SQL中的“connector”需填写为对应的依赖名称,如'connector' = 'kafka'
      CREATE TABLE KafkaSinkTable (`user_id` INT, `name` VARCHAR) WITH (
        'connector' = 'kafka',
        'topic' = 'test_sink6',
        'properties.bootstrap.servers' = '192.168.20.134:21005',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      CREATE TABLE datagen (`user_id` INT, `name` VARCHAR) WITH (
        'connector' = 'datagen',
        'rows-per-second' = '5',
        'fields.user_id.kind' = 'sequence',
        'fields.user_id.start' = '1',
        'fields.user_id.end' = '1000'
      );
      insert INTO
        KafkaSinkTable
      select
        *
      from
        datagen;
  • 非自定义connector依赖使用样例

    参考上传依赖包上传作业依赖的非自定义connector依赖即可。