Using Flink Jar to Read and Write Data from and to DIS
Overview
Read and write data from and to DIS using a Flink Jar job based on Flink 1.12.
In Flink 1.12, Flink Opensource SQL jobs cannot be used to read and write data from and to DIS, so you are advised to use the method described in this section.
Environment Preparations
- You have created an elastic resource pool on the DLI console and created a general-purpose queue with the pool.
    For details, see Creating an Elastic Resource Pool and Creating Queues Within It. 
- You have purchased a DIS stream.
- If Flink 1.12 is used, the DIS connector version must be 2.0.1 or later. For details about the code, see DIS Flink Connector Dependencies.
      If data is read from DIS and groupId is configured, you need to create the required app name on the App Management page of the DIS console in advance. 
- Do not place the disToDis.properties file in the generated JAR file. The code contains the path of the file. If the file is placed in the JAR file, the code cannot find the path of the file.
Procedure
- Create a DIS stream.
- Create a Flink JAR file.
    In the left navigation pane of the DLI console, choose Data Management > Package Management. On the page displayed, click Create Package to create a Flink JAR file. For details about the example code, see FlinkDisToDisExample.java.

    Table 1 Main parameters for creating a Flink JAR file

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Type | Package type. Possible values are as follows: JAR (JAR file), PyFile (user Python file), File (user file), ModelFile (user AI model file). | JAR |
    | OBS Path | Select the OBS path of the corresponding package. NOTE: The program package must be uploaded to OBS in advance. Only files can be selected. | OBS path where Flink Jar is stored |
    | Group Name | If Use existing is selected for Group, select an existing group. If Use new is selected for Group, enter a custom group name. If Do not use is selected for Group, this parameter is unavailable. | Enter a custom group name or select an existing group name. |

    Figure 2 Creating a Flink JAR file
- Create the disToDis.properties package.
    In the left navigation pane of the DLI console, choose Data Management > Package Management. On the page displayed, click Create Package to create the package corresponding to the disToDis.properties file. For details about the example code, see Example disToDis.properties.

    Table 2 Main parameters for creating the package corresponding to disToDis.properties

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Type | Package type. Possible values are as follows: JAR (JAR file), PyFile (user Python file), File (user file), ModelFile (user AI model file). | File |
    | OBS Path | Select the OBS path of the corresponding package. NOTE: The program package must be uploaded to OBS in advance. Only files can be selected. | OBS path where disToDis.properties is stored. |
    | Group Name | If Use existing is selected for Group, select an existing group. If Use new is selected for Group, enter a custom group name. If Do not use is selected for Group, this parameter is unavailable. | Enter a custom group name or select an existing group name. |

    Figure 3 Creating the package corresponding to disToDis.properties
- Create a Flink Jar job and run it.
    For details about how to create a Flink Jar job, see Creating a Flink Jar Job. For Application, select the Flink JAR file created in step 2. For Other Dependencies, select the properties file created in step 3. Then specify the main class.

    Table 3 Parameters for creating a Flink Jar job

    | Parameter | Description | Example Value |
    | --- | --- | --- |
    | Queue | NOTE: A Flink Jar job can run only on a pre-created dedicated queue. If no dedicated queue is available in the Queue drop-down list, create a dedicated queue and bind it to the current user. | Select the queue where the job will run. |
    | Application | User-defined package. | Custom program package |
    | Main Class | Name of the main class of the JAR file to be loaded, for example, FlinkDisToDisExample. Possible values are as follows: Default (the value is specified based on the Manifest file in the JAR file) and Manually assign (you must enter the class name and confirm the class arguments, separated by spaces). NOTE: When a class belongs to a package, the main class path must contain the complete package path, for example, packagePath.KafkaMessageStreaming. | Manually assign |
    | Other Dependencies | User-defined dependency files. Other dependency files need to be referenced in the code. Before selecting a dependency file, upload the file to the OBS bucket and choose Data Management > Package Management to create a package. The package type is not limited. You can add the following command to the application to access the corresponding dependency file, where fileName indicates the name of the file to be accessed and ClassName indicates the name of the class that needs to access the file: ClassName.class.getClassLoader().getResource("userData/fileName") | Select the properties file in step 3. |
    | Flink Version | Before selecting a Flink version, you need to select the queue to which the Flink version belongs. Currently, 1.10, 1.11, and 1.12 are supported. | 1.12 |

    Figure 4 Creating a Flink Jar job
- Verify the result.
When a job is in the Running state, send data to the DIS source stream to check whether the DIS sink stream can receive the data. If there are bytes sent and received, it indicates that data is received.
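
The getResource lookup described for Other Dependencies returns null when the file was not attached to the job, and it returns a URL inside the archive when the file was packed into the JAR, which a file-based loader cannot open. The following is a minimal sketch of a guard around that lookup, using only the Java standard library. The class name DependencyFiles and the method resolvePath are hypothetical helpers introduced for illustration; they are not part of DLI or the DIS connector.

```java
import java.net.URL;

/** Hypothetical helper (an assumption for illustration, not a DLI or DIS API). */
final class DependencyFiles {
    private DependencyFiles() {}

    /**
     * Looks up userData/<fileName> on the classpath and returns its file path.
     * Fails with a readable message if the resource is absent or is not a plain file.
     */
    static String resolvePath(ClassLoader loader, String fileName) {
        String resource = "userData/" + fileName;
        URL url = loader.getResource(resource);
        if (url == null) {
            // getResource returns null instead of throwing when nothing is found.
            throw new IllegalStateException("Dependency file not found on classpath: "
                + resource + ". Check that it is selected under Other Dependencies.");
        }
        if (!"file".equals(url.getProtocol())) {
            // A resource packed into a JAR has a "jar:" URL, which is not a file path.
            throw new IllegalStateException("Dependency file is not a plain file: " + url);
        }
        return url.getPath();
    }

    public static void main(String[] args) {
        try {
            ClassLoader loader = DependencyFiles.class.getClassLoader();
            System.out.println(resolvePath(loader, "disToDis.properties"));
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

In a job, the returned path could be passed to ParameterTool.fromPropertiesFile in place of the unguarded getResource(...).getPath() chain, so that a missing dependency surfaces as a clear message rather than a NullPointerException.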
 
Example Java Code
- Dependencies of DIS Flink connector
    <dependency>
        <groupId>com.huaweicloud.dis</groupId>
        <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId>
        <version>2.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>*</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
- POM file configurations
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>org.example</groupId>
        <artifactId>Flink-dis-12</artifactId>
        <version>1.0-SNAPSHOT</version>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!--Flink version-->
            <flink.version>1.12.2</flink.version>
            <!--JDK version -->
            <java.version>1.8</java.version>
            <!--Scala 2.11 -->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>

        <dependencies>
            <!-- dis -->
            <dependency>
                <groupId>com.huaweicloud.dis</groupId>
                <artifactId>huaweicloud-dis-flink-connector_2.11</artifactId>
                <version>2.0.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.flink</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>*</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>

            <!-- logging -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.flink.FlinkDisToDisExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
- FlinkDisToDisExample.java example
    package com.flink;

    import com.huaweicloud.dis.DISConfig;
    import com.huaweicloud.dis.adapter.common.consumer.DisConsumerConfig;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.dis.FlinkDisConsumer;
    import org.apache.flink.streaming.connectors.dis.FlinkDisProducer;
    import org.apache.flink.util.TernaryBoolean;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Collections;

    /**
     * Read data from dis and then write them into another dis channel.
     */
    public class FlinkDisToDisExample {
        private static final Logger LOG = LoggerFactory.getLogger(FlinkDisToDisExample.class);

        public static void main(String[] args) throws IOException {
            LOG.info("Read data from dis and write them into dis.");
            String propertiesPath = FlinkDisToDisExample.class.getClassLoader()
                .getResource("userData/disToDis.properties").getPath();
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);

            // DIS endpoint, for example, https://dis.cn-north-1.myhuaweicloud.com
            String endpoint = params.get("disEndpoint");
            // ID of the region where DIS is, for example, cn-north-1
            String region = params.get("region");
            // AK of the user
            String ak = params.get("ak");
            // SK of the user
            String sk = params.get("sk");
            // Project ID of the user
            String projectId = params.get("projectId");
            // Name of the DIS source stream
            String sourceChannel = params.get("sourceChannel");
            // Name of the DIS sink stream
            String sinkChannel = params.get("sinkChannel");
            // Consumption policy. This policy is used only when the partition has no checkpoint
            // or the checkpoint has expired. If a valid checkpoint exists, the consumption
            // continues from this checkpoint.
            // When the policy is set to LATEST, the consumption starts from the latest data.
            // This policy will ignore the existing data in the stream.
            // When the policy is set to EARLIEST, the consumption begins with the earliest
            // available data in the stream. This policy ensures that all valid data in the
            // stream is obtained.
            String startingOffsets = params.get("startOffset");
            // Consumer group ID. Different clients in the same consumer group can consume
            // the same stream at the same time.
            String groupId = params.get("groupId");
            // Checkpoint output path, which is in the format of obs://bucket/path/.
            String checkpointBucket = params.get("checkpointPath");

            // DIS Config
            DISConfig disConfig = DISConfig.buildDefaultConfig();
            disConfig.put(DISConfig.PROPERTY_ENDPOINT, endpoint);
            disConfig.put(DISConfig.PROPERTY_REGION_ID, region);
            disConfig.put(DISConfig.PROPERTY_AK, ak);
            disConfig.put(DISConfig.PROPERTY_SK, sk);
            disConfig.put(DISConfig.PROPERTY_PROJECT_ID, projectId);
            disConfig.put(DisConsumerConfig.AUTO_OFFSET_RESET_CONFIG, startingOffsets);
            disConfig.put(DisConsumerConfig.GROUP_ID_CONFIG, groupId);
            // Whether to proactively update segment information and the update interval (ms).
            // If proactive scaling is required, enable this function.
            disConfig.put(FlinkDisConsumer.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10000");

            // Append a timestamp so that each run writes checkpoints to its own directory.
            SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            String time = dateTimeFormat.format(System.currentTimeMillis());
            String checkpointPath = checkpointBucket + time;

            try {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                // Set the checkpoint.
                RocksDBStateBackend rocksDBStateBackend =
                    new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.TRUE);
                env.setStateBackend(rocksDBStateBackend);

                // Enable Flink checkpointing. If enabled, the offset information will be
                // synchronized to DIS.
                env.enableCheckpointing(180000);
                // Set the minimum interval between two checkpoints.
                env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // Set the checkpoint timeout duration.
                env.getCheckpointConfig().setCheckpointTimeout(60000);
                // Set the maximum number of concurrent checkpoints.
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // Retain checkpoints when a job is canceled.
                env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

                FlinkDisConsumer<String> consumer = new FlinkDisConsumer<>(
                    Collections.singletonList(sourceChannel), new SimpleStringSchema(), disConfig);
                DataStream<String> sourceStream = env.addSource(consumer, "disSource");

                FlinkDisProducer<String> producer = new FlinkDisProducer<>(
                    sinkChannel, new SimpleStringSchema(), disConfig);
                sourceStream.addSink(producer).disableChaining().name("dis-to-dis");

                env.execute();
            } catch (Exception ex) {
                LOG.error(ex.getMessage(), ex);
            }
        }
    }
- Example disToDis.properties (Note: The value of groupId is the name of the app created in step 1.)
    Hard-coded or plaintext AK and SK pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed.

    # Endpoint of the site where DIS is deployed, for example, https://dis.cn-north-1.myhuaweicloud.com
    disEndpoint=xx
    # ID of the region where DIS is, for example, cn-north-1
    region=xx
    # AK of the user
    ak=xx
    # SK of the user
    sk=xx
    # Example ID: 6m3nhAGTLxmNfZ4HOit
    projectId=xx
    # DIS source stream, for example, OpenSource_outputmTtkR
    sourceChannel=xx
    # DIS sink stream, for example, OpenSource_disQFXD
    sinkChannel=xx
    # DIS consumer group, which needs to be created in DIS App Management page in advance.
    groupId=xx
    # Consumption mode. The value can be EARLIEST or LATEST.
    startOffset=LATEST
    # Flink path for storing checkpoints
    checkpointPath=obs://bucket/path/
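
Because the job reads every setting from disToDis.properties at startup, a typo in a key name only shows up after the job has been submitted. The following is a minimal sketch of a local check that could be run before uploading the file to OBS. It verifies that the ten keys read by FlinkDisToDisExample are present, that startOffset is EARLIEST or LATEST, and that checkpointPath ends with a slash (the example code appends a timestamp directly to it). The class DisToDisPropertiesCheck is a hypothetical helper, not part of DLI or the DIS connector, and the exact-case comparison for startOffset is an assumption based on the values listed in the properties file.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical pre-flight check (an assumption for illustration, not a DLI or DIS API). */
final class DisToDisPropertiesCheck {
    // Keys that FlinkDisToDisExample reads through params.get(...).
    static final List<String> REQUIRED_KEYS = Arrays.asList(
        "disEndpoint", "region", "ak", "sk", "projectId",
        "sourceChannel", "sinkChannel", "groupId", "startOffset", "checkpointPath");

    private DisToDisPropertiesCheck() {}

    /** Returns a list of human-readable problems; an empty list means no problem was found. */
    static List<String> findProblems(Properties props) {
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add("missing value for " + key);
            }
        }
        // Assumption: the value must match one of the two documented spellings exactly.
        String offset = props.getProperty("startOffset", "").trim();
        if (!offset.isEmpty() && !offset.equals("EARLIEST") && !offset.equals("LATEST")) {
            problems.add("startOffset must be EARLIEST or LATEST, got " + offset);
        }
        // The example job concatenates checkpointPath and a timestamp without a separator.
        String checkpointPath = props.getProperty("checkpointPath", "").trim();
        if (!checkpointPath.isEmpty() && !checkpointPath.endsWith("/")) {
            problems.add("checkpointPath should end with '/' because a timestamp is appended to it");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: DisToDisPropertiesCheck <path to disToDis.properties>");
            return;
        }
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(Paths.get(args[0]))) {
            props.load(reader);
        }
        List<String> problems = findProblems(props);
        if (problems.isEmpty()) {
            System.out.println("No problems found.");
        } else {
            problems.forEach(System.out::println);
        }
    }
}
```

The check only looks at the shape of the file. It does not confirm that the streams exist or that the app named by groupId has been created on the DIS console.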
FAQ
- Q: Why does a job fail to be executed and the run log contain the following error information?
    java.lang.NoSuchMethodError: org.apache.flink.api.java.ClosureCleaner.clean(Ljava/lang/Object;Z)V
    A: The huaweicloud-dis-flink-connector_2.11 version you selected is too early. Select 2.0.1 or later.
- Q: Why can't my job read data from DIS, and why does the TaskManager run log contain the following error information?
    ERROR com.huaweicloud.dis.adapter.common.consumer.Coordinator [] - Failed to getCheckpointAsync, error : [400 : {"errorCode":"DIS.4332","message":"app not found. "}], request : [{"stream_name":"xx","partition_id":"shardId-0000000000","checkpoint_type":"LAST_READ","app_name":"xx"}]
    A: The app named by the group.id used to read data from DIS was not created on the App Management page of the DIS console in advance.