更新时间:2024-10-11 GMT+08:00

Kafka安全使用说明

Kafka API简单说明

  • 新Producer API

    指org.apache.kafka.clients.producer.KafkaProducer中定义的接口,在使用“kafka-console-producer.sh”时,默认使用此API。

  • 旧Producer API

    指kafka.producer.Producer中定义的接口,在使用“kafka-console-producer.sh”时,加“--old-producer”参数会调用此API。

  • 新Consumer API

    指org.apache.kafka.clients.consumer.KafkaConsumer中定义的接口,在使用“kafka-console-consumer.sh”时,加“--new-consumer”参数会调用此API。

  • 旧Consumer API

    指kafka.consumer.ConsumerConnector中定义的接口,在使用“kafka-console-consumer.sh”时,默认使用此API。

新Producer API和新Consumer API,在下文中统称为新API。

Kafka 访问协议说明

Kafka当前支持四种协议类型的访问:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。

Kafka服务启动时,默认会启动PLAINTEXT和SASL_PLAINTEXT两种协议类型的访问监听。可通过设置Kafka服务配置参数“ssl.mode.enable”为“true”,来启动SSL和SASL_SSL两种协议类型的访问监听。

下表是四中协议类型的简单说明:

协议类型

说明

支持的API

默认端口

PLAINTEXT

支持无认证的明文访问

新API和旧API

9092

SASL_PLAINTEXT

支持Kerberos认证的明文访问

新API

21007

SSL

支持无认证的SSL加密访问

新API

9093

SASL_SSL

支持Kerberos认证的SSL加密访问

新API

21009

Topic的ACL设置

Kafka支持安全访问,因此可以针对Topic进行ACL设置,从而控制不同的用户可以访问不同的Topic。Topic的权限信息,需要在Linux客户端上,使用“kafka-acls.sh”脚本进行查看和设置。

  • 操作场景

    该任务指导Kafka管理员根据业务需求,为其他使用Kafka的系统用户授予相关Topic的特定权限。

    Kafka默认用户组信息表所示。

    用户组名

    描述

    kafkaadmin

    Kafka管理员用户组。添加入本组的用户,拥有所有Topic的创建,删除,授权及读写权限。

    kafkasuperuser

    添加入本组的用户,拥有所有Topic的读写权限。

    kafka

    Kafka普通用户组。添加入本组的用户,需要被kafkaadmin组用户授予特定Topic的读写权限,才能访问对应Topic。

  • 前提条件
    1. 系统管理员已明确业务需求,并准备一个Kafka管理员用户(属于kafkaadmin组)。
    2. 已安装Kafka客户端。
  • 操作步骤
    1. 以客户端安装用户,登录安装Kafka客户端的节点。
    2. 切换到Kafka客户端安装目录,例如“/opt/kafkaclient”。

      cd /opt/kafkaclient

    3. 执行以下命令,配置环境变量。

      source bigdata_env

    4. 执行以下命令,进行用户认证。(普通集群跳过此步骤)

      kinit 组件业务用户

    5. 执行以下命令,切换到Kafka客户端安装目录。

      cd Kafka/kafka/bin

    6. 使用“kafka-acl.sh”进行用户授权常用命令如下:
      • 查看某Topic权限控制列表:

        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka > --list --topic <Topic名称>

      • 添加给某用户Producer权限:

        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka > --add --allow-principal User:<用户名> --producer --topic <Topic名称>

      • 删除某用户Producer权限:

        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka > --remove --allow-principal User:<用户名> --producer --topic <Topic名称>

      • 添加给某用户Consumer权限:

        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka > --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>

      • 删除某用户Consumer权限:

        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka > --remove --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>

针对不同的Topic访问场景 ,Kafka新旧API使用说明

  • 场景一:访问设置了ACL的Topic

    使用的API

    用户属组

    客户端参数

    服务端参数

    访问的端口

    新API

    用户需满足以下条件之一即可:

    • 属于系统管理员组
    • 属于kafkaadmin组
    • 属于kafkasuperuser组
    • 被授权的kafka组的用户

    security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name = kafka

    -

    sasl.port(默认21007)

    security.protocol=SASL_SSL sasl.kerberos.service.name = kafka

    ssl.mode.enable配置为true

    sasl-ssl.port(默认21009)

    旧API

    不涉及

    不涉及

    不涉及

    不涉及

  • 场景二:访问未设置ACL的Topic

    使用的API

    用户属组

    客户端参数

    服务端参数

    访问的端口

    新API

    用户需满足以下条件之一:

    • 属于系统管理员组
    • 属于kafkaadmin组
    • 属于kafkasuperuser组

    security.protocol=SASL_PLAINTEXT

    sasl.kerberos.service.name = kafka

    -

    sasl.port(默认21007)

    用户属于kafka组

    allow.everyone.if.no.acl.found配置为true

    sasl.port(默认21007)

    用户需满足以下条件之一:

    • 属于系统管理员组
    • 属于kafkaadmin组
    • kafkasuperuser组用户

    security.protocol=SASL_SSLsasl.kerberos.service.name = kafka

    ssl-enable配置为“true”

    sasl-ssl.port(默认21009)

    用户属于kafka组

    allow.everyone.if.no.acl.found配置为“true”

    ssl-enable配置为“true”

    sasl-ssl.port(默认21009)

    -

    security.protocol=PLAINTEXT

    allow.everyone.if.no.acl.found配置为“true”

    port(默认21005)

    -

    security.protocol=SSL

    allow.everyone.if.no.acl.found配置为“true”

    ssl-enable配置为“true”

    ssl.port(默认21008)

    旧Producer

    -

    -

    allow.everyone.if.no.acl.found配置为“true”

    port(默认21005)

    旧Consumer

    -

    -

    allow.everyone.if.no.acl.found配置为“true”

    ZooKeeper服务端口:clientPort(默认24002)