更新时间:2025-12-26 GMT+08:00
分享

Kafka安全使用说明

MRS 3.x及之后版本,Kafka不支持旧Producer API和旧Consumer API。

Kafka API简单说明

  • 新Producer API

    指org.apache.kafka.clients.producer.KafkaProducer中定义的接口,在使用“kafka-console-producer.sh”时,默认使用此API。

  • 旧Producer API

    指kafka.producer.Producer中定义的接口,在使用“kafka-console-producer.sh”时,加“--old-producer”参数会调用此API。

  • 新Consumer API

    指org.apache.kafka.clients.consumer.KafkaConsumer中定义的接口,在使用“kafka-console-consumer.sh”时,加“--new-consumer”参数会调用此API。

  • 旧Consumer API

    指kafka.consumer.ConsumerConnector中定义的接口,在使用“kafka-console-consumer.sh”时,默认使用此API。

新Producer API和新Consumer API,在下文中统称为新API。

Kafka访问协议说明

Kafka当前支持四种协议类型的访问:PLAINTEXT、SSL、SASL_PLAINTEXT、SASL_SSL。

Kafka服务启动时,默认会启动PLAINTEXT和SASL_PLAINTEXT两种协议类型的访问监测。可通过设置Kafka服务配置参数“ssl.mode.enable”为“true”,来启动SSL和SASL_SSL两种协议类型。

下表是四种协议类型的简单说明:

协议类型

说明

支持的API

默认端口

PLAINTEXT

支持无认证的明文访问

新API和旧API

9092

SASL_PLAINTEXT

支持Kerberos认证的明文访问

新API

21007

SSL

支持无认证的SSL加密访问

新API

9093

SASL_SSL

支持Kerberos认证的SSL加密访问

新API

21009

Topic的ACL设置

Kafka支持安全访问,因此可以针对Topic进行ACL设置,从而控制不同的用户可以访问不同的Topic。Topic的权限信息,需要在Linux客户端上,使用“kafka-acls.sh”脚本进行查看和设置。

该任务指导Kafka管理员根据业务需求,为其他使用Kafka的系统用户授予相关Topic的特定权限。

Kafka默认用户组信息表所示。

用户组名

描述

kafkaadmin

Kafka管理员用户组。添加入本组的用户,拥有所有Topic的创建,删除,授权及读写权限。

kafkasuperuser

添加入本组的用户,拥有所有Topic的读写权限。

kafka

Kafka普通用户组。添加入本组的用户,需要被kafkaadmin组用户授予特定Topic的读写权限,才能访问对应Topic。

  • 前提条件
    • 系统管理员已明确业务需求,并准备一个Kafka管理员用户(属于kafkaadmin组)。
    • 已安装客户端,例如安装目录为“/opt/client,以下操作的客户端目录只是举例,请根据实际安装目录修改

      下载并安装集群客户端的具体操作,请参考安装MRS集群客户端

  • 操作步骤
    1. 登录MRS集群Manager。

      登录集群Manager具体操作,请参考访问MRS集群Manager

    2. 获取ZooKeeper节点业务IP及端口。
      1. 选择“集群 > 服务 > ZooKeeper > 实例”,查看并记录任意一个ZooKeeper角色实例的业务IP地址。
      2. 选择“配置 > 全部配置”,搜索“clientPort”,查看并记录端口号。

        默认端口如下:

        • 开源端口默认值为:2181
        • 定制端口默认值为:24002

        端口定制/开源区分:创建LTS版本类型集群时,可以选择“组件端口”为“开源”或是“定制”,选择“开源”使用开源端口,选择“定制”使用定制端口。

    3. 以客户端安装用户,登录安装客户端的节点。
    4. 执行以下命令,切换到客户端安装目录,例如安装目录为“/opt/client,具体以实际替换
      cd /opt/client
    5. 执行以下命令配置环境变量。
      source bigdata_env
    6. 执行以下命令,进行用户认证。(集群未启用Kerberos认证(普通模式)跳过此步骤)
      kinit 组件业务用户
    7. 执行以下命令进入Kafka客户端“bin”目录。
      cd Kafka/kafka/bin
    8. 使用“kafka-acl.sh”进行用户授权常用命令如下:
      • 查看某Topic权限控制列表:
        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka> --list --topic <Topic名称>
      • 添加给某用户Producer权限:
        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka> --add --allow-principal User:<用户名> --producer --topic <Topic名称>
      • 删除某用户Producer权限:
        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka> --remove --allow-principal User:<用户名> --producer --topic <Topic名称>
      • 添加给某用户Consumer权限:
        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka> --add --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>
      • 删除某用户Consumer权限:
        ./kafka-acls.sh --authorizer-properties zookeeper.connect=<ZooKeeper集群业务IP:2181/kafka> --remove --allow-principal User:<用户名> --consumer --topic <Topic名称> --group <消费者组名称>

      MRS 1.6.3及之前版本,无论集群是否开启Kerberos认证ZooKeeper默认端口号均为24002。MRS 1.6.3及之后版本,无论集群是否开启Kerberos认证ZooKeeper默认端口号均为2181。

针对不同的Topic访问场景 ,Kafka新旧API使用说明

  • 场景一:访问设置了ACL的Topic

    使用的API

    用户属组

    客户端参数

    服务端参数

    访问的端口

    新API

    用户需满足以下条件之一即可:

    • 属于系统管理员组
    • 属于kafkaadmin组
    • 属于kafkasuperuser组
    • 被授权的kafka组的用户

    security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name = kafka

    -

    sasl.port(默认21007)

    security.protocol=SASL_SSL sasl.kerberos.service.name = kafka

    ssl.mode.enable配置为true

    sasl-ssl.port(默认21009)

    旧API

    不涉及

    不涉及

    不涉及

    不涉及

  • 场景二:访问未设置ACL的Topic

    使用的API

    用户属组

    客户端参数

    服务端参数

    访问的端口

    新API

    用户需满足以下条件之一:

    • 属于系统管理员组
    • 属于kafkaadmin组
    • 属于kafkasuperuser组

    security.protocol=SASL_PLAINTEXT

    sasl.kerberos.service.name = kafka

    -

    sasl.port(默认21007)

    用户属于kafka组

    allow.everyone.if.no.acl.found配置为true

    sasl.port(默认21007)

    用户需满足以下条件之一:

    • 属于系统管理员组
    • 属于kafkaadmin组
    • kafkasuperuser组用户

    security.protocol=SASL_SSL

    sasl.kerberos.service.name = kafka

    ssl-enable配置为“true”

    sasl-ssl.port(默认21009)

    用户属于kafka组

    allow.everyone.if.no.acl.found配置为“true”

    ssl-enable配置为“true”

    sasl-ssl.port(默认21009)

    -

    security.protocol=PLAINTEXT

    allow.everyone.if.no.acl.found配置为“true”

    port(默认21005)

    -

    security.protocol=SSL

    allow.everyone.if.no.acl.found配置为“true”

    ssl-enable配置为“true”

    ssl.port(默认21008)

    旧Producer

    -

    -

    allow.everyone.if.no.acl.found配置为“true”

    port(默认21005)

    旧Consumer

    -

    -

    allow.everyone.if.no.acl.found配置为“true”

    ZooKeeper服务端口:clientPort(默认24002)

相关文档