更新时间:2024-06-27 GMT+08:00
分享

创建Storm Bolt

功能介绍

所有的消息处理逻辑都被封装在各个Bolt中。Bolt包含多种功能:过滤、聚合等等。

如果Bolt之后还有其他拓扑算子,可以使用OutputFieldsDeclarer.declareStream定义Stream,使用OutputCollector.emit来选择要发射的Stream。

代码样例

下面代码片段在com.huawei.storm.example.common包的“SplitSentenceBolt”类的“execute”方法中,作用在于拆分每条语句为单个单词并发送。

/**  
   * {@inheritDoc}  
   */  
  @Override  
  public void execute(Tuple input, BasicOutputCollector collector)  
  {  
      String sentence = input.getString(0);  
      String[] words = sentence.split(" ");  
      for (String word : words)  
      {  
          word = word.trim();  
          if (!word.isEmpty())  
          {  
              word = word.toLowerCase();  
              collector.emit(new Values(word));  
          }  
      }  
  }

下面代码片段在com.huawei.storm.example.wordcount包的“WordCountBolt”类的execute方法中,作用在于统计收到的每个单词的数量。

    @Override   
      public void execute(Tuple tuple, BasicOutputCollector collector)   
      {   
          String word = tuple.getString(0);   
          Integer count = counts.get(word);   
          if (count == null)   
          {   
              count = 0;   
          }   
          count++;   
          counts.put(word, count);   
          System.out.println("word: " + word + ", count: " + count);   
      }

相关文档