更新时间:2024-08-03 GMT+08:00

Kafka Java API接口介绍

Kafka相关接口同开源社区保持一致,详情请参见https://kafka.apache.org/24/documentation.html

Producer重要接口

表1 Producer重要参数

参数

描述

备注

bootstrap.servers

Broker地址列表。

生产者通过此参数值,创建与Broker之间的连接。

security.protocol

安全协议类型。

生产者使用的安全协议类型,当前安全模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。

sasl.kerberos.service.name

服务名。

Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。

key.serializer

消息Key值序列化类。

指定消息Key值序列化方式。

value.serializer

消息序列化类。

指定所发送消息的序列化方式。

表2 Producer重要接口函数

返回值类型

接口函数

描述

java.util.concurrent.Future<RecordMetadata>

send(ProducerRecord<K,V> record)

不带回调函数的发送接口,通常使用Future的get()函数阻塞发送,实现同步发送。

java.util.concurrent.Future<RecordMetadata>

send(ProducerRecord<K,V> record, Callback callback)

带回调函数的发送接口,通常用于异步发送后,通过回调函数实现对发送结果的处理。

void

onCompletion(RecordMetadata metadata, Exception exception);

回调函数接口方法,通过实现Callback中的此方法来进行异步发送结果的处理。

Consumer重要接口

表3 Consumer重要参数

参数

描述

备注

bootstrap.servers

Broker地址列表。

消费者通过此参数值,创建与Broker之间的连接。

security.protocol

安全协议类型。

消费者使用的安全协议类型,当前安全模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。

sasl.kerberos.service.name

服务名。

Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。

key.deserializer

消息Key值反序列化类。

反序列化消息Key值。

value.deserializer

消息反序列化类。

反序列化所接收的消息。

表4 Consumer重要接口函数

返回值类型

接口函数

描述

void

close()

关闭Consumer接口方法。

void

subscribe(java.util.Collection<java.lang.String> topics)

Topic订阅接口方法。

ConsumerRecords<K,V>

poll(final Duration timeout)

请求获取消息接口方法。