创建Storm Bolt
功能介绍
所有的消息处理逻辑都被封装在各个Bolt中。Bolt包含多种功能:过滤、聚合等等。
如果Bolt之后还有其他拓扑算子,可以使用OutputFieldsDeclarer.declareStream定义Stream,使用OutputCollector.emit来选择要发射的Stream。
代码样例
下面代码片段在com.huawei.storm.example.common包的“SplitSentenceBolt”类的“execute”方法中,作用在于拆分每条语句为单个单词并发送。
/** * {@inheritDoc} */ @Override public void execute(Tuple input, BasicOutputCollector collector) { String sentence = input.getString(0); String[] words = sentence.split(" "); for (String word : words) { word = word.trim(); if (!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); } } }
下面代码片段在com.huawei.storm.example.wordcount包的“WordCountBolt”类的execute方法中,作用在于统计收到的每个单词的数量。
@Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) { count = 0; } count++; counts.put(word, count); System.out.println("word: " + word + ", count: " + count); }