更新时间:2022-08-05 GMT+08:00

与原生KafkaConsumer接口适配说明

表1 接口适配说明

原生KafkaConsumer

类型

DISKafkaConsumer

说明

Set<TopicPartition> assignment()

接口

支持

获取consumer消费的通道与分区信息

Set<String> subscription()

接口

支持

获取consumer已订阅的通道名称

void assign(Collection<TopicPartition> var1)

接口

支持

分配指定的分区

void subscribe(Collection<String> var1)

接口

支持

订阅指定的通道

void subscribe(Collection<String> var1, ConsumerRebalanceListener var2)

接口

支持

订阅指定的通道并支持ConsumerRebalanceListener回调

void subscribe(Pattern var1, ConsumerRebalanceListener var2)

接口

支持

订阅所有匹配通配符的通道并支持ConsumerRebalanceListener回调

void unsubscribe()

接口

支持

取消所有订阅

ConsumerRecords<K, V> poll(long var1)

接口

支持

获取消息,但消息当中未实现

checksum(消息的CRC32校验值)、serializedKeySize(key序列化后的字节长度)、serializedValueSize(key序列化后的字节长度)。

void commitSync()

接口

支持

同步提交当前消费的offset

void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)

接口

支持

同步提交指定的offset

void commitAsync()

接口

支持

异步提交当前消费的offset

public void commitAsync(OffsetCommitCallback callback)

接口

支持

异步提交当前消费的offset并支持OffsetCommitCallback回调

void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)

接口

支持

异步提交指定的offset并支持OffsetCommitCallback回调

void seek(TopicPartition partition, long offset)

接口

支持

给分区设置指定的offset

void seekToBeginning(Collection<TopicPartition> partitions)

接口

支持

分区的offset设置为最旧的值

void seekToEnd(Collection<TopicPartition> partitions)

接口

支持

分区的offset设置为最新的值

long position(TopicPartition partition)

接口

支持

获取分区当前已消费数据的offset

OffsetAndMetadata committed(TopicPartition partition)

接口

支持

获取分区已提交的offset

List<PartitionInfo> partitionsFor(String topic)

接口

支持

获取通道的分区信息,但PartitionInfo里面的leader, replicas, inSyncReplicas未实现。

Map<String, List<PartitionInfo>> listTopics()

接口

支持

获取所有的通道信息,但PartitionInfo里面的leader, replicas, inSyncReplicas未实现。

void pause(Collection<TopicPartition> partitions)

接口

支持

暂停消费分区

void resume(Collection<TopicPartition> partitions)

接口

支持

恢复消费分区

Set<TopicPartition> paused()

接口

支持

获取所有已暂停消费的分区

close()

接口

支持

关闭consumer

Map<MetricName, ? extends Metric> metrics()

接口

不支持

获取统计信息

wakeup()

接口

不支持

内部实现原理不一样,不需要。

group.id

参数

支持

消费组ID

client.id

参数

支持

每个consumer的client.id必须唯一,如果不配置client.id, dis kafka consumer会生成一个uuid作为client.id。

key.deserializer

参数

支持

含义与kafka设置相同,但默认值为StringDeserializer (kafka必须配置)。

value.deserializer

参数

支持

含义与kafka设置相同,但默认值为StringDeserializer (kafka必须配置)。

enable.auto.commit

参数

支持

同kafka的默认设置,默认为true

  • true表示启用自动提交,每隔${auto.commit.interval.ms}的时间提交一次offset;
  • false表示不自动提交offset

auto.commit.interval.ms

参数

支持

自动提交offset的周期(毫秒),默认值5000。

auto.offset.reset

参数

支持

同Kafka的默认配置,默认为latest。

此值用于没有初始偏移量或者偏移量不正确的情况下,自动设置offset位置:

  • earliest 将偏移量自动重置为最旧的值;
  • latest将偏移量自动重置为最新的值;
  • none 如果在消费者组中没有发现前一个偏移量,就向消费者抛出一个异常;

其他参数

参数

不支持

-