Updated on 2024-01-09 GMT+08:00

Using Flink Jar to Read and Write Data from and to DIS

Overview

Read and write data from and to DIS using a Flink Jar job based on Flink 1.12.

In Flink 1.12, Flink OpenSource SQL jobs cannot be used to read and write data from and to DIS. Therefore, you are advised to use the method described in this section.

Environment Preparations

  • You have purchased a general-purpose queue on the DLI console.
  • You have purchased a DIS stream by referring to .
  • If Flink 1.12 is used, the DIS connector version must be 2.0.1 or later. For details about the code, see DIS Flink Connector Dependencies. For details about how to configure the connector, see .

    If data is read from DIS and groupId is configured, you need to create the required app name on the App Management page of the DIS console in advance.

  • Do not package the disToDis.properties file into the generated JAR file. The code references this file by its path at runtime; if the file is placed in the JAR file, the code cannot find it. A sketch of how the file is located follows this list.
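
The following is a minimal sketch of how a Flink Jar job locates a dependency file such as disToDis.properties at runtime. It only illustrates the userData/fileName lookup described in this section; the class name LoadDependencyFileSketch and the printed key are placeholders, and the full job logic is shown in FlinkDisToDisExample.java later in this section.

    import org.apache.flink.api.java.utils.ParameterTool;

    import java.io.IOException;

    public class LoadDependencyFileSketch {
        public static void main(String[] args) throws IOException {
            // The properties file selected under Other Dependencies is exposed as userData/<fileName>;
            // it must not be packaged into the JAR file, otherwise this lookup fails.
            String propertiesPath = LoadDependencyFileSketch.class.getClassLoader()
                .getResource("userData/disToDis.properties").getPath();
            // ParameterTool parses the properties file into key-value pairs.
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);
            System.out.println("disEndpoint = " + params.get("disEndpoint"));
        }
    }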

Procedure

  1. Create a DIS stream. For details, see .
    In the left navigation pane of the DIS console, choose App Management. On the page displayed, click Create App. In the Create App dialog box, set Name to the value of groupId.
    Figure 1 Create App
  2. Create a Flink JAR file.
    In the left navigation pane of the DLI console, choose Data Management > Package Management. On the page displayed, click Create Package to create a Flink JAR file. For details about the example code, see FlinkDisToDisExample.java.
    Table 1 Main parameters for creating a Flink JAR file

    • Type
      Description: Package type. Possible values are as follows:
        • JAR: JAR file
        • PyFile: User Python file
        • File: User file
        • ModelFile: User AI model file
      Example value: JAR
    • OBS Path
      Description: Select the OBS path of the corresponding package.
        NOTE:
        • The program package must be uploaded to OBS in advance.
        • Only files can be selected.
      Example value: OBS path where the Flink Jar file is stored
    • Group Name
      Description:
        • If Use existing is selected for Group, select an existing group.
        • If Use new is selected for Group, enter a custom group name.
        • If Do not use is selected for Group, this parameter is unavailable.
      Example value: Enter a custom group name or select an existing group name.

    Figure 2 Creating a Flink JAR file
  3. Create the disToDis package.
    In the left navigation pane of the DLI console, choose Data Management > Package Management. On the page displayed, click Create Package to create the package corresponding to the disToDis.properties file. For details about the example code, see Example disToDis.properties.
    Table 2 Main parameters for creating the package corresponding to disToDis.properties

    • Type
      Description: Package type. Possible values are as follows:
        • JAR: JAR file
        • PyFile: User Python file
        • File: User file
        • ModelFile: User AI model file
      Example value: File
    • OBS Path
      Description: Select the OBS path of the corresponding package.
        NOTE:
        • The program package must be uploaded to OBS in advance.
        • Only files can be selected.
      Example value: OBS path where disToDis.properties is stored
    • Group Name
      Description:
        • If Use existing is selected for Group, select an existing group.
        • If Use new is selected for Group, enter a custom group name.
        • If Do not use is selected for Group, this parameter is unavailable.
      Example value: Enter a custom group name or select an existing group name.

    Figure 3 Creating the package corresponding to disToDis.properties
  4. Create a Flink Jar job and run it.

    For details about how to create a Flink Jar job, see Creating a Flink Jar Job. For Application, select the Flink JAR file created in 2; for Other Dependencies, select the properties file created in 3; then specify the main class.

    Table 3 Parameters for creating a Flink Jar job

    • Queue
      Description:
        NOTE:
        • A Flink Jar job can run only on a pre-created dedicated queue.
        • If no dedicated queue is available in the Queue drop-down list, create a dedicated queue and bind it to the current user.
      Example value: Select the queue where the job will run.
    • Application
      Description: User-defined package.
      Example value: Custom program package
    • Main Class
      Description: Name of the main class in the JAR file to be loaded, for example, FlinkDisToDisExample. Possible values are as follows:
        • Default: The value is specified based on the Manifest file in the JAR file.
        • Manually assign: You must enter the class name and confirm the class arguments (separated by spaces).
        NOTE: When a class belongs to a package, the main class path must contain the complete package path, for example, com.flink.FlinkDisToDisExample.
      Example value: Manually assign
    • Other Dependencies
      Description: User-defined dependency files that are referenced in the code.
        Before selecting a dependency file, upload the file to the OBS bucket and choose Data Management > Package Management to create a package. The package type is not limited.
        You can add the following statement to the application to access a dependency file, where fileName indicates the name of the file to be accessed and ClassName indicates the name of the class that accesses the file:
        ClassName.class.getClassLoader().getResource("userData/fileName")
      Example value: Select the properties file created in 3.
    • Flink Version
      Description: Before selecting a Flink version, you need to select the queue the job will run on. Currently, 1.10, 1.11, and 1.12 are supported.
      Example value: 1.12

    Figure 4 Creating a Flink Jar job
  5. Verify the result.

When the job is in the Running state, send data to the DIS source stream and check whether the DIS sink stream receives it. If bytes are sent and received, data is flowing through the job. (For one way to send test data, see the sketch after Figure 5.)

Figure 5 Viewing the verification result
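
The following is a minimal sketch of one way to send test data to the DIS source stream from a local machine, assuming the Huawei Cloud DIS Java SDK (huaweicloud-dis-java-sdk) and its DISClientBuilder/PutRecordsRequest API; class and method names may differ slightly across SDK versions, and all endpoint, credential, and stream values are placeholders.

    import com.huaweicloud.dis.DIS;
    import com.huaweicloud.dis.DISClientBuilder;
    import com.huaweicloud.dis.iface.data.request.PutRecordsRequest;
    import com.huaweicloud.dis.iface.data.request.PutRecordsRequestEntry;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    public class SendTestDataSketch {
        public static void main(String[] args) {
            // Build a DIS client with the same endpoint, region, credentials, and project ID as the job.
            DIS dis = DISClientBuilder.standard()
                .withEndpoint("https://dis.cn-north-1.myhuaweicloud.com")
                .withAk("your-ak")
                .withSk("your-sk")
                .withProjectId("your-project-id")
                .withRegion("cn-north-1")
                .build();

            // Wrap one test record and send it to the DIS source stream consumed by the Flink job.
            PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
            entry.setData(ByteBuffer.wrap("hello dis".getBytes(StandardCharsets.UTF_8)));

            PutRecordsRequest request = new PutRecordsRequest();
            request.setStreamName("your-source-channel");
            request.setRecords(Collections.singletonList(entry));

            dis.putRecords(request);
            System.out.println("Test record sent to the DIS source stream.");
        }
    }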

Example Java Code

  • Dependencies of DIS Flink connector
    <dependency>
                <groupId>com.huaweicloud.dis</groupId>
                <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId>
                <version>2.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
  • POM file configurations
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
     
        <groupId>org.example</groupId>
        <artifactId>Flink-dis-12</artifactId>
        <version>1.0-SNAPSHOT</version>
     
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!--Flink version-->
            <flink.version>1.12.2</flink.version>
            <!--JDK version -->
            <java.version>1.8</java.version>
            <!--Scala 2.11 -->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
     
        <dependencies>
            <!-- dis -->
            <dependency>
                <groupId>com.huaweicloud.dis</groupId>
                <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId>
                <version>2.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
     
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <!--  logging  -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
     
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
     
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.flink.FlinkDisToDisExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
  • FlinkDisToDisExample.java example
    package com.flink;
    import com.huaweicloud.dis.DISConfig;
    import com.huaweicloud.dis.adapter.common.consumer.DisConsumerConfig;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.dis.FlinkDisConsumer;
    import org.apache.flink.streaming.connectors.dis.FlinkDisProducer;
    import org.apache.flink.util.TernaryBoolean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Collections;
     
    /**
     * Read data from dis and then write them into another dis channel.
     */
    public class FlinkDisToDisExample {
        private static final Logger LOG = LoggerFactory.getLogger(FlinkDisToDisExample.class);
     
        public static void main(String[] args) throws IOException {
            LOG.info("Read data from dis and write them into dis.");
            String propertiesPath = FlinkDisToDisExample.class.getClassLoader()
                .getResource("userData/disToDis.properties").getPath();
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);
     
            // DIS endpoint, for example, https://dis.cn-north-1.myhuaweicloud.com
            String endpoint = params.get("disEndpoint");
            // ID of the region where DIS is, for example, cn-north-1
            String region = params.get("region");
            // AK of the user
            String ak = params.get("ak");
            // SK of the user
            String sk = params.get("sk");
            // Project ID of the user
            String projectId = params.get("projectId");
            // Name of the DIS source stream
            String sourceChannel = params.get("sourceChannel");
            // Name of the DIS sink stream
            String sinkChannel = params.get("sinkChannel");
            // Consumption policy. This policy is used only when the partition has no checkpoint or the checkpoint has expired. If a valid checkpoint exists, the consumption continues from this checkpoint.
            // When the policy is set to LATEST, the consumption starts from the latest data. This policy will ignore the existing data in the stream.
            // When the policy is set to EARLIEST, the consumption starts from the earliest data. This policy will obtain all valid data in the stream.
            String startingOffsets = params.get("startOffset");
            // Consumer group ID. Different clients in the same consumer group can consume the same stream at the same time.
            String groupId = params.get("groupId");
            // Checkpoint output path, which is in the format of obs://bucket/path/.
            String checkpointBucket = params.get("checkpointPath");
     
            // DIS Config
            DISConfig disConfig = DISConfig.buildDefaultConfig();
            disConfig.put(DISConfig.PROPERTY_ENDPOINT, endpoint);
            disConfig.put(DISConfig.PROPERTY_REGION_ID, region);
            disConfig.put(DISConfig.PROPERTY_AK, ak);
            disConfig.put(DISConfig.PROPERTY_SK, sk);
            disConfig.put(DISConfig.PROPERTY_PROJECT_ID, projectId);
            disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsets);
            disConfig.put(DisConsumerConfig.GROUP_ID_CONFIG, groupId);
            // Whether to proactively update segment information and the update interval (ms). If proactive scaling is required, enable this function.
            disConfig.put(FlinkDisConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");
     
            SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            String time = dateTimeFormat.format(System.currentTimeMillis());
            String checkpointPath = checkpointBucket + time;
     
            try {
                StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
                // Set the checkpoint.
                RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath),
                    TernaryBoolean.TRUE);
                env.setStateBackend(rocksDBStateBackend);
                // Enable Flink checkpointing. If enabled, the offset information will be synchronized to DIS.
                env.enableCheckpointing(180000);
                // Set the minimum interval between two checkpoints.
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // Set the checkpoint timeout duration.
                env.getCheckpointConfig().setCheckpointTimeout(60000);
                // Set the maximum number of concurrent checkpoints.
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // Retain checkpoints when a job is canceled.
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
     
                FlinkDisConsumer<String> consumer =
                    new FlinkDisConsumer<>(
                        Collections.singletonList(sourceChannel), new SimpleStringSchema(), disConfig);
                DataStream<String> sourceStream = env.addSource(consumer, "disSource");
     
                FlinkDisProducer<String> producer = new FlinkDisProducer<>(
                    sinkChannel, new SimpleStringSchema(), disConfig);
                sourceStream.addSink(producer).disableChaining().name("dis-to-dis");
     
                env.execute();
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
     }
  • Example disToDis.properties (Note: The value of groupId is the name of the app created in step 1.)

    Hard-coded or plaintext AK and SK pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed. (For one way to avoid plaintext credentials, see the sketch after the properties example below.)

    # Endpoint of the site where DIS is deployed, for example, https://dis.cn-north-1.myhuaweicloud.com
    disEndpoint=xx
    # ID of the region where DIS is, for example, cn-north-1
    region=xx
    # AK of the user
    ak=xx
    # SK of the user
    sk=xx 
    # Example ID: 6m3nhAGTLxmNfZ4HOit
    projectId=xx      
    # DIS source stream, for example, OpenSource_outputmTtkR
    sourceChannel=xx
    # DIS sink stream, for example, OpenSource_disQFXD
    sinkChannel=xx
    # DIS consumer group, which needs to be created on the App Management page of the DIS console in advance.
    groupId=xx
    # Consumption mode. The value can be EARLIEST or LATEST.
    startOffset=LATEST
    # Flink path for storing checkpoints
    checkpointPath=obs://bucket/path/
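
    The following is a minimal sketch of one way to avoid plaintext credentials in disToDis.properties: read the AK and SK from environment variables and fall back to the properties file only if they are absent. The variable names DIS_AK and DIS_SK are illustrative only, and whether environment variables are available depends on how the job is deployed.

    import org.apache.flink.api.java.utils.ParameterTool;

    import java.io.IOException;

    public class CredentialLookupSketch {
        // Prefer an environment variable (name is illustrative) and fall back to the properties file.
        static String resolve(String envName, ParameterTool params, String propertyKey) {
            String fromEnv = System.getenv(envName);
            return fromEnv != null ? fromEnv : params.get(propertyKey);
        }

        public static void main(String[] args) throws IOException {
            String propertiesPath = CredentialLookupSketch.class.getClassLoader()
                .getResource("userData/disToDis.properties").getPath();
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);

            String ak = resolve("DIS_AK", params, "ak");
            String sk = resolve("DIS_SK", params, "sk");
            // Pass ak and sk to DISConfig as in FlinkDisToDisExample.java instead of reading plaintext values directly.
            System.out.println("AK resolved: " + (ak != null) + ", SK resolved: " + (sk != null));
        }
    }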

FAQ

  • Q: Why does a job fail to be executed, and why does the run log contain the following error information?
    java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V

    A: The huaweicloud-dis-flink-connector_2.11 version you selected is too old. Select version 2.0.1 or later.

  • Q: Why can't my job read data from DIS, and why does the TaskManager run log contain the following error information?
    ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator      [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}]

    A: The app corresponding to the group.id used to read data from DIS has not been created on the App Management page of the DIS console in advance.