创建Storm Bolt
功能介绍
所有的消息处理逻辑都被封装在各个Bolt中。Bolt包含多种功能:过滤、聚合等。
如果Bolt之后还有其他拓扑算子,可以使用OutputFieldsDeclarer.declareStream定义Stream,使用OutputCollector.emit来选择要发射的Stream。
代码样例
下面代码片段在com.huawei.storm.example.common.SplitSentenceBolt类中,作用在于拆分每条语句为单个单词并发送。
/**
* {@inheritDoc}
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector)
{
String sentence = input.getString(0);
String[] words = sentence.split(" ");
for (String word : words)
{
word = word.trim();
if (!word.isEmpty())
{
word = word.toLowerCase();
collector.emit(new Values(word));
}
}
}
下面代码片段在com.huawei.storm.example.wordcount.WordCountBolt类中,作用在于统计收到的每个单词的数量。
@Override
public void execute(Tuple tuple, BasicOutputCollector collector)
{
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
{
count = 0;
}
count++;
counts.put(word, count);
System.out.println("word: " + word + ", count: " + count);
}