更新时间:2024-08-05 GMT+08:00

使用Kafka Token认证

场景说明

Token认证机制是一种轻量级身份认证机制,无需访问Kerberos认证,可在API中使用。

代码样例

Token认证机制支持API,用户可在二次开发样例的Producer()Consumer()中对其进行配置。

  • Producer()配置的样例代码如下:
    public static Properties initProperties() {
    	Properties props = new Properties();
    	KafkaProperties kafkaProc = KafkaProperties.getInstance();
    
    	// Broker地址列表
    	props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
    	// 客户端ID
    	props.put(CLIENT_ID, kafkaProc.getValues(CLIENT_ID, "DemoProducer"));
    	// Key序列化类
    	props.put(KEY_SERIALIZER,
    			kafkaProc.getValues(KEY_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
    	// Value序列化类
    	props.put(VALUE_SERIALIZER,
    			kafkaProc.getValues(VALUE_SERIALIZER, "org.apache.kafka.common.serialization.StringSerializer"));
    	// 协议类型:当前支持配置为SASL_PLAINTEXT或者PLAINTEXT
    	props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
    	// 服务名
    	props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
    	// 域名
    	props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
    	// 分区类名
    	props.put(PARTITIONER_NAME,
    			kafkaProc.getValues(PARTITIONER_NAME, "com.huawei.bigdata.kafka.example.SimplePartitioner"));
    	// 生成Token配置
    	StringBuilder token = new StringBuilder();
    	String LINE_SEPARATOR = System.getProperty("line.separator");
    	token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的TOKENID
    	 */
    	token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的HMAC
    	 */
    	token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR);
    	token.append("tokenauth=true;");
    	// 用户使用的SASL机制,配置为SCRAM-SHA-512
    	props.put("sasl.mechanism", "SCRAM-SHA-512");
    	props.put("sasl.jaas.config", token.toString());
    
    	return props;
    }
  • Consumer()配置的样例代码如下:
    public static Properties initProperties() {
    	Properties props = new Properties();
    	KafkaProperties kafkaProc = KafkaProperties.getInstance();
            // Broker连接地址
            props.put(BOOTSTRAP_SERVER, kafkaProc.getValues(BOOTSTRAP_SERVER, "localhost:21007"));
            // Group id
            props.put(GROUP_ID, kafkaProc.getValues(GROUP_ID, "DemoConsumer"));
            // 是否自动提交offset
            props.put(ENABLE_AUTO_COMMIT, kafkaProc.getValues(ENABLE_AUTO_COMMIT, "true"));
            // 自动提交offset的时间间隔
            props.put(AUTO_COMMIT_INTERVAL_MS, kafkaProc.getValues(AUTO_COMMIT_INTERVAL_MS,"1000"));
            // 会话超时时间
            props.put(SESSION_TIMEOUT_MS, kafkaProc.getValues(SESSION_TIMEOUT_MS, "30000"));
            // 消息Key值使用的反序列化类
            props.put(KEY_DESERIALIZER,
                kafkaProc.getValues(KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
            // 消息内容使用的反序列化类
            props.put(VALUE_DESERIALIZER,
                kafkaProc.getValues(VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"));
            // 安全协议类型
            props.put(SECURITY_PROTOCOL, kafkaProc.getValues(SECURITY_PROTOCOL, "SASL_PLAINTEXT"));
            // 服务名
            props.put(SASL_KERBEROS_SERVICE_NAME, "kafka");
            // 域名
            props.put(KERBEROS_DOMAIN_NAME, kafkaProc.getValues(KERBEROS_DOMAIN_NAME, "hadoop.hadoop.com"));
    	// 生成Token配置
    	StringBuilder token = new StringBuilder();
    	String LINE_SEPARATOR = System.getProperty("line.separator");
    	token.append("org.apache.kafka.common.security.scram.ScramLoginModule required").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的TOKENID
    	 */
    	token.append("username=\"PPVz2cxuQC-okwJVZnFKFg\"").append(LINE_SEPARATOR);
    	/**
    	 * 用户自己生成的Token的HMAC
    	 */
    	token.append("password=\"pL5nHsIUODg5u0dRM+o62cOIf/j6yATSt6uaPBYfIb29dj/jbpiAnRGSWDJ6tL4KXo89dot0axcRIDsMagyN4g==\"").append(LINE_SEPARATOR);
    	token.append("tokenauth=true;");
    	// 用户使用的SASL机制,配置为SCRAM-SHA-512
    	props.put("sasl.mechanism", "SCRAM-SHA-512");
    	props.put("sasl.jaas.config", token.toString());
    
    	return props;
    }
  • BOOTSTRAP_SERVERS需根据集群实际情况,配置为Kafka Broker节点的主机名及端口,可通过集群FusionInsight Manager界面中选择“集群 > 服务 > Kafka > 实例”查看。
  • SECURITY_PROTOCOL为连接Kafka的协议类型,在本示例中,配置为“SASL_PLAINTEXT”。
  • “TOKENID”“HMAC”参考Kafka Token认证机制工具使用说明为用户生成Token时产生。
  • 在使用Token认证机制时,需要把Kerberos认证机制注释掉,保证代码运行过程中只使用一个认证机制,如下所示:
     public static void main(String[] args)
         {
             if (isSecurityModel())
             {
     //            try
     //            {
     //                LOG.info("Securitymode start.");
     //                
     //                //!!注意,安全认证时,需要用户手动修改为自己申请的机机账号
     //                securityPrepare();
     //            }
     //            catch (IOException e)
     //            {
     //                LOG.error("Security prepare failure.");
     //                LOG.error("The IOException occured.", e);
     //                return;
     //            }
                 LOG.info("Security prepare success.");
             }
             
             // 是否使用异步发送模式
             final boolean asyncEnable = false;
             Producer producerThread = new Producer(KafkaProperties.TOPIC, asyncEnable);
             producerThread.start();
         }