更新时间:2023-03-29 GMT+08:00

下载数据之消费位移

消费位移确认有自动提交与手动提交两种策略,在创建DISKafkaConsumer对象时,通过参数enable.auto.commit设定,true表示自动提交(默认)。

自动提交策略由消费者协调器(Coordinator)每隔${auto.commit.interval.ms}毫秒执行一次偏移量的提交;手动提交需要由客户端自己控制偏移量的提交。

  • 自动提交

    在创建一个消费者时,默认是自动提交偏移量,默认的提交间隔是5000ms。使用自动提交相关参数设置如下:

props.setProperty("enable.auto.commit", "true");// 显示设置偏移量自动提交
props.setProperty("auto.commit.interval.ms", "5000");// 设置偏移量提交时间间隔
  • 手动提交

在有些场景可能对消费偏移量有更精确的管理,以保证消息不被重复消费以及消息不被丢失。假设对拉取到的消息需要进行写入数据库处理,或者用于其他网络访问请求等等复杂的业务处理,在这种场景下,所有的业务处理完成后才认为消息被成功消费,此时必须手动控制偏移量的提交。使用手动提交相关参数设置如下:

props.put("enable.auto.commit", "false");// 显示设置偏移量手动提交

然后在业务处理成功后调用commitAsync()或commitSync()方法提交偏移量:commitAsync()是异步提交,消费者线程不会被阻塞,可能在提交偏移量操作的结果还未返回时就开始进行下一次的拉取操作,如需获取提交的结果,可以添加回调方法OffsetCommitCallback,当提交偏移量完成后会自动调用其onComplete()方法,以便根据回调结果执行不同的逻辑处理;

commitSync()是同步提交,会阻塞线程直到提交消费偏移量执行结果返回。

另外还可以精细的控制对具体分区具体offset数据的确认,确认的offset为已接受数据最大offset+1。例如消费一批数据,最后一条的offset为100,则此时需要commit 101,这样下次消费就会从101开始,不会重复。代码样例如下:

ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
 
if (!records.isEmpty())
{
    for (TopicPartition partition : records.partitions())
    {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords)
        {
            LOGGER.info("Value [{}], Partition [{}], Offset [{}], Key [{}]",
                    record.value(), record.partition(), record.offset(), record.key());
        }
        if (!partitionRecords.isEmpty())
        {
            // 同步确认某个分区的特定offset
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
}