文档首页> 数据湖探索 DLI> 开发指南> Flink Jar作业开发指南> 使用Flink Jar连接开启SASL_SSL认证的Kafka
更新时间:2024-03-30 GMT+08:00
分享

使用Flink Jar连接开启SASL_SSL认证的Kafka

概述

本节操作介绍使用Flink Jar连接开启SASL_SSL认证的Kafka的操作方法。

如需使用Flink OpenSource SQL连接开启SASL_SSL认证的Kafka,请参考Flink SQL语法参考-Kafka源表

环境准备

  • 已在DLI控制台购买了通用队列。
  • 已购买了Kafka实例,并开启了SASL_SSL认证。
  • 已在DLI创建增强型跨源并绑定队列,确保DLI队列和Kafka连通。

操作须知

  • 连接带SASL_SSL的Kafka,无论是消费者还是生产者,在对应的properties中都需要指定truststore文件的路径。
  • 初始化consumer/producer都是在taskmanager里执行的,所以需要获取到taskmanager对应container下truststore文件的路径,在初始化前将其引入properties中才能生效。
  • kafka source可以在open里引入。
    图1 获取kafka source
  • kafka sink可以在initializeState里引入。
    图2 获取kafka sink

操作步骤

  1. 从Kafka实例的基本信息页面下载SSL证书,解压后将其中的clinet.jks文件上传到OBS。

    图3 下载SSL证书

  2. 在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建”,创建clinet.jks对应的程序包。

    主要参数的填写说明:
    • 包类型:File
    • OBS路径:clinet.jks所在的OBS路径。
    • 分组名称:自定义分组或选择已有的分组名称。
      图4 创建clinet.jks

  3. 将示例代码打包,在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建”,创建Flink Jar对应的程序包。代码样例请参考Kafaka ToKafakaExample.java样例•SinkKafkaProducer.java样...SourceKafkaConsumer.java...

    主要参数的填写说明:
    • 包类型:JAR
    • OBS路径:Flink Jar所在的OBS路径。
    • 分组名称:自定义分组或选择已有的分组名称。
    图5 创建Flink Jar对应的程序包

  4. 在DLI控制台,打开“数据管理 > 程序包管理”,单击“创建”,创建KafkaToKafka.properties对应的程序包。代码样例请参考•KafkaToKafkaExample.pro...

    主要参数的填写说明:
    • 包类型:File
    • OBS路径:KafkaToKafka.properties所在的OBS路径。
    • 分组名称:自定义分组或选择已有的分组名称。
    图6 创建KafkaToKafka.properties程序包

  5. 创建Flink Jar作业并运行。

    创建Flink Jar作业,在应用程序、其他依赖文件选择步骤3导入Flink Jar,并指定主类。

    主要参数的填写说明:

    • 所属队列:选择Flink Jar作业运行的队列。
    • 应用程序:自定义的程序包
    • 主类:指定
    • 类名:输入类名并确定类参数列表(参数间用空格分隔)。
    • 其他依赖文件:自定义的依赖文件。选择24导入的jks和properties文件。
    • Flink版本:1.10
    图7 创建Flink Jar作业

  6. 结果校验。

    作业处于运行中状态时,向kafka source.topic发送数据,验证kafka sink.topic能否收到数据。

    图8 查看作业任务
    图9 查看kafka sink.topic

JAVA样例代码

  • pom文件配置
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Flink-demo</artifactId>
            <groupId>com.huaweicloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>flink-kafka-to-obs</artifactId>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!--Flink 版本-->
            <flink.version>1.12.2</flink.version>
            <!--JDK 版本-->
            <java.version>1.8</java.version>
            <!--Scala 2.11 版本-->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!--  kafka  -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--  logging  -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>com.huaweicloud.dli.FlinkKafkaToObsExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
    
  • KafkaToKafkaExample.java样例
    package kafka_to_kafka;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    
    public class KafkaToKafkaExample {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaToKafkaExample.class);
    
        public static void main(String[] args) throws Exception {
            LOG.info("Start Kafka2Kafka Flink Streaming Source Java Demo.");
            String propertiesPath = KafkaToKafkaExample.class.getClassLoader()
                .getResource("userData/KafkaToKafka.properties").getPath();
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);
            LOG.info("Params: " + params.toString());
    
            // Kafka连接地址
            String bootstrapServers;
            // Kafka消费组
            String kafkaGroup;
            // Kafka topic
            String sourceTopic;
            String sinkTopic;
            // 消费策略,只有当分区没有Checkpoint或者Checkpoint过期时,才会使用此配置的策略;
            //          如果存在有效的Checkpoint,则会从此Checkpoint开始继续消费
            // 取值有: LATEST,从最新的数据开始消费,此策略会忽略通道中已有数据
            //         EARLIEST,从最老的数据开始消费,此策略会获取通道中所有的有效数据
            String offsetPolicy;
            // SASL_SSL相关配置项。设置jaas账号和密码,username和password为创建Kafka实例过程中开启SASL_SSL时填入的用户名和密码,
            // 或者创建SASL_SSL用户时设置的用户名和密码。格式如下,
            // org.apache.kafka.common.security.plain.PlainLoginModule required
            // username="yourUsername"
            // password="yourPassword";
            String saslJaasConfig;
            // Checkpoint输出路径,格式obs://bucket/path
            String checkpointPath;
    
            bootstrapServers = params.get("bootstrap.servers", "xxx:9093,xxx:9093,xxx:9093");
            kafkaGroup = params.get("source.group", "test-group");
            sourceTopic = params.get("source.topic", "test-source-topic");
            sinkTopic = params.get("sink.topic", "test-sink-topic");
            offsetPolicy = params.get("offset.policy", "earliest");
            saslJaasConfig = params.get("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule "
                    + "required\nusername=\"yourUsername\"\npassword=\"yourPassword\";");
            checkpointPath = params.get("checkpoint.path", "obs://bucket/path");
    
    
            try {
                // 创建执行环境
                StreamExecutionEnvonment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, true);
                streamEnv.setStateBackend(rocksDbBackend);
                // 开启Flink CheckPoint配置,开启时若触发CheckPoint,会将Offset信息同步到Kafka
                streamEnv.enableCheckpointing(300000);
                // 设置两次checkpoint的最小间隔时间
                streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // 设置checkpoint超时时间
                streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);
                // 设置checkpoint最大并发数
                streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // 设置作业取消时保留checkpoint
                streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
                // Source: 连接kafka数据源
                Properties sourceProperties = new Properties();
                sourceProperties.setProperty("bootstrap.servers", bootstrapServers);
                sourceProperties.setProperty("group.id", kafkaGroup);
                sourceProperties.setProperty("sasl.jaas.config", saslJaasConfig);
                sourceProperties.setProperty("sasl.mechanism", "PLAIN");
                sourceProperties.setProperty("security.protocol", "SASL_SSL");
                sourceProperties.setProperty("ssl.truststore.password", "dms@kafka");
                sourceProperties.setProperty("ssl.endpoint.identification.algorithm", "");
                sourceProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetPolicy);
    
    
                // 创建kafka consumer
                SourceKafkaConsumer<String> kafkaConsumer =
                    new SourceKafkaConsumer<>(sourceTopic, new SimpleStringSchema(), sourceProperties);
                /**
                 * 从 Kafka brokers 中的 consumer 组(consumer 属性中的 group.id 设置)提交的偏移量中开始读取分区。
                 * 如果找不到分区的偏移量,那么将会使用配置中的 auto.offset.reset 设置。
                 * 详情 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
                 */
                kafkaConsumer.setStartFromGroupOffsets();
    
                //将kafka 加入数据源
                DataStream<String> stream = streamEnv.addSource(kafkaConsumer).name("kafka_source").setParallelism(1).disableChaining();
    
                // Sink: 连接kafka数据汇
                Properties sinkProperties = new Properties();
                sinkProperties.setProperty("bootstrap.servers", bootstrapServers);
                sinkProperties.setProperty("sasl.jaas.config", saslJaasConfig);
                sinkProperties.setProperty("sasl.mechanism", "PLAIN");
                sinkProperties.setProperty("security.protocol", "SASL_SSL");
                sinkProperties.setProperty("ssl.truststore.password", "dms@kafka");
                sinkProperties.setProperty("ssl.endpoint.identification.algorithm", "");
    
                // 创建kafka producer
                SinkKafkaProducer<String> kafkaProducer = new SinkKafkaProducer<>(sinkTopic, new SimpleStringSchema(),
                    sinkProperties);
    
                //将kafka 加入数据汇
                stream.addSink(kafkaProducer).name("kafka_sink").setParallelism(1).disableChaining();
    
                // stream.print();
                streamEnv.execute();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }
  • SinkKafkaProducer.java样例
    package kafka_to_kafka;
    
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Properties;
    
    public class SinkKafkaProducer<IN> extends FlinkKafkaProducer<IN>{
    
        public SinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
            super(topicId, serializationSchema, producerConfig);
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            String jksPath = SinkKafkaProducer.class.getClassLoader().getResource("userData/client.jks").getPath();
            producerConfig.setProperty("ssl.truststore.location", jksPath);
            super.initializeState(context);
        }
    }
  • SourceKafkaConsumer.java样例
    package kafka_to_kafka;
    
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import java.util.Properties;
    
    public class SourceKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
    
        public SourceKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
            super(topic, valueDeserializer, props);
        }
    
        @Override
        public void open(Configuration configuration) throws Exception {
            String jksPath = SourceKafkaConsumer.class.getClassLoader().getResource("userData/client.jks").getPath();
            super.properties.setProperty("ssl.truststore.location", jksPath);
            super.open(configuration);
        }
    }
  • KafkaToKafkaExample.properties样例
    bootstrap.servers=xxx:9093,xxx:9093,xxx:9093
    checkpoint.path=obs://bucket/path
    source.group=swq-test-group
    source.topic=topic-swq
    sink.topic=topic-swq-out
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\n\
      username="xxxx"\n\
      password="xxxx";
分享:

    相关文档

    相关产品