更新时间:2024-08-03 GMT+08:00

创建Storm Topology

功能介绍

一个Topology是Spouts和Bolts组成的有向无环图。

应用程序是通过storm jar的方式提交,则需要在main函数中调用创建Topology的函数,并在storm jar参数中指定main函数所在类。

代码样例

下面代码片段在com.huawei.storm.example.wordcount.WordCountTopology类中,作用在于构建应用程序并提交。

   public static void main(String[] args)  
          throws Exception  
      {  
          TopologyBuilder builder = buildTopology();  

          /*  
           * 任务的提交认为三种方式  
           * 1、命令行方式提交,这种需要将应用程序jar包复制到客户端机器上执行客户端命令提交  
           * 2、远程方式提交,这种需要将应用程序的jar包打包好之后在Eclipse中运行main方法提交  
           * 3、本地提交 ,在本地执行应用程序,一般用来测试  
           * 命令行方式和远程方式安全和普通模式都支持  
           * 本地提交仅支持普通模式  
           *   
           * 用户同时只能选择一种任务提交方式,默认命令行方式提交,如果是其他方式,请删除代码注释即可  
           */  

          submitTopology(builder, SubmitType.CMD); 

      }  

     private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception 
     { 
         switch (type)  
         { 
             case CMD:  
             { 
                 cmdSubmit(builder, null); 
                 break; 
             } 
             case REMOTE: 
             { 
                 remoteSubmit(builder); 
                 break; 
             } 
             case LOCAL: 
             { 
                 localSubmit(builder); 
                 break; 
             } 
         } 
     } 

     /** 
      * 命令行方式远程提交 
      * 步骤如下: 
      * 1、打包成Jar包,然后在客户端命令行上面进行提交 
      * 2、远程提交的时候,要先将该应用程序和其他外部依赖(非example工程提供,用户自己程序依赖)的jar包打包成一个大的jar包 
      * 3、再通过storm客户端中storm -jar的命令进行提交 
      *  
      * 如果是安全环境,客户端命令行提交之前,必须先通过kinit命令进行安全登录
      *  
      * 运行命令如下: 
      *./storm jar ../example/example.jar com.huawei.streaming.storm.example.WordCountTopology 
      */     
     private static void cmdSubmit(TopologyBuilder builder, Config conf) 
         throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException 
     { 
         if (conf == null) 
         { 
             conf = new Config(); 
         } 
         conf.setNumWorkers(1); 

         StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); 
     } 

      private static void localSubmit(TopologyBuilder builder)  
          throws InterruptedException  
      {  
          Config conf = new Config();  
          conf.setDebug(true);  
          conf.setMaxTaskParallelism(3);  
          LocalCluster cluster = new LocalCluster();  
          cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());  
          Thread.sleep(10000);  
          cluster.shutdown();  
      }  


     private static void remoteSubmit(TopologyBuilder builder) 
         throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, 
         IOException 
     { 
         Config config = createConf(); 

         String userJarFilePath = "替换为用户jar包地址"; 
         System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); 

         //安全模式下的一些准备工作 
         if (isSecurityModel()) 
         { 
             securityPrepare(config); 
         } 
         config.setNumWorkers(1); 
         StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); 
     }  

        private static TopologyBuilder buildTopology()  
      {  
          TopologyBuilder builder = new TopologyBuilder();  
          builder.setSpout("spout", new RandomSentenceSpout(), 5);  
          builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout");  
          builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));  
          return builder;  
      }