更新时间:2024-11-29 GMT+08:00
配置依赖管理
Flink支持通过第三方依赖包来运行自定义Flink作业。可以在Flink WebUI界面中上传并管理依赖jar包,然后在运行作业时调用对应依赖。依赖管理暂不支持“语义”校验功能,依赖jar包名称需以字母、数字或下划线开头,且不超过32个字符。支持如下两种第三方依赖:
- 自定义connector依赖:用户自定义connector jar包,上传后在Flink WebUI界面中“依赖类型”显示为“connector”。
- 非自定义connector依赖:非用户自定义connector jar包,如作业依赖包,上传后在Flink WebUI界面中“依赖类型”显示为“normal”。
前提条件
准备依赖文件。若通过“指定路径”方式将依赖上传到集群,需提前创建HDFS路径,并将jar包上传至HDFS中。
上传依赖包
- 登录FusionInsight Manager,访问Flink WebUI,请参考访问Flink WebUI。
- 单击“依赖管理”进入依赖管理页面。
- 单击“添加依赖”,可参考如下添加依赖。
表1 添加依赖 参数
描述
示例
是否自定义connector
是否自定义connector,根据实际需求选择:
- 是:文件为自定义connector依赖包。
- 否:文件为非自定义connector依赖包。
是
名称
添加的依赖名称,需与上传的依赖包中connector的连接名一致。不支持上传同名依赖包。
kafka
注册jar
jar包的上传方式:
- 上传文件:添加本地的jar包
- 指定路径:已准备好的依赖文件的HDFS路径
上传文件
上传文件
注册jar选择为“上传文件”时,需通过该项上传本地jar文件。
-
指定路径
注册jar选择为“指定路径”时,需通过该项输入依赖文件的HDFS路径(需提前准备好jar包上传至HDFS)。
/flink_upload_test/flink-connector-kafka-customization.jar
描述信息
添加的依赖的描述信息。
-
- 单击“确定”。
使用示例
- 自定义connector依赖
- 参考上传依赖包上传自定义connector依赖。
如上传依赖名称为“kafka”,自定义connector jar包名称为“flink-connector-kafka-customization.jar”。
- 参考创建作业新建SQL作业,该SQL中的“connector”需填写为对应的依赖名称,如'connector' = 'kafka'。
CREATE TABLE KafkaSinkTable (`user_id` INT, `name` VARCHAR) WITH ( 'connector' = 'kafka', 'topic' = 'test_sink6', 'properties.bootstrap.servers' = '192.168.20.134:21005', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TABLE datagen (`user_id` INT, `name` VARCHAR) WITH ( 'connector' = 'datagen', 'rows-per-second' = '5', 'fields.user_id.kind' = 'sequence', 'fields.user_id.start' = '1', 'fields.user_id.end' = '1000' ); insert INTO KafkaSinkTable select * from datagen;
- 参考上传依赖包上传自定义connector依赖。
- 非自定义connector依赖使用样例
参考上传依赖包上传作业依赖的非自定义connector依赖即可。
父主题: 配置开发Flink可视化作业