更新时间:2024-08-03 GMT+08:00

使用KafkaStreams统计数据

功能简介

以下提供High level KafkaStreams API代码样例及Low level KafkaStreams API代码样例,通过Kafka Streams读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,将统计结果以Key-Value的形式输出,完成单词统计功能。

High Level KafkaStreams API代码样例

下面代码片段在com.huawei.bigdata.kafka.example.WordCountDemo类的createWordCountStream方法中。

 static void createWordCountStream(final StreamsBuilder builder) {
     // 从 input-topic 接收输入记录
     final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME);

     // 聚合 key-value 键值对的计算结果
     final KTable<String, Long> counts = source
             // 处理接收的记录,根据正则表达式REGEX_STRING进行分割
             .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING)))
             // 聚合key-value键值对的计算结果
             .groupBy((key, value) -> value)
             // 最终结果计数
             .count();

     // 将计算结果的 key-value 键值对从 output topic 输出
     counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long()));
 }

Low Level KafkaStreams API代码样例

下面代码片段在com.huawei.bigdata.kafka.example.WordCountProcessorDemo类中。

private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
    @Override
    public Processor<String, String> get() {
        return new Processor<String, String>() {
            // ProcessorContext实例,它提供对当前正在处理的记录的元数据的访问
            private ProcessorContext context;
            private KeyValueStore<String, Integer> kvStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // 在本地保留processor context,因为在punctuate()和commit()时会用到
                this.context = context;
                // 每1秒执行一次punctuate()
                this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> {
                    try (final KeyValueIterator<String, Integer> iter = kvStore.all()) {
                        System.out.println("----------- " + timestamp + " ----------- ");
                        while (iter.hasNext()) {
                            final KeyValue<String, Integer> entry = iter.next();
                            System.out.println("[" + entry.key + ", " + entry.value + "]");
                            // 将新纪录作为键值对发送到下游处理器
                            context.forward(entry.key, entry.value.toString());
                        }
                    }
                });
                // 检索名称为KEY_VALUE_STATE_STORE_NAME的key-value状态存储区,可用于记忆最近收到的输入记录等
                this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME);
            }

            // 对input topic的接收记录进行处理,将记录拆分为单词并计数
            @Override
            public void process(String dummy, String line) {
                String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING);

                for (String word : words) {
                    Integer oldValue = this.kvStore.get(word);

                    if (oldValue == null) {
                        this.kvStore.put(word, 1);
                    } else {
                        this.kvStore.put(word, oldValue + 1);
                    }
                }
            }

            @Override
            public void close() {
            }
        };
    }
}