Updated on: 2024-01-09 GMT+08:00

Developer Guide: Writing Data to OBS Using a Flink Jar Job

Overview

DLI allows you to run Flink jobs using a custom JAR and write data to OBS. The Java sample code in this section demonstrates how to process Kafka data and write the result to OBS. Modify the parameter settings based on your actual environment.

Environment Preparation

You have installed and configured a development tool such as IntelliJ IDEA, as well as the JDK and Maven.

  • For the pom.xml configuration of the Maven project, see the "POM file configuration" description in Java Sample Code.
  • Ensure that the local build environment can access the public network.

Constraints

  • On the DLI console, choose Global Configuration > Service Authorization and enable Tenant Administrator (global service).
  • The OBS bucket that data is written to must be a bucket created under the primary account.

Java Sample Code

  • POM file configuration
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Flink-demo</artifactId>
            <groupId>com.huaweicloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>flink-kafka-to-obs</artifactId>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!-- Flink version -->
            <flink.version>1.12.2</flink.version>
            <!-- JDK version -->
            <java.version>1.8</java.version>
            <!-- Scala 2.11 version -->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!--  kafka  -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--  logging  -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.huaweicloud.dli.FlinkKafkaToObsExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
    
  • Sample code
    import org.apache.flink.api.common.serialization.SimpleStringEncoder; 
    import org.apache.flink.api.common.serialization.SimpleStringSchema; 
    import org.apache.flink.api.java.utils.ParameterTool; 
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; 
    import org.apache.flink.core.fs.Path; 
    import org.apache.flink.runtime.state.filesystem.FsStateBackend; 
    import org.apache.flink.streaming.api.datastream.DataStream; 
    import org.apache.flink.streaming.api.environment.CheckpointConfig; 
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; 
    import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; 
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; 
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 
    import org.apache.kafka.clients.consumer.ConsumerConfig; 
    import org.slf4j.Logger; 
    import org.slf4j.LoggerFactory; 
    
    import java.util.Properties; 
    
    /** 
     * @author xxx 
     * @date 6/26/21 
     */ 
    public class FlinkKafkaToObsExample { 
        private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaToObsExample.class); 
    
        public static void main(String[] args) throws Exception { 
            LOG.info("Start Kafka2OBS Flink Streaming Source Java Demo."); 
            ParameterTool params = ParameterTool.fromArgs(args); 
            LOG.info("Params: " + params.toString()); 
    
            // Kafka connection address (bootstrap servers)
            String bootstrapServers;
            // Kafka consumer group
            String kafkaGroup;
            // Kafka topic
            String kafkaTopic;
            // Consumption policy. It takes effect only when the partition has no checkpoint or the checkpoint has expired;
            //          if a valid checkpoint exists, consumption resumes from that checkpoint.
            // Valid values: LATEST: start consuming from the latest data, ignoring data already in the stream
            //               EARLIEST: start consuming from the oldest data, reading all valid data in the stream
            String offsetPolicy;
            // OBS output path, in the format obs://bucket/path
            String outputPath;
            // Checkpoint output path, in the format obs://bucket/path
            String checkpointPath;

            bootstrapServers = params.get("bootstrap.servers", "xxxx:9092,xxxx:9092,xxxx:9092");
            kafkaGroup = params.get("group.id", "test-group");
            kafkaTopic = params.get("topic", "test-topic");
            offsetPolicy = params.get("offset.policy", "earliest");
            outputPath = params.get("output.path", "obs://bucket/output");
            checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint");
    
            try {
                // Create the execution environment
                StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                streamEnv.setParallelism(4);
                // Use RocksDB as the state backend and persist checkpoints to the OBS path
                // (equivalent shorthand: new RocksDBStateBackend(checkpointPath, true))
                RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
                streamEnv.setStateBackend(rocksDbBackend);
                // Enable Flink checkpointing. When a checkpoint is triggered, the offsets are committed back to Kafka
                streamEnv.enableCheckpointing(300000);
                // Set the minimum interval between two checkpoints
                streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // Set the checkpoint timeout
                streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);
                // Set the maximum number of concurrent checkpoints
                streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // Retain checkpoints when the job is canceled
                streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
                // Source: connect to the Kafka data source
                Properties properties = new Properties();
                properties.setProperty("bootstrap.servers", bootstrapServers);
                properties.setProperty("group.id", kafkaGroup);
                properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetPolicy);
                String topic = kafkaTopic;

                // Create the Kafka consumer
                FlinkKafkaConsumer<String> kafkaConsumer =
                        new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
                /**
                 * Start reading partitions from the offsets committed by the consumer group (the group.id consumer property) in the Kafka brokers.
                 * If no offset is found for a partition, the auto.offset.reset setting from the properties is used.
                 * For details, see https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
                 */
                kafkaConsumer.setStartFromGroupOffsets();

                // Add the Kafka consumer as a data source
                DataStream<String> stream = streamEnv.addSource(kafkaConsumer).setParallelism(3).disableChaining();
    
                // Create the file sink
                final StreamingFileSink<String> sink = StreamingFileSink
                        // Specify the output path and the row encoding format
                        .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                        // Alternatively, specify a bulk encoding format to write Parquet files
                        //.forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
                        // Specify a custom bucket assigner
                        .withBucketAssigner(new DateTimeBucketAssigner<>())
                        // Specify the rolling policy
                        .withRollingPolicy(OnCheckpointRollingPolicy.build())
                        .build();

                // Add the OBS sink to the Kafka stream
                stream.addSink(sink).disableChaining().name("obs");
    
                // stream.print(); 
                streamEnv.execute(); 
            } catch (Exception e) { 
                LOG.error(e.getMessage(), e); 
            } 
        } 
    }
    
    Table 1 Parameter description

    Parameter          Description                        Example
    bootstrap.servers  Kafka connection address           Kafka-broker-IP-1:9092,Kafka-broker-IP-2:9092,Kafka-broker-IP-3:9092
    group.id           Kafka consumer group               For example, test-group
    topic              Kafka topic to consume             For example, test-topic
    offset.policy      Kafka offset policy                For example, earliest
    output.path        OBS path that data is written to   obs://bucket/output
    checkpoint.path    OBS path for checkpoints           obs://bucket/checkpoint
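
    These parameters are read by ParameterTool.fromArgs in the sample code, so when you submit the packaged JAR as a DLI Flink Jar job they can typically be passed as program arguments in the form --key value. The following line is only an illustration using the placeholder values from the table above; replace them with the values for your environment:

        --bootstrap.servers "Kafka-broker-IP-1:9092,Kafka-broker-IP-2:9092,Kafka-broker-IP-3:9092" --group.id "test-group" --topic "test-topic" --offset.policy "earliest" --output.path "obs://bucket/output" --checkpoint.path "obs://bucket/checkpoint"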

Compiling and Running

After the application is developed, compile and package it into a JAR file, upload the JAR file to DLI and run it by referring to the basic example for developing Flink Jar jobs, and then check whether data is generated in the corresponding OBS path.
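
With the maven-assembly-plugin configuration above, running mvn clean package in the project directory should produce a jar-with-dependencies package that can be uploaded as the job JAR. To verify the pipeline end to end, the Kafka topic also needs some messages. The sketch below is only an illustration and is not part of the DLI sample: the broker address is a placeholder, the topic matches the sample's default test-topic, and the Kafka client classes are assumed to be available transitively through flink-connector-kafka.

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class TestDataProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Replace with the actual Kafka broker addresses
            props.put("bootstrap.servers", "xxxx:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Send a few test messages to the topic consumed by the Flink job
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<>("test-topic", "message-" + i));
                }
                producer.flush();
            }
        }
    }

Because the sample uses OnCheckpointRollingPolicy, output files are finalized only when a checkpoint completes (every 300 seconds in the sample configuration), so the files under obs://bucket/output may appear only after the first checkpoint.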