Updated on 2023-08-31 GMT+08:00

Kafka Token Authentication

Scenario

The token authentication mechanism is a lightweight authentication mechanism that does not require Kerberos authentication. It can be used in APIs.

Sample Code

The token authentication mechanism can be used for APIs. Therefore, you can configure the token authentication mechanism in Producer() and Consumer() of the secondary development sample.

  • The sample code of Producer() is as follows:
    /**
     * Assembles the producer configuration for token authentication.
     *
     * <p>Most settings are looked up through {@code KafkaProperties.getValues(key, value)};
     * the second argument is presumably the fallback used when the key is not
     * configured (confirm against {@code KafkaProperties}). The service name,
     * the SASL mechanism and the JAAS configuration are always set to the
     * literals written here.
     *
     * @return the populated producer properties
     */
    public static Properties initProperties() {
        final Properties props = new Properties();
        final KafkaProperties config = KafkaProperties.getInstance();

        // Where to connect and how this client identifies itself.
        props.put(BOOTSTRAP_SERVER, config.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        props.put(CLIENT_ID, config.getValues(CLIENT_ID, "DemoProducer"));

        // Serializers for the record key and the record value.
        props.put(
                KEY_SERIALIZER,
                config.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
        props.put(
                VALUE_SERIALIZER,
                config.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));

        // Security protocol (SASL_PLAINTEXT or PLAINTEXT are currently usable),
        // followed by the service name and the domain name.
        props.put(SECURITY_PROTOCOL, config.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        props.put(KERBEROS_DOMAIN_NAME, config.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));

        // Class that chooses the partition for each record.
        props.put(
                PARTITIONER_NAME,
                config.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner"));

        // Token configuration: one JAAS entry per line, terminated by the
        // final "tokenauth=true;" option.
        //   username = TOKENID generated by the user
        //   password = token HMAC generated by the user
        final String lineSeparator = System.getProperty("line.separator");
        final String jaasConfig = new StringBuilder()
                .append("org.apache.kafka.common.security.scram.ScramLoginModule required")
                .append(lineSeparator)
                .append("username=\"PPVz2cxuQC-okwJVZnFKFg\"")
                .append(lineSeparator)
                .append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"")
                .append(lineSeparator)
                .append("tokenauth=true;")
                .toString();

        // The token is presented through the SCRAM-SHA-512 SASL mechanism.
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }
  • The sample code of Consumer() is as follows:
    /**
     * Assembles the consumer configuration for token authentication.
     *
     * <p>Most settings are looked up through {@code KafkaProperties.getValues(key, value)};
     * the second argument is presumably the fallback used when the key is not
     * configured (confirm against {@code KafkaProperties}). The service name,
     * the SASL mechanism and the JAAS configuration are always set to the
     * literals written here.
     *
     * @return the populated consumer properties
     */
    public static Properties initProperties() {
        final Properties props = new Properties();
        final KafkaProperties config = KafkaProperties.getInstance();

        // Broker connection address and consumer group ID.
        props.put(BOOTSTRAP_SERVER, config.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
        props.put(GROUP_ID, config.getValues(GROUP_ID, "DemoConsumer"));

        // Offset handling: automatic commit switch and the commit interval.
        props.put(ENABLE_AUTO_COMMIT, config.getValues(ENABLE_AUTO_COMMIT, "true"));
        props.put(AUTO_COMMIT_INTERVAL_MS, config.getValues(AUTO_COMMIT_INTERVAL_MS, "1000"));

        // Session timeout.
        props.put(SESSION_TIMEOUT_MS, config.getValues(SESSION_TIMEOUT_MS, "30000"));

        // Deserializers for the message key and the message content.
        props.put(
                KEY_DESERIALIZER,
                config.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
        props.put(
                VALUE_DESERIALIZER,
                config.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));

        // Security protocol type, followed by the service name and the domain name.
        props.put(SECURITY_PROTOCOL, config.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
        props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
        props.put(KERBEROS_DOMAIN_NAME, config.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));

        // Token configuration: one JAAS entry per line, terminated by the
        // final "tokenauth=true;" option.
        //   username = TOKENID generated by the user
        //   password = token HMAC generated by the user
        final String lineSeparator = System.getProperty("line.separator");
        final String jaasConfig = new StringBuilder()
                .append("org.apache.kafka.common.security.scram.ScramLoginModule required")
                .append(lineSeparator)
                .append("username=\"PPVz2cxuQC-okwJVZnFKFg\"")
                .append(lineSeparator)
                .append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"")
                .append(lineSeparator)
                .append("tokenauth=true;")
                .toString();

        // The token is presented through the SCRAM-SHA-512 SASL mechanism.
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", jaasConfig);

        return props;
    }
  • Set BOOTSTRAP_SERVER to the host name and port number of the Kafka broker node based on site requirements. To view the broker instance information, choose Cluster > Services > Kafka > Instance on FusionInsight Manager.
  • Set SECURITY_PROTOCOL to the protocol for connecting to Kafka. In this example, set this parameter to SASL_PLAINTEXT.
  • TOKENID and HMAC are produced when you generate a token. For details about how to generate a token, see Kafka Token Authentication Mechanism Tool Usage.
  • When using the token authentication mechanism, you need to comment out the Kerberos authentication code so that only one authentication mechanism is used while the code is running, as shown in the following example:
     /**
      * Entry point of the producer sample with the Kerberos preparation
      * disabled, so that token authentication is the only authentication
      * mechanism in use while the code runs.
      *
      * @param args command-line arguments (not used)
      */
     public static void main(String[] args) {
         if (isSecurityModel()) {
             // Kerberos preparation, intentionally left commented out:
             //
             // try {
             //     LOG.info("Securitymode start.");
             //
             //     //!!Note: When using security authentication, you need to manually change the account to a machine-machine one.
             //     securityPrepare();
             // } catch (IOException e) {
             //     LOG.error("Security prepare failure.");
             //     LOG.error("The IOException occured.", e);
             //     return;
             // }
             LOG.info("Security prepare success.");
         }

         // false: do not use the asynchronous sending mode.
         final boolean sendAsynchronously = false;
         Producer producer = new Producer(KafkaProperties.TOPIC, sendAsynchronously);
         producer.start();
     }