更新时间:2024-08-03 GMT+08:00
使用KafkaStreams统计数据
功能简介
以下提供High level KafkaStreams API代码样例及Low level KafkaStreams API代码样例,通过Kafka Streams读取输入Topic中的消息,统计每条消息中的单词个数,从输出Topic消费数据,将统计结果以Key-Value的形式输出,完成单词统计功能。
High level KafkaStreams API代码样例
下面代码片段在com.huawei.bigdata.kafka.example.WordCountDemo类的createWordCountStream方法中。
static void createWordCountStream(final StreamsBuilder builder) { // 从 input-topic 接收输入记录 final KStream<String, String> source = builder.stream(INPUT_TOPIC_NAME); // 聚合 key-value 键值对的计算结果 final KTable<String, Long> counts = source // 处理接收的记录,根据正则表达式REGEX_STRING进行分割 .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(REGEX_STRING))) // 聚合key-value键值对的计算结果 .groupBy((key, value) -> value) // 最终结果计数 .count(); // 将计算结果的 key-value 键值对从 output topic 输出 counts.toStream().to(OUTPUT_TOPIC_NAME, Produced.with(Serdes.String(), Serdes.Long())); } // 用户自己申请的机机账号keytab文件名称 private static final String USER_KEYTAB_FILE = "请修改为真实keytab文件名"; // 用户自己申请的机机账号名称 private static final String USER_PRINCIPAL = "请修改为真实用户名称"
Low level KafkaStreams API代码样例
下面代码片段在com.huawei.bigdata.kafka.example.WordCountProcessorDemo类中。
private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { @Override public Processor<String, String> get() { return new Processor<String, String>() { // ProcessorContext实例,它提供对当前正在处理的记录的元数据的访问 private ProcessorContext context; private KeyValueStore<String, Integer> kvStore; @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context) { // 在本地保留processor context,因为在punctuate()和commit()时会用到 this.context = context; // 每1秒执行一次punctuate() this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { System.out.println("----------- " + timestamp + " ----------- "); while (iter.hasNext()) { final KeyValue<String, Integer> entry = iter.next(); System.out.println("[" + entry.key + ", " + entry.value + "]"); // 将新纪录作为键值对发送到下游处理器 context.forward(entry.key, entry.value.toString()); } } }); // 检索名称为KEY_VALUE_STATE_STORE_NAME的key-value状态存储区,可用于记忆最近收到的输入记录等 this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore(KEY_VALUE_STATE_STORE_NAME); } // 对input topic的接收记录进行处理,将记录拆分为单词并计数 @Override public void process(String dummy, String line) { String[] words = line.toLowerCase(Locale.getDefault()).split(REGEX_STRING); for (String word : words) { Integer oldValue = this.kvStore.get(word); if (oldValue == null) { this.kvStore.put(word, 1); } else { this.kvStore.put(word, oldValue + 1); } } } @Override public void close() { } }; } } // 用户自己申请的机机账号keytab文件名称 private static final String USER_KEYTAB_FILE = "请修改为真实keytab文件名"; // 用户自己申请的机机账号名称 private static final String USER_PRINCIPAL = "请修改为真实用户名称";
父主题: 开发Kafka应用