更新时间:2024-07-19 GMT+08:00

Kafka 特性说明

Kafka Idempotent 特性

特性说明:Kafka从0.11.0.0版本引入了创建幂等性Producer的功能,开启此特性后,Producer自动升级成幂等性Producer,当Producer发送了相同字段值的消息后,Broker会自动感知消息是否重复,继而避免数据重复。需要注意的是,这个特性只能保证单分区上的幂等性,即一个幂等性Producer能够保证某个主题的一个分区内不出现重复消息;只能实现单会话上的幂等性,这里的会话指的是Producer进程的一次运行,即重启Producer进程后,幂等性不保证。

开启方法:

  1. 二次开发代码中添加 “props.put(“enable.idempotence”,true)”。
  2. 客户端配置文件中添加 “enable.idempotence = true”。

Kafka Transaction 特性

特性说明:Kafka在0.11版本中,引入了事务特性,Kafka事务特性指的是一系列的生产者生产消息和消费者提交偏移量的操作在一个事务中,或者说是一个原子操作,生产消息和提交偏移量同时成功或者失败,此特性提供的是read committed隔离级别的事务,保证多条消息原子性的写入到目标分区,同时也能保证Consumer只能看到成功提交的事务消息。Kafka中的事务特性主要用于以下两种场景:

  1. 生产者发送多条数据可以封装在一个事务中,形成一个原子操作。多条消息要么都发送成功,要么都发送失败。
  2. read-process-write模式:将消息消费和生产封装在一个事务中,形成一个原子操作。在一个流式处理的应用中,常常一个服务需要从上游接收消息,然后经过处理后送达到下游,这就对应着消息的消费和生产。

二次开发代码样例如下:

// 初始化配置,开启事务特性
Properties props = new Properties();
props.put("enable.idempotence", true);
props.put("transactional.id", "transaction1");
...

KafkaProducer producer = new KafkaProducer<String, String>(props);

// init 事务
producer.initTransactions();
try {
	// 开启事务
	producer.beginTransaction();
	producer.send(record1);
	producer.send(record2);
	// 结束事务
	producer.commitTransaction();
} catch (KafkaException e) {
	// 事务 abort
	producer.abortTransaction();
}

就近消费特性

特性说明:Kafka 2.4.0之前版本,客户端的生产、消费都是面向各个partition的leader副本,follower副本仅用于数据冗余,不对外提供服务,常会导致leader副本压力较大,且在跨机房、机架的消费场景下,常会导致大量的机房、机架间的数据传输;Kafka 2.4.0及之后版本,Kafka内核支持从follower副本消费数据,在跨机房、机架的场景中,会大大降低数据传输量,减轻网络带宽压力。社区开放了ReplicaSelector接口来支持此特性,MRS Kafka中默认提供两种实现此接口的方式。

  1. RackAwareReplicaSelector:优先从相同机架的副本进行消费(机架内就近消费特性)。
  2. AzAwareReplicaSelector:优先从相同AZ内的节点上的副本进行消费(AZ内就近消费特性)。
以RackAwareReplicaSelector为例,描述实现就近消费副本的选取:
public class RackAwareReplicaSelector implements ReplicaSelector {

    @Override
    public Optional<ReplicaView> select(TopicPartition topicPartition,
                                        ClientMetadata clientMetadata,
                                        PartitionView partitionView) {
        if (clientMetadata.rackId() != null && !clientMetadata.rackId().isEmpty()) {
            Set<ReplicaView> sameRackReplicas = partitionView.replicas().stream()
                    // 过滤与客户端处于相同Rack的副本
                    .filter(replicaInfo -> clientMetadata.rackId().equals(replicaInfo.endpoint().rack()))
                    .collect(Collectors.toSet());
            if (sameRackReplicas.isEmpty()) {
                // 如果没有副本与客户端处于相同Rack,则返回leader副本
                return Optional.of(partitionView.leader());
            } else {
                // 到这里说明存在与客户端位于同一Rack的副本
                if (sameRackReplicas.contains(partitionView.leader())) {
                    // 如果客户端和leader在同一个机架,则优先返回leader副本
                    return Optional.of(partitionView.leader());
                } else {
                    // 否则,返回和leader同步最新的副本
                    return sameRackReplicas.stream().max(ReplicaView.comparator());
                }
            }
        } else {
            // 如果客户端请求中不包含机架信息,则默认返回leader副本
            return Optional.of(partitionView.leader());
        }
    }
}

开启方法:

  1. 服务端:根据不同特性更新“replica.selector.class”配置项:
    • 开启“机架内就近消费特性”,配置为“org.apache.kafka.common.replica.RackAwareReplicaSelector”。
    • 开启“AZ内就近消费特性”,配置为“org.apache.kafka.common.replica.AzAwareReplicaSelector”。
  2. 客户端:在“{客户端安装目录}/Kafka/kafka/config”目录中的“consumer.properties”消费配置文件里添加“client.rack”配置项:
    • 若服务端开启“机架内就近消费特性”,添加客户端所处的机架信息,如 client.rack = /default0/rack1。
    • 若服务端开启“AZ内就近消费特性”,添加客户端所处的机架信息,如 client.rack = /AZ1/rack1。

Ranger统一鉴权特性

特性说明:在Kafka 2.4.0之前版本,Kafka组件仅支持社区自带的SimpleAclAuthorizer鉴权插件,Kafka 2.4.0及之后版本,MRS Kafka同时支持Ranger鉴权插件和社区自带鉴权插件。默认使用Ranger鉴权,基于Ranger鉴权插件,可进行细粒度的Kafka Acl管理。

服务端使用Ranger鉴权插件时,若“allow.everyone.if.no.acl.found”配置为“true”,使用非安全端口访问时,所有行为将直接放行。建议使用Ranger鉴权插件的安全集群,不要开启“allow.everyone.if.no.acl.found”。