Kafka Java API接口介绍
Kafka相关接口同开源社区保持一致,详情请参见https://kafka.apache.org/24/documentation.html。
Producer重要接口
参数 |
描述 |
备注 |
---|---|---|
bootstrap.servers |
Broker地址列表。 |
生产者通过此参数值,创建与Broker之间的连接。 |
security.protocol |
安全协议类型。 |
生产者使用的安全协议类型,当前安全模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。 |
sasl.kerberos.service.name |
服务名。 |
Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。 |
key.serializer |
消息Key值序列化类。 |
指定消息Key值序列化方式。 |
value.serializer |
消息序列化类。 |
指定所发送消息的序列化方式。 |
返回值类型 |
接口函数 |
描述 |
---|---|---|
java.util.concurrent.Future<RecordMetadata> |
send(ProducerRecord<K,V> record) |
不带回调函数的发送接口,通常使用Future的get()函数阻塞发送,实现同步发送。 |
java.util.concurrent.Future<RecordMetadata> |
send(ProducerRecord<K,V> record, Callback callback) |
带回调函数的发送接口,通常用于异步发送后,通过回调函数实现对发送结果的处理。 |
void |
onCompletion(RecordMetadata metadata, Exception exception); |
回调函数接口方法,通过实现Callback中的此方法来进行异步发送结果的处理。 |
Consumer重要接口
参数 |
描述 |
备注 |
---|---|---|
bootstrap.servers |
Broker地址列表。 |
消费者通过此参数值,创建与Broker之间的连接。 |
security.protocol |
安全协议类型。 |
消费者使用的安全协议类型,当前安全模式下仅支持SASL协议,需要配置为SASL_PLAINTEXT。 |
sasl.kerberos.service.name |
服务名。 |
Kafka集群运行,所使用的Kerberos用户名(需配置为kafka)。 |
key.deserializer |
消息Key值反序列化类。 |
反序列化消息Key值。 |
value.deserializer |
消息反序列化类。 |
反序列化所接收的消息。 |
返回值类型 |
接口函数 |
描述 |
---|---|---|
void |
close() |
关闭Consumer接口方法。 |
void |
subscribe(java.util.Collection<java.lang.String> topics) |
Topic订阅接口方法。 |
ConsumerRecords<K,V> |
poll(final Duration timeout) |
请求获取消息接口方法。 |