更新时间:2024-08-05 GMT+08:00
使用KafkaStreams统计数据
功能简介
以下提供High level KafkaStreams API代码样例及Low level KafkaStreams API代码样例,通过Kafka Streams读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,将统计结果以Key-Value的形式输出,完成单词统计功能。
High Level KafkaStreams API代码样例
下面代码片段在com.huawei.bigdata.kafka.example.WordCountDemo类的createWordCountStream方法中。
static void createWordCountStream(final StreamsBuilder builder) {
// 从 input-topic 接收输入记录
final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);
// 聚合 key-value 键值对的计算结果
final KTable<String, Long> counts = source
// 处理接收的记录,根据正则表达式REGEX_STRING进行分割
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
// 聚合key-value键值对的计算结果
.groupBy((key, value) -> value)
// 最终结果计数
.count();
// 将计算结果的 key-value 键值对从 output topic 输出
counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
}
Low Level KafkaStreams API代码样例
下面代码片段在com.huawei.bigdata.kafka.example.WordCountProcessorDemo类中。
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
@Override
public Processor<String, String> get() {
return new Processor<String, String>() {
// ProcessorContext实例,它提供对当前正在处理的记录的元数据的访问
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
// 在本地保留processor context,因为在punctuate()和commit()时会用到
this.context = context;
// 每1秒执行一次punctuate()
this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
System.out.println("----------- " + timestamp + " ----------- ");
while (iter.hasNext()) {
final KeyValue<String, Integer> entry = iter.next();
System.out.println("[" + entry.key + ", " + entry.value + "]");
// 将新纪录作为键值对发送到下游处理器
context.forward(entry.key, entry.value.toString());
}
}
});
// 检索名称为KEY_VALUE_STATE_STORE_NAME的key-value状态存储区,可用于记忆最近收到的输入记录等
this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
}
// 对input topic的接收记录进行处理,将记录拆分为单词并计数
@Override
public void process(String dummy, String line) {
String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);
for (String word : words) {
Integer oldValue = this.kvStore.get(word);
if (oldValue == null) {
this.kvStore.put(word, 1);
} else {
this.kvStore.put(word, oldValue + 1);
}
}
}
@Override
public void close() {
}
};
}
}
父主题: 开发Kafka应用