使用Flink Jar读写DIS开发指南
概述
本节操作介绍基于Flink 1.12版本的Flink Jar作业读写DIS数据的操作方法。
Flink 1.12版本Flink Opensource SQL作业不支持使用DLI提供的connector读写DIS,因此推荐您使用本节操作提供的方法。
Flink 1.15不再推荐使用DIS服务, 建议搭配DMS kafka使用。请参考Kafka connector。
环境准备
- 已在DLI控制台购买了通用队列。
- 已购买了DIS通道。开通DIS通道。
- 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业。
若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。
- 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。
操作步骤
- 创建所需要的DIS通道,具体流程可参开通DIS通道。
在DIS控制台,打开“App管理 > 创建App”,填写App名称,App名称对应的是代码中的groupId。图1 创建App
- 创建Flink Jar对应的程序包。
在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建程序包”,创建Flink Jar对应的程序包。代码样例请参考FlinkDisToDisExample.java样例。
表1 创建Flink Jar对应的程序包主要参数说明 参数名称
说明
示例
包类型
支持的包类型如下:
- JAR:用户jar文件
- PyFile:用户Python文件
- File:用户文件
- ModelFile:用户AI模型文件
JAR
OBS路径
选择对应程序包的OBS路径。
说明:- 程序包需提前上传至OBS服务中保存。
- 只支持选择文件。
Flink Jar所在的OBS路径
分组名称
- 选择“已有分组”:可选择已有的分组。
- 选择“创建新分组”:可输入自定义的组名称。
- 选择“不分组”:不需要选择或输入组名称。
自定义分组或选择已有的分组名称。
图2 创建对应的Flink Jar包
- 创建distoDis对应的包。
在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建程序包”,创建disToDis.properties对应的程序包。代码样例请参考disToDis.properties样例。
表2 创建disToDis.properties对应的程序包主要参数说明 参数名称
说明
示例
包类型
支持的包类型如下:
- JAR:用户jar文件
- PyFile:用户Python文件
- File:用户文件
- ModelFile:用户AI模型文件
File
OBS路径
选择对应程序包的OBS路径。
说明:- 程序包需提前上传至OBS服务中保存。
- 只支持选择文件。
disToDis.properties所在的OBS路径。
分组名称
- 选择“已有分组”:可选择已有的分组。
- 选择“创建新分组”:可输入自定义的组名称。
- 选择“不分组”:不需要选择或输入组名称。
自定义分组或选择已有的分组名称。
图3 创建disToDis.properties对应的程序包
- 创建Flink Jar作业并运行。
详情参考创建Flink Jar作业。在应用程序中选择步骤2中创建的Flink Jar文件,在其他依赖文件中选择步骤3中创建的properties文件,并指定主类。
表3 创建Flink Jar作业参数说明 参数
说明
示例
所属队列
说明:- Flink Jar作业只能运行在预先创建的独享队列上。
- 如果“所属队列”下拉框中无可用的独享队列,请先创建一个独享队列并将该队列绑定到当前用户
选择Flink Jar作业运行的队列
应用程序
用户自定义的程序包
自定义的程序包
主类
指定加载的Jar包类名,如FlinkDisToDisExample。
- 默认:根据Jar包文件的Manifest文件指定。
- 指定:必须输入“类名”并确定类参数列表(参数间用空格分隔)
说明:
当类属于某个包时,主类路径需要包含完整包路径,例如:packagePath.KafkaMessageStreaming
指定
其他依赖文件
用户自定义的依赖文件。其他依赖文件需要自行在代码中引用。
在选择依赖文件之前需要将对应的文件上传至OBS桶中,并在“数据管理>程序包管理”中创建程序包,包类型没有限制。具体操作请参考创建程序包。
通过在应用程序中添加以下内容可访问对应的依赖文件。其中,“fileName”为需要访问的文件名,“ClassName”为需要访问该文件的类名。
ClassName.class.getClassLoader().getResource("userData/fileName")
选择3的properties文件。
Flink版本
选择Flink版本前,需要先选择所属的队列。当前支持“1.10”和“1.11”和“1.12”。
1.12
图4 创建Flink Jar作业
- 结果校验。
作业处于运行中状态时,向DIS的source通道发送数据,验证DIS的sink通道能否收到数据。发送和接受都有字节数证明接收到数据。
JAVA样例代码
- DIS Flink Connector相关依赖
<dependency> <groupId>com.huaweicloud.dis</groupId> <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId> <version>2.0.1</version> <exclusions> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency>
- pom文件配置
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>Flink-dis-12</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--Flink 版本--> <flink.version>1.12.2</flink.version> <!--JDK 版本--> <java.version>1.8</java.version> <!--Scala 2.11 版本--> <scala.binary.version>2.11</scala.binary.version> <slf4j.version>2.13.3</slf4j.version> <log4j.version>2.10.0</log4j.version> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- dis --> <dependency> <groupId>com.huaweicloud.dis</groupId> <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId> <version>2.0.1</version> <exclusions> <exclusion> <groupId>org.apache.flink</groupId> <artifactId>*</artifactId> </exclusion> <exclusion> <groupId>org.apache.logging.log4j</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> </dependency> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- logging --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>com.flink.FlinkDisToDisExample</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> <resources> <resource> <directory>../main/config</directory> <filtering>true</filtering> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project>
- FlinkDisToDisExample.java样例(可参考自定义Flink Streaming作业)
package com.flink; import com.huaweicloud.dis.DISConfig; import com.huaweicloud.dis.adapter.common.consumer.DisConsumerConfig; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.dis.FlinkDisConsumer; import org.apache.flink.streaming.connectors.dis.FlinkDisProducer; import org.apache.flink.util.TernaryBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Collections; /** * Read data from dis and then write them into another dis channel. */ public class FlinkDisToDisExample { private static final Logger LOG = LoggerFactory.getLogger(FlinkDisToDisExample.class); public static void main(String[] args) throws IOException { LOG.info("Read data from dis and write them into dis."); String propertiesPath = FlinkDisToDisExample.class.getClassLoader() .getResource("userData/disToDis.properties").getPath(); ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath); // DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com String endpoint = params.get("disEndpoint"); // DIS服务所在区域ID,如 cn-north-1 String region = params.get("region"); // 用户的AK String ak = params.get("ak"); // 用户的SK String sk = params.get("sk"); // 用户的项目ID String projectId = params.get("projectId"); // 作为source的DIS通道名称 String sourceChannel = params.get("sourceChannel"); // 作为sink的DIS通道名称 String sinkChannel = params.get("sinkChannel"); // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费 // 取值有: LATEST 从最新的数据开始消费,此策略会忽略通道中已有数据 // EARLIEST 从最老的数据开始消费,此策略会获取通道中所有的有效数据 String startingOffsets = params.get("startOffset"); // 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道 String groupId = params.get("groupId"); // Checkpoint输出路径,格式obs://bucket/path/ String checkpointBucket = params.get("checkpointPath"); // DIS Config DISConfig disConfig = DISConfig.buildDefaultConfig(); disConfig.put(DISConfig.PROPERTY_ENDPOINT, endpoint); disConfig.put(DISConfig.PROPERTY_REGION_ID, region); disConfig.put(DISConfig.PROPERTY_AK, ak); disConfig.put(DISConfig.PROPERTY_SK, sk); disConfig.put(DISConfig.PROPERTY_PROJECT_ID, projectId); disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsets); disConfig.put(DisConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否主动更新分片信息及更新时间间隔(毫秒),若有主动扩缩容需求,可以开启 disConfig.put(FlinkDisConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000"); SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); String time = dateTimeFormat.format(System.currentTimeMillis()); String checkpointPath = checkpointBucket + time; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 设置checkpoint RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.TRUE); env.setStateBackend(rocksDBStateBackend); // 开启Flink CheckPoint配置,开启时若触发CheckPoint,会将Offset信息同步到Kafka env.enableCheckpointing(180000); // 设置两次checkpoint的最小间隔时间 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000); // 设置checkpoint超时时间 env.getCheckpointConfig().setCheckpointTimeout(60000); // 设置checkpoint最大并发数 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 设置作业取消时保留checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); FlinkDisConsumer<String> consumer = new FlinkDisConsumer<>( Collections.singletonList(sourceChannel), new SimpleStringSchema(), disConfig); DataStream<String> sourceStream = env.addSource(consumer, "disSource"); FlinkDisProducer<String> producer = new FlinkDisProducer<>( sinkChannel, new SimpleStringSchema(), disConfig); sourceStream.addSink(producer).disableChaining().name("dis-to-dis"); env.execute(); } catch (Exception ex) { LOG.error(ex.getMessage(), ex); } } }
- disToDis.properties样例(注意:groupId的值为步骤1中创建的App名称)
认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。
# dis所在局点的endpoint,如 https://dis.cn-north-1.myhuaweicloud.com disEndpoint=xx # DIS服务所在区域ID,如 cn-north-1 region=xx # 用户的AK ak=xx # 用户的SK sk=xx # id示例:6m3nhAGTLxmNfZ4HOit projectId=xx # 作为source的dis通道,如:OpenSource_outputmTtkR sourceChannel=xx # 作为sink的dis通道,如:OpenSource_disQFXD sinkChannel=xx # dis消费组,需要提前在dis的Apps中创建 groupId=xx # 消费模式,从最开始消费或者EARLIEST从最新开始消费LATEST startOffset=LATEST # flink保存checkpoint的路径 checkpointPath=obs://bucket/path/
常见问题
- Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决?
java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V
A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。
- Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决?
ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}]
A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。