使用Flink Jar连接开启SASL_SSL认证的Kafka
概述
本节操作介绍使用Flink Jar连接开启SASL_SSL认证的Kafka的操作方法。
如需使用Flink OpenSource SQL连接开启SASL_SSL认证的Kafka,请参考Flink SQL语法参考-Kafka源表。
环境准备
- 已在DLI控制台购买了通用队列。
- 已购买了Kafka实例,并开启了SASL_SSL认证。
- 已在DLI创建增强型跨源并绑定队列,确保DLI队列和Kafka连通。
操作须知
- 连接带SASL_SSL的Kafka,无论是消费者还是生产者,在对应的properties中都需要指定truststore文件的路径。
- 初始化consumer/producer都是在taskmanager里执行的,所以需要获取到taskmanager对应container下truststore文件的路径,在初始化前将其引入properties中才能生效。
- kafka source可以在open里引入。
图1 获取kafka source
- kafka sink可以在initializeState里引入。
图2 获取kafka sink
操作步骤
- 从Kafka实例的基本信息页面下载SSL证书,解压后将其中的clinet.jks文件上传到OBS。
图3 下载SSL证书
- 在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建”,创建clinet.jks对应的程序包。
主要参数的填写说明:
- 包类型:File
- OBS路径:clinet.jks所在的OBS路径。
- 分组名称:自定义分组或选择已有的分组名称。
图4 创建clinet.jks
- 将示例代码打包,在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建”,创建Flink Jar对应的程序包。代码样例请参考Kafaka ToKafakaExample.java样例、•SinkKafkaProducer.java样...、SourceKafkaConsumer.java...。
主要参数的填写说明:
- 包类型:JAR
- OBS路径:Flink Jar所在的OBS路径。
- 分组名称:自定义分组或选择已有的分组名称。
图5 创建Flink Jar对应的程序包
- 在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建”,创建KafkaToKafka.properties对应的程序包。代码样例请参考•KafkaToKafkaExample.pro...。
主要参数的填写说明:
- 包类型:File
- OBS路径:KafkaToKafka.properties所在的OBS路径。
- 分组名称:自定义分组或选择已有的分组名称。
图6 创建KafkaToKafka.properties程序包
- 创建Flink Jar作业并运行。
创建Flink Jar作业,在应用程序、其他依赖文件选择步骤3导入Flink Jar,并指定主类。
主要参数的填写说明:
- 所属队列:选择Flink Jar作业运行的队列。
- 应用程序:自定义的程序包
- 主类:指定
- 类名:输入类名并确定类参数列表(参数间用空格分隔)。
- 其他依赖文件:自定义的依赖文件。选择2和4导入的jks和properties文件。
- Flink版本:1.10
图7 创建Flink Jar作业
- 结果校验。
作业处于运行中状态时,向kafka source.topic发送数据,验证kafka sink.topic能否收到数据。
图8 查看作业任务
图9 查看kafka sink.topic
JAVA样例代码
- pom文件配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>Flink-demo</artifactId> <groupId>com.huaweicloud</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>flink-kafka-to-obs</artifactId> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <!--Flink 版本--> <flink.version>1.12.2</flink.version> <!--JDK 版本--> <java.version>1.8</java.version> <!--Scala 2.11 版本--> <scala.binary.version>2.11</scala.binary.version> <slf4j.version>2.13.3</slf4j.version> <log4j.version>2.10.0</log4j.version> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- flink --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.11</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <!-- kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>${flink.version}</version> </dependency> <!-- logging --> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <version>${slf4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> <version>${log4j.version}</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.3.0</version> <executions> <execution> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> <configuration> <archive> <manifest> <mainClass>com.huaweicloud.dli.FlinkKafkaToObsExample</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> <resources> <resource> <directory>../main/config</directory> <filtering>true</filtering> <includes> <include>**/*.*</include> </includes> </resource> </resources> </build> </project>
- KafkaToKafkaExample.java样例
package kafka_to_kafka; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class KafkaToKafkaExample { private static final Logger LOG = LoggerFactory.getLogger(KafkaToKafkaExample.class); public static void main(String[] args) throws Exception { LOG.info("Start Kafka2Kafka Flink Streaming Source Java Demo."); String propertiesPath = KafkaToKafkaExample.class.getClassLoader() .getResource("userData/KafkaToKafka.properties").getPath(); ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath); LOG.info("Params: " + params.toString()); // Kafka连接地址 String bootstrapServers; // Kafka消费组 String kafkaGroup; // Kafka topic String sourceTopic; String sinkTopic; // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略; // 如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费 // 取值有: LATEST,从最新的数据开始消费,此策略会忽略通道中已有数据 // EARLIEST,从最初的数据开始消费,此策略会获取通道中所有的有效数据 String offsetPolicy; // SASL_SSL相关配置项。设置jaas账号和密码,username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码, // 或者创建SASL_SSL用户时设置的用户名和密码。格式如下, // org.apache.kafka.common.security.plain.PlainLoginModule required // username="yourUsername" // password="yourPassword"; String saslJaasConfig; // Checkpoint输出路径,格式obs://bucket/path String checkpointPath; bootstrapServers = params.get("bootstrap.servers", "xxx:9093,xxx:9093,xxx:9093"); kafkaGroup = params.get("source.group", "test-group"); sourceTopic = params.get("source.topic", "test-source-topic"); sinkTopic = params.get("sink.topic", "test-sink-topic"); offsetPolicy = params.get("offset.policy", "earliest"); saslJaasConfig = params.get("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule " + "required\nusername=\"yourUsername\"\npassword=\"yourPassword\";"); checkpointPath = params.get("checkpoint.path", "obs://bucket/path"); try { // 创建执行环境 StreamExecutionEnvonment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, true); streamEnv.setStateBackend(rocksDbBackend); // 开启Flink CheckPoint配置,开启时若触发CheckPoint,会将Offset信息同步到Kafka streamEnv.enableCheckpointing(300000); // 设置两次checkpoint的最小间隔时间 streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000); // 设置checkpoint超时时间 streamEnv.getCheckpointConfig().setCheckpointTimeout(60000); // 设置checkpoint最大并发数 streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 设置作业取消时保留checkpoint streamEnv.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // Source: 连接kafka数据源 Properties sourceProperties = new Properties(); sourceProperties.setProperty("bootstrap.servers", bootstrapServers); sourceProperties.setProperty("group.id", kafkaGroup); sourceProperties.setProperty("sasl.jaas.config", saslJaasConfig); sourceProperties.setProperty("sasl.mechanism", "PLAIN"); sourceProperties.setProperty("security.protocol", "SASL_SSL"); sourceProperties.setProperty("ssl.truststore.password", "dms@kafka"); sourceProperties.setProperty("ssl.endpoint.identification.algorithm", ""); sourceProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetPolicy); // 创建kafka consumer SourceKafkaConsumer<String> kafkaConsumer = new SourceKafkaConsumer<>(sourceTopic, new SimpleStringSchema(), sourceProperties); /** * 从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。 * 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。 * 详情 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/ */ kafkaConsumer.setStartFromGroupOffsets(); //将kafka 加入数据源 DataStream<String> stream = streamEnv.addSource(kafkaConsumer).name("kafka_source").setParallelism(1).disableChaining(); // Sink: 连接kafka数据汇 Properties sinkProperties = new Properties(); sinkProperties.setProperty("bootstrap.servers", bootstrapServers); sinkProperties.setProperty("sasl.jaas.config", saslJaasConfig); sinkProperties.setProperty("sasl.mechanism", "PLAIN"); sinkProperties.setProperty("security.protocol", "SASL_SSL"); sinkProperties.setProperty("ssl.truststore.password", "dms@kafka"); sinkProperties.setProperty("ssl.endpoint.identification.algorithm", ""); // 创建kafka producer SinkKafkaProducer<String> kafkaProducer = new SinkKafkaProducer<>(sinkTopic, new SimpleStringSchema(), sinkProperties); //将kafka 加入数据汇 stream.addSink(kafkaProducer).name("kafka_sink").setParallelism(1).disableChaining(); // stream.print(); streamEnv.execute(); } catch (Exception e) { LOG.error(e.getMessage(), e); } } }
- SinkKafkaProducer.java样例
package kafka_to_kafka; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import java.util.Properties; public class SinkKafkaProducer<IN> extends FlinkKafkaProducer<IN>{ public SinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) { super(topicId, serializationSchema, producerConfig); } @Override public void initializeState(FunctionInitializationContext context) throws Exception { String jksPath = SinkKafkaProducer.class.getClassLoader().getResource("userData/client.jks").getPath(); producerConfig.setProperty("ssl.truststore.location", jksPath); super.initializeState(context); } }
- SourceKafkaConsumer.java样例
package kafka_to_kafka; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class SourceKafkaConsumer<T> extends FlinkKafkaConsumer<T> { public SourceKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) { super(topic, valueDeserializer, props); } @Override public void open(Configuration configuration) throws Exception { String jksPath = SourceKafkaConsumer.class.getClassLoader().getResource("userData/client.jks").getPath(); super.properties.setProperty("ssl.truststore.location", jksPath); super.open(configuration); } }
- KafkaToKafkaExample.properties样例
bootstrap.servers=xxx:9093,xxx:9093,xxx:9093 checkpoint.path=obs://bucket/path source.group=swq-test-group source.topic=topic-swq sink.topic=topic-swq-out sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\n\ username="xxxx"\n\ password="xxxx";