Development Guide: Using a Flink Jar to Write Data to OBS
Overview
DLI lets you run Flink jobs with a custom Jar and write the results to OBS. The Java sample code in this section demonstrates processing Kafka data and writing it to OBS. Modify the parameter settings based on your actual environment.
Environment Preparation
Development tools such as IntelliJ IDEA have been installed and configured, and the JDK and Maven have been installed.
- For the pom.xml configuration of the Maven project, see the "POM file configuration" part of Java Sample Code (Flink 1.12).
- Ensure that the local build environment can access the public network.
Constraints and Limitations
- Tenant Administrator (global service) must be enabled under "Global Configuration > Service Authorization" on the DLI console.
- The OBS bucket that data is written to must be created under the primary account.
- When the Flink 1.15 compute engine is used, you must configure an agency yourself; otherwise, job execution may be affected. For details, see Customizing DLI Agency Permissions.
Java Sample Code (Flink 1.15)
- POM file configuration
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>com.huaweicloud</groupId> <artifactId>dli-flink-demo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <groupId>org.example</groupId> <artifactId>flink-1.15-demo</artifactId> <properties> <flink.version>1.15.0</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.14.2</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>com.huawei.dli.GetUserConfigFileDemo</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project>
- Sample code
package com.huawei.dli;

import com.huawei.dli.source.CustomParallelSource;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class GetUserConfigFileDemo {
    private static final Logger LOG = LoggerFactory.getLogger(GetUserConfigFileDemo.class);

    public static void main(String[] args) {
        try {
            ParameterTool params = ParameterTool.fromArgs(args);
            LOG.info("Params: " + params.toString());
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Set up checkpointing
            String checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint/jobId_jobName/");
            LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(System.currentTimeMillis() / 1000,
                0, ZoneOffset.ofHours(8));
            String dt = localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd_HH:mm:ss"));
            checkpointPath = checkpointPath + dt;
            streamEnv.setStateBackend(new EmbeddedRocksDBStateBackend());
            streamEnv.getCheckpointConfig().setCheckpointStorage(checkpointPath);
            streamEnv.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            streamEnv.enableCheckpointing(30 * 1000);

            DataStream<String> stream = streamEnv.addSource(new CustomParallelSource())
                .setParallelism(1)
                .disableChaining();

            String outputPath = params.get("output.path", "obs://bucket/outputPath/jobId_jobName");

            // Get the user-uploaded dependency config file
            URL url = GetUserConfigFileDemo.class.getClassLoader().getResource("userData/user.config");
            if (url != null) {
                Path filePath = org.apache.flink.util.FileUtils.absolutizePath(new Path(url.getPath()));
                try {
                    String config = org.apache.flink.util.FileUtils.readFileUtf8(new File(filePath.getPath()));
                    LOG.info("config is {}", config);
                    // Do something with the config
                } catch (IOException e) {
                    LOG.error(e.getMessage(), e);
                }
            }

            // Sink to OBS
            final StreamingFileSink<String> sinkForRow = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

            stream.addSink(sinkForRow);
            streamEnv.execute("sinkForRow");
        } catch (Throwable e) {
            LOG.error(e.getMessage(), e);
        }
    }
}
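The listing above imports com.huawei.dli.source.CustomParallelSource, which is not shown in this section. A minimal sketch of such a source, assuming it only needs to emit one numbered test string per second (this class body is illustrative and not part of the official sample):

package com.huawei.dli.source;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Hypothetical stand-in for the CustomParallelSource used by GetUserConfigFileDemo:
// emits a numbered test string every second until the job is cancelled.
public class CustomParallelSource extends RichParallelSourceFunction<String> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long count = 0;
        while (running) {
            // Hold the checkpoint lock while emitting so records and checkpoints stay consistent
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("test-data-" + count++);
            }
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}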
Table 1 Parameter description

| Parameter | Description | Example |
|-----------|-------------|---------|
| output.path | OBS path that data is written to | obs://bucket/output |
| checkpoint.path | OBS path for checkpoints | obs://bucket/checkpoint |
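ParameterTool.fromArgs reads these parameters from --key value pairs in the job's run arguments, so a submission might pass, for example (bucket and path names are placeholders):

--output.path obs://bucket/outputPath/jobId_jobName --checkpoint.path obs://bucket/checkpoint/jobId_jobName/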
Java Sample Code (Flink 1.12)
- POM file configuration
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>Flink-demo</artifactId> <groupId>com.huaweicloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>flink-kafka-to-obs</artifactId> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--Flink 版本--> <flink.version>1.12.2</flink.version> <!--JDK 版本--> <java.version>1.8</java.version> <!--Scala 2.11 版本--> <scala.binary.version>2.11</scala.binary.version> <slf4j.version>2.13.3</slf4j.version> <log4j.version>2.10.0</log4j.version> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- logging --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>com.huaweicloud.dli.FlinkKafkaToObsExample</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> <resources> <resource> <directory>../main/config</directory> <filtering>true</filtering> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project>
- Sample code
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/**
 * @author xxx
 * @date 6/26/21
 */
public class FlinkKafkaToObsExample {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaToObsExample.class);

    public static void main(String[] args) throws Exception {
        LOG.info("Start Kafka2OBS Flink Streaming Source Java Demo.");
        ParameterTool params = ParameterTool.fromArgs(args);
        LOG.info("Params: " + params.toString());

        // Kafka connection addresses
        String bootstrapServers;
        // Kafka consumer group
        String kafkaGroup;
        // Kafka topic
        String kafkaTopic;
        // Offset policy. It takes effect only when a partition has no checkpoint or the checkpoint
        // has expired; if a valid checkpoint exists, consumption resumes from that checkpoint.
        // Valid values:
        //   LATEST: consume from the latest data, ignoring data already in the topic
        //   EARLIEST: consume from the earliest data, reading all valid data in the topic
        String offsetPolicy;
        // OBS output path, in the format obs://bucket/path
        String outputPath;
        // Checkpoint output path, in the format obs://bucket/path
        String checkpointPath;

        bootstrapServers = params.get("bootstrap.servers", "xxxx:9092,xxxx:9092,xxxx:9092");
        kafkaGroup = params.get("group.id", "test-group");
        kafkaTopic = params.get("topic", "test-topic");
        offsetPolicy = params.get("offset.policy", "earliest");
        outputPath = params.get("output.path", "obs://bucket/output");
        checkpointPath = params.get("checkpoint.path", "obs://bucket/checkpoint");

        try {
            // Create the execution environment
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            streamEnv.setParallelism(4);
            RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), true);
            streamEnv.setStateBackend(rocksDbBackend);
            // Enable Flink checkpointing. When a checkpoint is triggered, offsets are committed back to Kafka.
            streamEnv.enableCheckpointing(300000);
            // Minimum pause between two checkpoints
            streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
            // Checkpoint timeout
            streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);
            // Maximum number of concurrent checkpoints
            streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // Retain checkpoints when the job is canceled
            streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            // Source: connect to the Kafka data source
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", bootstrapServers);
            properties.setProperty("group.id", kafkaGroup);
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetPolicy);
            String topic = kafkaTopic;

            // Create the Kafka consumer
            FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
            /**
             * Start reading partitions from the offsets committed to the Kafka brokers by the consumer
             * group (the group.id consumer property). If a partition's offset cannot be found, the
             * auto.offset.reset setting in the properties is used.
             * See https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
             */
            kafkaConsumer.setStartFromGroupOffsets();

            // Add Kafka as a data source
            DataStream<String> stream = streamEnv.addSource(kafkaConsumer).setParallelism(3).disableChaining();

            // Create the file sink
            final StreamingFileSink<String> sink = StreamingFileSink
                // Output path and row encoding format
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                // Alternative: bulk-encoded output in Parquet format
                //.forBulkFormat(new Path(outputPath), ParquetAvroWriters.forGenericRecord(schema))
                // Custom bucket assigner
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                // Rolling policy
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

            // Add the sink for the Kafka consumer data source
            stream.addSink(sink).disableChaining().name("obs");
            // stream.print();
            streamEnv.execute();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }
}
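Note that with the default DateTimeBucketAssigner used above, output files are bucketed by processing time using the pattern yyyy-MM-dd--HH, so data typically lands in hourly directories such as obs://bucket/output/2021-06-26--12/part-0-0 (the date and part-file name here are illustrative).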
Table 2 Parameter description

| Parameter | Description | Example |
|-----------|-------------|---------|
| bootstrap.servers | Kafka connection addresses | kafka-server-IP1:9092,kafka-server-IP2:9092,kafka-server-IP3:9092 |
| group.id | Kafka consumer group | test-group |
| topic | Kafka topic to consume | test-topic |
| offset.policy | Kafka offset policy | earliest |
| output.path | OBS path that data is written to | obs://bucket/output |
| checkpoint.path | OBS path for checkpoints | obs://bucket/checkpoint |
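For example, the run arguments for this job might look as follows (all values are placeholders to be replaced with your actual environment):

--bootstrap.servers xxxx:9092,xxxx:9092,xxxx:9092 --group.id test-group --topic test-topic --offset.policy earliest --output.path obs://bucket/output --checkpoint.path obs://bucket/checkpoint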
Compiling and Running
After developing the application, refer to the basic Flink Jar job development sample to upload the compiled and packaged JAR file to DLI and run it, then check whether the corresponding OBS path contains the expected data.