Kafka Token Authentication
Scenario
The token authentication mechanism is a lightweight authentication mechanism that does not require Kerberos authentication. It can be used in APIs.
Sample Code
The token authentication mechanism can be used for APIs. Therefore, you can configure the token authentication mechanism in Producer() and Consumer() of the secondary development sample.
- The sample code of Producer() is as follows:
public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker address list props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Client ID props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer")); // Key serialization class props.put(KEY_SERIALIZER, kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Value serialization class props.put(VALUE_SERIALIZER, kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer")); // Protocol type: Currently, the SASL_PLAINTEXT or PLAINTEXT protocol types can be used. props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // Service name props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // Domain name props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // Partition class name props.put(PARTITIONER_NAME, kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner")); // Generate token configurations. StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * TOKENID generated by the user. */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * Token HMAC generated by the user. */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // Use SCRAM-SHA-512 as the SASL mechanism for the user. props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; }
- The sample code of Consumer() is as follows:
public static Properties initProperties() { Properties props = new Properties(); KafkaProperties kafkaProc = KafkaProperties.getInstance(); // Broker connection address props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007")); // Group id props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer")); // Whether to automatically submit the offset. props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true")); // Interval for automatically submitting the offset. props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000")); // Session timeout props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000")); // Deserialization class used by the message key value props.put(KEY_DESERIALIZER, kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // Deserialization class used by the message content props.put(VALUE_DESERIALIZER, kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer")); // Security protocol type props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT")); // Service name props.put(SASL_KERBEROS_SERVICE_NAME, "kafka"); // Domain name props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com")); // Generate token configurations. StringBuilder token = new StringBuilder(); String LINE_SEPARATOR = System.getProperty("line.separator"); token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR); /** * TOKENID generated by the user. */ token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR); /** * Token HMAC generated by the user. */ token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR); token.append("tokenauth=true;"); // Use SCRAM-SHA-512 as the SASL mechanism for the user. props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config", token.toString()); return props; }
- Set BOOTSTRAP_SERVERS to the host name and port number of the Kafka broker node based on site requirements. You can choose Cluster > Services > Kafka > Instance on FusionInsight Manager to view the broker instance information.
- Set SECURITY_PROTOCOL to the protocol for connecting to Kafka. In this example, set this parameter to SASL_PLAINTEXT.
- TOKENID and HMAC are generated when you generate tokens. For how to generate a token, see Kafka Token Authentication Mechanism Tool Usage
- When using the token authentication mechanism, you need to comment out the Kerberos authentication mechanism to ensure that only one authentication mechanism is used during code running, as shown in the following:
public static void main(String[] args) { if (isSecurityModel()) { // try // { // LOG.info("Securitymode start."); // // //!!Note: When using security authentication, you need to manually change the account to a machine-machine one. // securityPrepare(); // } // catch (IOException e) // { // LOG.error("Security prepare failure."); // LOG.error("The IOException occured.", e); // return; // } LOG.info("Security prepare success."); } // Specify whether to use the asynchronous sending mode. final boolean asyncEnable = false; Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable); producerThread.start(); }
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot