
Using Flink Jar to Connect to Kafka that Uses SASL_SSL Authentication

Overview

This section describes how to use a Flink Jar job to connect to a Kafka instance with SASL_SSL authentication enabled.

For details about how to use Flink OpenSource SQL to connect to Kafka with SASL_SSL authentication enabled, see Kafka Source Table.

Environment Preparations

  • You have purchased a general-purpose queue on the DLI console.
  • You have purchased a Kafka instance and enabled SASL_SSL authentication.
  • You have created an enhanced datasource connection on DLI and bound it to the queue so that the queue can communicate with the Kafka instance.

Notes

  • To connect to Kafka over SASL_SSL, the path of the truststore file must be specified in the properties of both the consumer and the producer.
  • The consumer and producer are initialized in the TaskManager. You need to obtain the path of the truststore file inside the TaskManager's container at runtime and set it in the properties used for initialization; see the sketch after this list.
  • For the Kafka source, the truststore path is set in the open method.
    Figure 1 Obtaining Kafka source
  • For the Kafka sink, the truststore path is set in the initializeState method.
    Figure 2 Obtaining Kafka sink
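
A minimal sketch of this mechanism is shown below; the complete implementations are in SourceKafkaConsumer.java and SinkKafkaProducer.java under Example Java Code. The SaslSslConfig helper class is illustrative only, and it assumes (as the sample code in this section does) that client.jks is available under userData/ on the job classpath after being added as a dependency.

    package kafka_to_kafka;

    import java.util.Properties;

    // Illustrative helper (not part of the sample code): resolves the truststore
    // path inside the TaskManager container at runtime and adds the SASL_SSL
    // settings shared by the consumer and the producer.
    public final class SaslSslConfig {

        private SaslSslConfig() {
        }

        public static Properties withTruststore(Properties props) {
            // client.jks is assumed to be on the classpath under userData/,
            // which is where the sample code expects dependency files.
            String jksPath = SaslSslConfig.class.getClassLoader()
                .getResource("userData/client.jks").getPath();
            props.setProperty("security.protocol", "SASL_SSL");
            props.setProperty("sasl.mechanism", "PLAIN");
            props.setProperty("ssl.truststore.location", jksPath);
            return props;
        }
    }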

Procedure

  1. Download the SSL certificate from the basic information page of the Kafka instance, decompress it, and upload the client.jks file to OBS.

    Figure 3 Downloading SSL certificate

  2. On the DLI console, choose Data Management > Package Management in the left navigation pane. On the displayed page, click Create to create the client.jks package.

    The required parameters are as follows:
    • Type: Select File.
    • OBS Path: Specify the OBS path of the client.jks file.
    • Group Name: Enter a name for a new group or select an existing group name.
      Figure 4 Creating the client.jks package on DLI

  3. Package the sample code. On the DLI console, choose Data Management > Package Management. Click Create on the displayed page to create a Flink JAR package. For details about the sample code, see KafkaToKafkaExample.java, SinkKafkaProducer.java, and SourceKafkaConsumer.java.

    The parameters are as follows:
    • Type: Select JAR.
    • OBS Path: Specify the OBS path of the Flink Jar.
    • Group Name: Enter a name for a new group or select an existing group name.
    Figure 5 Creating a Flink JAR package

  4. On the DLI console, choose Data Management > Package Management in the navigation pane on the left. On the displayed page, click Create to create the KafkaToKafka.properties package. For the sample file, see KafkaToKafkaExample.properties.

    The parameters are as follows:
    • Type: Select File.
    • OBS Path: Specify the OBS path of the KafkaToKafka.properties file.
    • Group Name: Enter a name for a new group or select an existing group name.
    Figure 6 Creating a DLI package

  5. Create a Flink Jar job and run it.

    Import the JAR package created in 3 and the other dependencies into the Flink Jar job, and specify the main class.

    The parameters are as follows:

    • Queue: Select the queue where the job will run.
    • Application: Select a custom program.
    • Main Class: Select Manually assign.
    • Class Name: Enter the class name and class arguments (separate arguments with spaces). For this example, the class name is kafka_to_kafka.KafkaToKafkaExample.
    • Other Dependencies: Select custom dependencies, then select the .jks and .properties files uploaded in 2 and 4.
    • Flink Version: Select 1.10.
    Figure 7 Creating a Flink Jar job

  6. Verify the result.

    When the job is in the Running state, send data to the Kafka source.topic and check whether the Kafka sink.topic receives it. If the data is received, the connection is successful. A standalone test producer sketch is provided after the figures below.

    Figure 8 Viewing job tasks
    Figure 9 Viewing kafka sink.topic
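
    If no Kafka client is available for this check, a standalone test producer such as the following sketch can send a message to source.topic. It assumes the kafka-clients library is on the classpath and is run outside DLI; the broker addresses, credentials, topic name, and local truststore path are placeholders to replace with your own values.

    package kafka_to_kafka;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    // Illustrative test producer (run outside DLI): sends one message to the
    // source topic over SASL_SSL so that the Flink Jar job can pick it up.
    public class TestDataSender {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "xxx:9093,xxx:9093,xxx:9093");  // placeholder brokers
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            props.put("security.protocol", "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
                    + "username=\"yourUsername\"\npassword=\"yourPassword\";");
            props.put("ssl.truststore.location", "/path/to/client.jks");   // local copy of client.jks
            props.put("ssl.truststore.password", "dms@kafka");
            props.put("ssl.endpoint.identification.algorithm", "");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-source-topic", "hello from test"));
                producer.flush();
            }
        }
    }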

Example Java Code

  • POM file configurations
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>Flink-demo</artifactId>
            <groupId>com.huaweicloud</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>flink-kafka-to-obs</artifactId>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!--Flink version-->
            <flink.version>1.12.2</flink.version>
            <!--JDK version -->
            <java.version>1.8</java.version>
            <!--Scala 2.11 -->
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>2.13.3</slf4j.version>
            <log4j.version>2.10.0</log4j.version>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <dependencies>
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!--  kafka  -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--  logging  -->
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
                <version>${slf4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-jcl</artifactId>
                <version>${log4j.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                    <configuration>
                        <archive>
                            <manifest>
                                <mainClass>kafka_to_kafka.KafkaToKafkaExample</mainClass>
                            </manifest>
                        </archive>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                </plugin>
            </plugins>
            <resources>
                <resource>
                    <directory>../main/config</directory>
                    <filtering>true</filtering>
                    <includes>
                        <include>**/*.*</include>
                    </includes>
                </resource>
            </resources>
        </build>
    </project>
    
  • KafkaToKafkaExample.java
    package kafka_to_kafka;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Properties;
    
    
    public class KafkaToKafkaExample {
        private static final Logger LOG = LoggerFactory.getLogger(KafkaToKafkaExample.class);
    
        public static void main(String[] args) throws Exception {
            LOG.info("Start Kafka2Kafka Flink Streaming Source Java Demo.");
            String propertiesPath = KafkaToKafkaExample.class.getClassLoader()
                .getResource("userData/KafkaToKafka.properties").getPath();
            ParameterTool params = ParameterTool.fromPropertiesFile(propertiesPath);
            LOG.info("Params: " + params.toString());
    
            // Kafka connection address
            String bootstrapServers;
            // Kafka consumer group
            String kafkaGroup;
            // Kafka topic
            String sourceTopic;
            String sinkTopic;
            // Consumption policy. This policy is used only when the partition does not have a checkpoint or the checkpoint expires.
            // If a valid checkpoint exists, consumption continues from this checkpoint.
            // When the policy is set to LATEST, the consumption starts from the latest data. This policy will ignore the existing data in the stream.
            // When the policy is set to EARLIEST, the consumption starts from the earliest data. This policy will obtain all valid data in the stream.
            String offsetPolicy;
            // SASL_SSL configuration items. Set the JAAS username and password, which are the ones entered when SASL_SSL was enabled during Kafka instance creation,
            // or those set when the SASL_SSL user was created. The format is as follows:
            // org.apache.kafka.common.security.plain.PlainLoginModule required
            // username="yourUsername"
            // password="yourPassword";
            String saslJaasConfig;
            // Checkpoint output path, which is in the format of obs://bucket/path.
            String checkpointPath;
    
            bootstrapServers = params.get("bootstrap.servers", "xxx:9093,xxx:9093,xxx:9093");
            kafkaGroup = params.get("source.group", "test-group");
            sourceTopic = params.get("source.topic", "test-source-topic");
            sinkTopic = params.get("sink.topic", "test-sink-topic");
            offsetPolicy = params.get("offset.policy", "earliest");
            saslJaasConfig = params.get("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required\n"
                    + "username=\"yourUsername\"\npassword=\"yourPassword\";");
            checkpointPath = params.get("checkpoint.path", "obs://bucket/path");
    
    
            try {
                // Create the execution environment.
                StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
                RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, true);
                streamEnv.setStateBackend(rocksDbBackend);
                // Enable Flink checkpointing mechanism. If enabled, the offset information will be synchronized to Kafka.
                streamEnv.enableCheckpointing(300000);
                // Set the minimum interval between two checkpoints.
                streamEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(60000);
                // Set the checkpoint timeout duration.
                streamEnv.getCheckpointConfig().setCheckpointTimeout(60000);
                // Set the maximum number of concurrent checkpoints.
                streamEnv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // Retain checkpoints when a job is canceled.
                streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
                // Connect to the Kafka data source.
                Properties sourceProperties = new Properties();
                sourceProperties.setProperty("bootstrap.servers", bootstrapServers);
                sourceProperties.setProperty("group.id", kafkaGroup);
                sourceProperties.setProperty("sasl.jaas.config", saslJaasConfig);
                sourceProperties.setProperty("sasl.mechanism", "PLAIN");
                sourceProperties.setProperty("security.protocol", "SASL_SSL");
                sourceProperties.setProperty("ssl.truststore.password", "dms@kafka");
                sourceProperties.setProperty("ssl.endpoint.identification.algorithm", "");
                sourceProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetPolicy);
    
    
                // Create a Kafka consumer.
                SourceKafkaConsumer<String> kafkaConsumer =
                    new SourceKafkaConsumer<>(sourceTopic, new SimpleStringSchema(), sourceProperties);
                /**
                 * Read partitions from the offset submitted by the consumer group (specified by group.id in the consumer attribute) in Kafka brokers.
                 * If the partition offset cannot be found, set it by using the auto.offset.reset parameter.
                 * For details, see https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/.
                 */
                kafkaConsumer.setStartFromGroupOffsets();
    
                // Add Kafka to the data source.
                DataStream<String> stream = streamEnv.addSource(kafkaConsumer).name("kafka_source").setParallelism(1).disableChaining();
    
                // Connect to the Kafka data sink.
                Properties sinkProperties = new Properties();
                sinkProperties.setProperty("bootstrap.servers", bootstrapServers);
                sinkProperties.setProperty("sasl.jaas.config", saslJaasConfig);
                sinkProperties.setProperty("sasl.mechanism", "PLAIN");
                sinkProperties.setProperty("security.protocol", "SASL_SSL");
                sinkProperties.setProperty("ssl.truststore.password", "dms@kafka");
                sinkProperties.setProperty("ssl.endpoint.identification.algorithm", "");
    
                // Create a Kafka producer.
                SinkKafkaProducer<String> kafkaProducer = new SinkKafkaProducer<>(sinkTopic, new SimpleStringSchema(),
                    sinkProperties);
    
                // Add Kafka to the data sink.
                stream.addSink(kafkaProducer).name("kafka_sink").setParallelism(1).disableChaining();
    
                // stream.print();
                streamEnv.execute();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
        }
    }
  • SinkKafkaProducer.java
    package kafka_to_kafka;
    
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Properties;
    
    public class SinkKafkaProducer<IN> extends FlinkKafkaProducer<IN>{
    
        public SinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
            super(topicId, serializationSchema, producerConfig);
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            String jksPath = SinkKafkaProducer.class.getClassLoader().getResource("userData/client.jks").getPath();
            producerConfig.setProperty("ssl.truststore.location", jksPath);
            super.initializeState(context);
        }
    }
  • SourceKafkaConsumer.java
    package kafka_to_kafka;
    
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import java.util.Properties;
    
    public class SourceKafkaConsumer<T> extends FlinkKafkaConsumer<T> {
    
        public SourceKafkaConsumer(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
            super(topic, valueDeserializer, props);
        }
    
        @Override
        public void open(Configuration configuration) throws Exception {
            String jksPath = SourceKafkaConsumer.class.getClassLoader().getResource("userData/client.jks").getPath();
            super.properties.setProperty("ssl.truststore.location", jksPath);
            super.open(configuration);
        }
    }
  • KafkaToKafkaExample.properties
    bootstrap.servers=xxx:9093,xxx:9093,xxx:9093
    checkpoint.path=obs://bucket/path
    source.group=swq-test-group
    source.topic=topic-swq
    sink.topic=topic-swq-out
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\n\
      username="xxxx"\n\
      password="xxxx";