Updated on 2022-09-14 GMT+08:00

Sample Code of the Kafka Token Authentication Mechanism

Function

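A Kafka client can authenticate with a token instead of Kerberos. The token ID and the token HMAC are supplied as the SCRAM user name and password in the client's JAAS configuration.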

Sample Code

Add the token configuration to the properties used by the Producer, as shown in the following initProperties() method:

/**
 * Builds the producer configuration, including the SASL/SCRAM settings used for
 * token authentication.
 *
 * <p>Most entries are read through {@code KafkaProperties.getValues(key, fallback)};
 * the second argument is presumably the value used when the key is not configured
 * (confirm against {@code KafkaProperties}). The token ID and token HMAC below are
 * sample values and must be replaced with the ones generated for your own user.
 *
 * @return the properties to create the producer with
 */
public static Properties initProperties() {
	Properties config = new Properties();
	KafkaProperties kafkaConf = KafkaProperties.getInstance();
	String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

	// Connection and client identity.
	String brokers = kafkaConf.getValues(BOOTSTRAP_SERVER, "localhost:21007");
	config.put(BOOTSTRAP_SERVER, brokers);
	String clientId = kafkaConf.getValues(CLIENT_ID, "DemoProducer");
	config.put(CLIENT_ID, clientId);

	// Serializer classes for record keys and values.
	config.put(KEY_SERIALIZER, kafkaConf.getValues(KEY_SERIALIZER, stringSerializer));
	config.put(VALUE_SERIALIZER, kafkaConf.getValues(VALUE_SERIALIZER, stringSerializer));

	// Security protocol: SASL_PLAINTEXT and PLAINTEXT are the currently supported values.
	String protocol = kafkaConf.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT");
	config.put(SECURITY_PROTOCOL, protocol);

	// Kerberos service name (fixed) and domain name (configurable).
	config.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
	String domainName = kafkaConf.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com");
	config.put(KERBEROS_DOMAIN_NAME, domainName);

	// Class that assigns records to partitions.
	String partitioner =
			kafkaConf.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner");
	config.put(PARTITIONER_NAME, partitioner);

	// JAAS entry for token authentication: one option per line, terminated by ';'.
	String lineBreak = System.getProperty("line.separator");
	String jaasConfig = "org.apache.kafka.common.security.scram.ScramLoginModule required" + lineBreak
			// username carries the TOKENID generated by the user.
			+ "username=\"PPVz2cxuQC-okwJVZnFKFg\"" + lineBreak
			// password carries the token HMAC generated by the user.
			+ "password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"" + lineBreak
			+ "tokenauth=true;";

	// The SASL mechanism is SCRAM-SHA-512.
	config.put("sasl.mechanism", "SCRAM-SHA-512");
	config.put("sasl.jaas.config", jaasConfig);

	return config;
}

When using the token authentication mechanism, comment out the Kerberos authentication code so that only one authentication mechanism is active at run time, as shown in the following example:

 /**
  * Starts the sample producer with the Kerberos preparation step disabled.
  *
  * <p>The call to {@code securityPrepare()} is kept only as commented-out code so
  * that Kerberos is not used together with token authentication.
  *
  * @param args command-line arguments; not read by this method
  */
 public static void main(String[] args) {
     if (isSecurityModel()) {
         // Kerberos preparation, deliberately commented out for token authentication:
         //
         // try
         // {
         //     LOG.info("Securitymode start.");
         //
         //     //!!Note: When using security authentication, you need to manually change the account to a machine-machine one.
         //     securityPrepare();
         // }
         // catch (IOException e)
         // {
         //     LOG.error("Security prepare failure.");
         //     LOG.error("The IOException occured.", e);
         //     return;
         // }

         // Still logged in security mode, although no preparation runs here.
         LOG.info("Security prepare success.");
     }

     // Whether to use the asynchronous sending mode.
     boolean sendAsync = false;
     Producer producer = new Producer(KafkaProperties.TOPIC, sendAsync);
     producer.start();
 }