更新时间:2026-01-19 GMT+08:00
分享

与原生KafkaConsumer接口适配说明

表1 接口适配说明

原生KafkaConsumer

类型

DISKafkaConsumer

说明

Set<TopicPartition> assignment()

接口

支持

获取consumer消费的通道与分区信息

Set<String> subscription()

接口

支持

获取consumer已订阅的通道名称

void assign(Collection<TopicPartition> var1)

接口

支持

分配指定的分区

void subscribe(Collection<String> var1)

接口

支持

订阅指定的通道

void subscribe(Collection<String> var1, ConsumerRebalanceListener var2)

接口

支持

订阅指定的通道并支持ConsumerRebalanceListener回调

void subscribe(Pattern var1, ConsumerRebalanceListener var2)

接口

支持

订阅所有匹配通配符的通道并支持ConsumerRebalanceListener回调

void unsubscribe()

接口

支持

取消所有订阅

ConsumerRecords<K, V> poll(long var1)

接口

支持

获取消息,但消息当中未实现

checksum(消息的CRC32校验值)、serializedKeySize(key序列化后的字节长度)、serializedValueSize(key序列化后的字节长度)。

void commitSync()

接口

支持

同步提交当前消费的offset

void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)

接口

支持

同步提交指定的offset

void commitAsync()

接口

支持

异步提交当前消费的offset

public void commitAsync(OffsetCommitCallback callback)

接口

支持

异步提交当前消费的offset并支持OffsetCommitCallback回调

void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

接口

支持

异步提交指定的offset并支持OffsetCommitCallback回调

void seek(TopicPartition partition, long offset)

接口

支持

给分区设置指定的offset

void seekToBeginning(Collection<TopicPartition> partitions)

接口

支持

分区的offset设置为最旧的值

void seekToEnd(Collection<TopicPartition> partitions)

接口

支持

分区的offset设置为最新的值

long position(TopicPartition partition)

接口

支持

获取分区当前已消费数据的offset

OffsetAndMetadata committed(TopicPartition partition)

接口

支持

获取分区已提交的offset

List<PartitionInfo> partitionsFor(String topic)

接口

支持

获取通道的分区信息,但PartitionInfo里面的leader, replicas, inSyncReplicas未实现。

Map<String, List<PartitionInfo>> listTopics()

接口

支持

获取所有的通道信息,但PartitionInfo里面的leader, replicas, inSyncReplicas未实现。

void pause(Collection<TopicPartition> partitions)

接口

支持

暂停消费分区

void resume(Collection<TopicPartition> partitions)

接口

支持

恢复消费分区

Set<TopicPartition> paused()

接口

支持

获取所有已暂停消费的分区

close()

接口

支持

关闭consumer

Map<MetricName, ? extends Metric> metrics()

接口

不支持

获取统计信息

wakeup()

接口

不支持

内部实现原理不一样,不需要。

group.id

参数

支持

消费组ID

client.id

参数

支持

每个consumer的client.id必须唯一,如果不配置client.id, dis kafka consumer会生成一个uuid作为client.id。

key.deserializer

参数

支持

含义与kafka设置相同,但默认值为StringDeserializer(kafka必须配置)。

value.deserializer

参数

支持

含义与kafka设置相同,但默认值为StringDeserializer(kafka必须配置)。

enable.auto.commit

参数

支持

同kafka的默认设置,默认为true

  • true表示启用自动提交,每隔${auto.commit.interval.ms}的时间提交一次offset;
  • false表示不自动提交offset

auto.commit.interval.ms

参数

支持

自动提交offset的周期(毫秒),默认值5000。

auto.offset.reset

参数

支持

同Kafka的默认配置,默认为latest。

此值用于没有初始偏移量或者偏移量不正确的情况下,自动设置offset位置:

  • earliest 将偏移量自动重置为最旧的值。
  • latest将偏移量自动重置为最新的值。
  • none 如果在消费者组中没有发现前一个偏移量,就向消费者抛出一个异常。

其他参数

参数

不支持

-

相关文档