Updated: 2024-07-04 GMT+08:00

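Development Guide: Writing Data to OBS with Flink Jar

Overview

DLI can run Flink jobs from a custom JAR and write the results to OBS. The Java sample code in this section shows how to process Kafka data and write it to OBS. Adjust the parameter settings to match your environment.

Environment preparation

A development tool such as IntelliJ IDEA is installed and configured, and JDK and Maven are installed.

  • For the pom.xml configuration of the Maven project, see "pom file configuration" under Java sample code (Flink 1.12).
  • Make sure the local build environment can access the public internet.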

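Constraints and limitations

  • On the DLI console, go to "Global Configuration > Service Authorization" and enable Tenant Administrator (global service).
  • The OBS bucket that data is written to must be a bucket created under the main account.
  • When using the Flink 1.15 compute engine, you must configure an agency yourself; otherwise the job may not run properly.

    For details, see Customizing DLI Agency Permissions.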

Java sample code (Flink 1.15)

  • pom file configuration
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>com.huaweicloud</groupId>
            <artifactId>dli-flink-demo</artifactId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <groupId>org.example</groupId>
        <artifactId>flink-1.15-demo</artifactId>
        <properties>
            <flink.version>1.15.0</flink.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_2.12</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.14.2</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.huawei.dli.GetUserConfigFileDemo</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
    
  • Sample code
    package com.huawei.dli;
    
    import com.huawei.dli.source.CustomParallelSource;
    
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.io.IOException;
    import java.net.URL;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    
    public class GetUserConfigFileDemo {
        private static final Logger LOG = LoggerFactory.getLogger(GetUserConfigFileDemo.class);
    
        public static void main(String[] args) {
            try {
                ParameterTool params = ParameterTool.fromArgs(args);
                LOG.info("Params: " + params.toString());
    
                StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    
                // set checkpoint
                String checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint/jobId_jobName/");
                LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(System.currentTimeMillis() / 1000,
                    0, ZoneOffset.ofHours(8));
                String dt = localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd_HH:mm:ss"));
                checkpointPath = checkpointPath + dt;
    
                streamEnv.setStateBackend(new EmbeddedRocksDBStateBackend());
                streamEnv.getCheckpointConfig().setCheckpointStorage(checkpointPath);
                streamEnv.getCheckpointConfig().setExternalizedCheckpointCleanup(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
                streamEnv.enableCheckpointing(30 * 1000);
    
                DataStream<String> stream = streamEnv.addSource(new CustomParallelSource())
                    .setParallelism(1)
                    .disableChaining();
    
                String outputPath = params.get("output.path", "obs://bucket/outputPath/jobId_jobName");
    
                // Read the user-supplied config file packaged with the job (userData/user.config)
                URL url = GetUserConfigFileDemo.class.getClassLoader().getResource("userData/user.config");
                if (url != null) {
                    Path filePath = org.apache.flink.util.FileUtils.absolutizePath(new Path(url.getPath()));
                    try {
                        String config = org.apache.flink.util.FileUtils.readFileUtf8(new File(filePath.getPath()));
                        LOG.info("config is {}", config);
                        // Do something by config
                    } catch (IOException e) {
                        LOG.error(e.getMessage(), e);
                    }
                }
    
                // Sink OBS
                final StreamingFileSink<String> sinkForRow = StreamingFileSink
                    .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
    
                stream.addSink(sinkForRow);
    
                streamEnv.execute("sinkForRow");
            } catch (Throwable e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }
    
    Table 1 Parameter description

    Parameter          Description                        Example
    output.path        OBS path the data is written to    obs://bucket/output
    checkpoint.path    OBS path for checkpoints           obs://bucket/checkpoint
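
The Flink 1.15 sample appends a start-time suffix (UTC+8, pattern yyyyMMdd_HH:mm:ss) to checkpoint.path by plain string concatenation, so the effective checkpoint location depends on whether the value ends with a slash. The following is a minimal sketch of that derivation using only the JDK; the class and method names (CheckpointPathSketch, timestampedPath) are hypothetical and not part of the sample.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class CheckpointPathSketch {
    // Same pattern and UTC+8 offset as the Flink 1.15 sample code.
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyyMMdd_HH:mm:ss");

    /** Appends the formatted start time to the base path, exactly as the sample concatenates it. */
    static String timestampedPath(String basePath, long epochMillis) {
        LocalDateTime startTime =
            LocalDateTime.ofEpochSecond(epochMillis / 1000, 0, ZoneOffset.ofHours(8));
        return basePath + startTime.format(FORMAT);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // With a trailing slash, the timestamp becomes a subdirectory of the base path.
        System.out.println(timestampedPath("obs://bucket/checkpoint/", now));
        // Without it, the timestamp is glued onto the last path segment.
        System.out.println(timestampedPath("obs://bucket/checkpoint", now));
    }
}
```

If a per-run subdirectory is what you want, passing a value that ends with "/" (as the sample's default does) is probably the safer choice; the Table 1 example value has no trailing slash.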

Java sample code (Flink 1.12)

  • pom file configuration
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Flink-demo</artifactId>
            <groupId>com.huaweicloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>flink-kafka-to-obs</artifactId>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!-- Flink version -->
            <flink.version>1.12.2</flink.version>
            <!-- JDK version -->
            <java.version>1.8</java.version>
            <!-- Scala 2.11 binary version -->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!--  kafka  -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--  logging  -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.huaweicloud.dli.FlinkKafkaToObsExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
    
  • Sample code
    // Package matches the mainClass configured in the pom file above
    package com.huaweicloud.dli;

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Properties;

    /**
     * @author xxx
     * @date 6/26/21
     */
    public class FlinkKafkaToObsExample {
        private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaToObsExample.class);

        public static void main(String[] args) throws Exception {
            LOG.info("Start Kafka2OBS Flink Streaming Source Java Demo.");
            ParameterTool params = ParameterTool.fromArgs(args);
            LOG.info("Params: " + params.toString());

            // Kafka broker addresses
            String bootstrapServers;
            // Kafka consumer group
            String kafkaGroup;
            // Kafka topic
            String kafkaTopic;
            // Offset policy. Used only when a partition has no checkpoint or its checkpoint has expired;
            // if a valid checkpoint exists, consumption resumes from that checkpoint.
            // Values: LATEST   - start from the newest data, ignoring data already in the topic
            //         EARLIEST - start from the oldest data, reading all valid data in the topic
            String offsetPolicy;
            // OBS output path, in the form obs://bucket/path
            String outputPath;
            // Checkpoint output path, in the form obs://bucket/path
            String checkpointPath;

            bootstrapServers = params.get("bootstrap.servers", "xxxx:9092,xxxx:9092,xxxx:9092");
            kafkaGroup = params.get("group.id", "test-group");
            kafkaTopic = params.get("topic", "test-topic");
            offsetPolicy = params.get("offset.policy", "earliest");
            outputPath = params.get("output.path", "obs://bucket/output");
            checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint");

            try {
                // Create the execution environment
                StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                streamEnv.setParallelism(4);
                // RocksDB state backend with incremental checkpoints stored at checkpointPath
                RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
                streamEnv.setStateBackend(rocksDbBackend);
                // Enable Flink checkpointing; when a checkpoint is triggered, offsets are synchronized to Kafka
                streamEnv.enableCheckpointing(300000);
                // Minimum pause between two checkpoints
                streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // Checkpoint timeout
                streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);
                // Maximum number of concurrent checkpoints
                streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // Retain checkpoints when the job is cancelled
                streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

                // Source: connect to the Kafka data source
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", bootstrapServers);
                properties.setProperty("group.id", kafkaGroup);
                properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetPolicy);
                String topic = kafkaTopic;

                // Create the Kafka consumer
                FlinkKafkaConsumer<String> kafkaConsumer =
                        new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
                /*
                 * Start reading partitions from the offsets committed to the Kafka brokers by the
                 * consumer group (the group.id consumer property).
                 * If no offset is found for a partition, the auto.offset.reset setting is used.
                 * Details: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
                 */
                kafkaConsumer.setStartFromGroupOffsets();

                // Add Kafka as the data source
                DataStream<String> stream = streamEnv.addSource(kafkaConsumer).setParallelism(3).disableChaining();

                // Create the file sink
                final StreamingFileSink<String> sink = StreamingFileSink
                        // Output path and row encoding
                        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                        // Alternative: bulk encoding, writing Parquet files
                        //.forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
                        // Bucket assigner (date-time based)
                        .withBucketAssigner(new DateTimeBucketAssigner<>())
                        // Rolling policy
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

                // Attach the OBS file sink to the Kafka stream
                stream.addSink(sink).disableChaining().name("obs");

                // stream.print();
                streamEnv.execute();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }
    
    Table 2 Parameter description

    Parameter            Description                        Example
    bootstrap.servers    Kafka broker addresses             <Kafka service IP address 1>:9092,<Kafka service IP address 2>:9092,<Kafka service IP address 3>:9092
    group.id             Kafka consumer group               test-group
    topic                Kafka topic to consume             test-topic
    offset.policy        Kafka offset policy                earliest
    output.path          OBS path the data is written to    obs://bucket/output
    checkpoint.path      OBS path for checkpoints           obs://bucket/checkpoint
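
Both samples read these parameters with ParameterTool.fromArgs(args), which accepts "--key value" pairs on the job's argument line, and fall back to the defaults in the code when a key is absent. The sketch below imitates that resolution with JDK classes only so it can run without Flink on the classpath. It is a simplified stand-in, not ParameterTool itself; the class name JobArgsSketch and the broker address are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class JobArgsSketch {
    /**
     * Parses "--key value" pairs into a map.
     * Simplified stand-in for ParameterTool.fromArgs: a trailing key without a value is ignored.
     */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key but got: " + args[i]);
            }
            params.put(args[i].substring(2), args[i + 1]);
        }
        return params;
    }

    public static void main(String[] args) {
        // Hypothetical argument line for the Flink 1.12 sample.
        String[] jobArgs = {
            "--bootstrap.servers", "192.168.0.10:9092",
            "--topic", "test-topic",
            "--output.path", "obs://bucket/output"
        };
        Map<String, String> params = parse(jobArgs);
        // Supplied keys win; missing keys fall back to the same defaults the sample uses.
        System.out.println(params.getOrDefault("topic", "test-topic"));
        System.out.println(params.getOrDefault("group.id", "test-group"));
        System.out.println(params.getOrDefault("offset.policy", "earliest"));
    }
}
```

In the real job, the same "--key value" string would be supplied as the job's program arguments, and params.get(key, default) plays the role of getOrDefault here.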

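Compiling and running

After the application is developed, compile and package it into a JAR, upload the JAR to DLI and run it as described in Flink Jar Job Development Basic Sample, then check whether the expected data appears under the corresponding OBS path.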
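
When checking the output path of the Flink 1.12 sample, note that it uses DateTimeBucketAssigner with its no-argument constructor. That assigner's default is to place records into subdirectories named with the pattern yyyy-MM-dd--HH, based on processing time in the JVM's default time zone. The sketch below reproduces the naming with JDK classes so you can predict which directory to look in. The class and method names (BucketDirSketch, bucketDir) are hypothetical, and the time zone actually in effect on the DLI cluster is an assumption you would need to confirm.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

class BucketDirSketch {
    // Default format string of Flink's DateTimeBucketAssigner.
    private static final String DEFAULT_PATTERN = "yyyy-MM-dd--HH";

    /** Returns the directory under outputPath that a record processed at the given time lands in. */
    static String bucketDir(String outputPath, long processingTimeMillis, ZoneId zone) {
        String bucketId = DateTimeFormatter.ofPattern(DEFAULT_PATTERN)
            .withZone(zone)
            .format(Instant.ofEpochMilli(processingTimeMillis));
        return outputPath + "/" + bucketId;
    }

    public static void main(String[] args) {
        // Assumption: the cluster runs in UTC+8; substitute the zone your job actually uses.
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        System.out.println(bucketDir("obs://bucket/output", System.currentTimeMillis(), zone));
    }
}
```

The Flink 1.15 sample does not set a bucket assigner, so its directory layout may differ from what this sketch prints.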