更新时间:2024-01-09 GMT+08:00

使用Flink Jar读写DIS开发指南

概述

本节操作介绍基于Flink 1.12版本的Flink Jar作业读写DIS数据的操作方法。

Flink 1.12版本Flink Opensource SQL作业不支持使用DLI提供的connector读写DIS,因此推荐您使用本节操作提供的方法。

环境准备

  • 已在DLI控制台购买了通用队列。
  • 已购买了DIS通道。开通DIS通道
  • 用户在使用Flink 1.12版本,则依赖的Dis connector版本需要不低于2.0.1,详细代码参考DISFlinkConnector相关依赖,如何配置connector,详细参考自定义Flink Streaming作业

    若读取DIS,且配置groupId,则需要提前在DIS的“App管理”中创建所需的App名称。

  • 请勿将disToDis.properties放在生成的jar包中,在代码里有关于disToDis.properties的路径,如果放在jar包中,代码会找不到disToDis.properties路径。

操作步骤

  1. 创建所需要的DIS通道,具体流程可参开通DIS通道
    在DIS控制台,打开“App管理 > 创建App”,填写App名称,App名称对应的是代码中的groupId。
    图1 创建App
  2. 创建Flink Jar对应的程序包。
    在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建程序包”,创建Flink Jar对应的程序包。代码样例请参考FlinkDisToDisExample.java样例。
    表1 创建Flink Jar对应的程序包主要参数说明

    参数名称

    说明

    示例

    包类型

    支持的包类型如下:

    • JAR:用户jar文件
    • PyFile:用户Python文件
    • File:用户文件
    • ModelFile:用户AI模型文件

    JAR

    OBS路径

    选择对应程序包的OBS路径。

    说明:
    • 程序包需提前上传至OBS服务中保存。
    • 只支持选择文件。

    Flink Jar所在的OBS路径

    分组名称

    • 选择“已有分组”:可选择已有的分组。
    • 选择“创建新分组”:可输入自定义的组名称。
    • 选择“不分组”:不需要选择或输入组名称。

    自定义分组或选择已有的分组名称。

    图2 创建对应的Flink Jar包
  3. 创建distoDis对应的包。
    在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建程序包”,创建disToDis.properties对应的程序包。代码样例请参考disToDis.properties样例
    表2 创建disToDis.properties对应的程序包主要参数说明

    参数名称

    说明

    示例

    包类型

    支持的包类型如下:

    • JAR:用户jar文件
    • PyFile:用户Python文件
    • File:用户文件
    • ModelFile:用户AI模型文件

    File

    OBS路径

    选择对应程序包的OBS路径。

    说明:
    • 程序包需提前上传至OBS服务中保存。
    • 只支持选择文件。

    disToDis.properties所在的OBS路径。

    分组名称

    • 选择“已有分组”:可选择已有的分组。
    • 选择“创建新分组”:可输入自定义的组名称。
    • 选择“不分组”:不需要选择或输入组名称。

    自定义分组或选择已有的分组名称。

    图3 创建disToDis.properties对应的程序包
  4. 创建Flink Jar作业并运行。

    详情参考创建Flink Jar作业。在应用程序中选择步骤2中创建的Flink Jar文件,在其他依赖文件中选择步骤3中创建的properties文件,并指定主类。

    表3 创建Flink Jar作业参数说明

    参数

    说明

    示例

    所属队列

    说明:
    • Flink Jar作业只能运行在预先创建的独享队列上。
    • 如果“所属队列”下拉框中无可用的独享队列,请先创建一个独享队列并将该队列绑定到当前用户

    选择Flink Jar作业运行的队列

    应用程序

    用户自定义的程序包

    自定义的程序包

    主类

    指定加载的Jar包类名,如FlinkDisToDisExample。

    • 默认:根据Jar包文件的Manifest文件指定。
    • 指定:必须输入“类名”并确定类参数列表(参数间用空格分隔)
      说明:

      当类属于某个包时,主类路径需要包含完整包路径,例如:packagePath.KafkaMessageStreaming

    指定

    其他依赖文件

    用户自定义的依赖文件。其他依赖文件需要自行在代码中引用。

    在选择依赖文件之前需要将对应的文件上传至OBS桶中,并在“数据管理>程序包管理”中创建程序包,包类型没有限制。具体操作请参考创建程序包

    通过在应用程序中添加以下内容可访问对应的依赖文件。其中,“fileName”为需要访问的文件名,“ClassName”为需要访问该文件的类名。

    ClassName.class.getClassLoader().getResource("userData/fileName")

    选择3的properties文件。

    Flink版本

    选择Flink版本前,需要先选择所属的队列。当前支持“1.10”和“1.11”和“1.12”。

    1.12

    图4 创建Flink Jar作业
  5. 结果校验。

作业处于运行中状态时,向DIS的source通道发送数据,验证DIS的sink通道能否收到数据。发送和接受都有字节数证明接收到数据。

图5 查看校验结果

JAVA样例代码

  • DIS Flink Connector相关依赖
    <dependency>
                <groupId>com.huaweicloud.dis</groupId>
                <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId>
                <version>2.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
  • pom文件配置
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
     
        <groupId>org.example</groupId>
        <artifactId>Flink-dis-12</artifactId>
        <version>1.0-SNAPSHOT</version>
     
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!--Flink 版本-->
            <flink.version>1.12.2</flink.version>
            <!--JDK 版本-->
            <java.version>1.8</java.version>
            <!--Scala 2.11 版本-->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
     
        <dependencies>
            <!-- dis -->
            <dependency>
                <groupId>com.huaweicloud.dis</groupId>
                <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId>
                <version>2.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
     
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <!--  logging  -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
     
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.flink.FlinkDisToDisExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
  • FlinkDisToDisExample.java样例(可参考自定义Flink Streaming作业
    package com.flink;
    import com.huaweicloud.dis.DISConfig;
    import com.huaweicloud.dis.adapter.common.consumer.DisConsumerConfig;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.dis.FlinkDisConsumer;
    import org.apache.flink.streaming.connectors.dis.FlinkDisProducer;
    import org.apache.flink.util.TernaryBoolean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Collections;
     
    /**
     * Read data from dis and then write them into another dis channel.
     */
    public class FlinkDisToDisExample {
        private static final Logger LOG = LoggerFactory.getLogger(FlinkDisToDisExample.class);
     
        public static void main(String[] args) throws IOException {
            LOG.info("Read data from dis and write them into dis.");
            String propertiesPath = FlinkDisToDisExample.class.getClassLoader()
                .getResource("userData/disToDis.properties").getPath();
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);
     
            // DIS终端节点,如 https://dis.cn-north-1.myhuaweicloud.com
            String endpoint = params.get("disEndpoint");
            // DIS服务所在区域ID,如 cn-north-1
            String region = params.get("region");
            // 用户的AK
            String ak = params.get("ak");
            // 用户的SK
            String sk = params.get("sk");
            // 用户的项目ID
            String projectId = params.get("projectId");
            // 作为source的DIS通道名称
            String sourceChannel = params.get("sourceChannel");
            // 作为sink的DIS通道名称
            String sinkChannel = params.get("sinkChannel");
            // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费
            // 取值有: LATEST      从最新的数据开始消费,此策略会忽略通道中已有数据
            //         EARLIEST    从最老的数据开始消费,此策略会获取通道中所有的有效数据
            String startingOffsets = params.get("startOffset");
            // 消费组标识,同一个消费组下的不同客户端可以同时消费同一个通道
            String groupId = params.get("groupId");
            // Checkpoint输出路径,格式obs://bucket/path/
            String checkpointBucket = params.get("checkpointPath");
     
            // DIS Config
            DISConfig disConfig = DISConfig.buildDefaultConfig();
            disConfig.put(DISConfig.PROPERTY_ENDPOINT, endpoint);
            disConfig.put(DISConfig.PROPERTY_REGION_ID, region);
            disConfig.put(DISConfig.PROPERTY_AK, ak);
            disConfig.put(DISConfig.PROPERTY_SK, sk);
            disConfig.put(DISConfig.PROPERTY_PROJECT_ID, projectId);
            disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsets);
            disConfig.put(DisConsumerConfig.GROUP_ID_CONFIG, groupId);
            // 是否主动更新分片信息及更新时间间隔(毫秒),若有主动扩缩容需求,可以开启
            disConfig.put(FlinkDisConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");
     
            SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            String time = dateTimeFormat.format(System.currentTimeMillis());
            String checkpointPath = checkpointBucket + time;
     
            try {
                StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
                // 设置checkpoint
                RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath),
                    TernaryBoolean.TRUE);
                env.setStateBackend(rocksDBStateBackend);
                // 开启Flink CheckPoint配置,开启时若触发CheckPoint,会将Offset信息同步到Kafka
                env.enableCheckpointing(180000);
                // 设置两次checkpoint的最小间隔时间
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // 设置checkpoint超时时间
                env.getCheckpointConfig().setCheckpointTimeout(60000);
                // 设置checkpoint最大并发数
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // 设置作业取消时保留checkpoint
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
     
                FlinkDisConsumer<String> consumer =
                    new FlinkDisConsumer<>(
                        Collections.singletonList(sourceChannel), new SimpleStringSchema(), disConfig);
                DataStream<String> sourceStream = env.addSource(consumer, "disSource");
     
                FlinkDisProducer<String> producer = new FlinkDisProducer<>(
                    sinkChannel, new SimpleStringSchema(), disConfig);
                sourceStream.addSink(producer).disableChaining().name("dis-to-dis");
     
                env.execute();
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
     }
  • disToDis.properties样例(注意:groupId的值为步骤1中创建的App名称)

    认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全。

    # dis所在局点的endpoint,如 https://dis.cn-north-1.myhuaweicloud.com
    disEndpoint=xx
    # DIS服务所在区域ID,如 cn-north-1
    region=xx
    # 用户的AK
    ak=xx
    # 用户的SK
    sk=xx 
    # id示例:6m3nhAGTLxmNfZ4HOit
    projectId=xx      
    # 作为source的dis通道,如:OpenSource_outputmTtkR
    sourceChannel=xx
    # 作为sink的dis通道,如:OpenSource_disQFXD
    sinkChannel=xx
    # dis消费组,需要提前在dis的Apps中创建
    groupId=xx
    # 消费模式,从最开始消费或者EARLIEST从最新开始消费LATEST
    startOffset=LATEST
    # flink保存checkpoint的路径
    checkpointPath=obs://bucket/path/

常见问题

  • Q:作业运行失败,运行日志中有如下报错信息,应该怎么解决?
    java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V

    A:该问题是因为所选择的huaweicloud-dis-flink-connector_2.11版本过低导致,请选择2.0.1及以上版本。

  • Q:运行作业读取DIS数据时,无法读出数据且Taskmanager的运行日志中有如下报错信息,应该怎么解决?
    ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator      [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}]

    A: 该问题是因为读取DIS数据所使用的group.id在DIS的Apps中并没有提前创建。