Updated on 2022-09-14 GMT+08:00

Creating a Topology

Function Description

A topology is a directed acyclic graph (DAG) consisting of Spouts and Bolts.

Applications are submitted in storm jar mode. Therefore, a function for creating a topology must be invoked in the main function, and the class to which the main function belongs must be specified in storm jar parameters.

Sample Code

The following code snippets are in the com.huawei.storm.example.wordcount.WordCountTopology class, and these code snippets are used to create and submit applications.

   public static void main(String[] args)  
          throws Exception  
      {  
          TopologyBuilder builder = buildTopology();  

          /*  
           * Tasks can be submitted in the following three modes: 
           * 1. Command line submitting. In this mode, a user must copy an application JAR package to a client and run related commands on the client.
           * 2. Remote submitting. In this mode, a user must package application JAR files and execute the main method in Eclipse.  
           * 3. Local submitting. In this mode, a user must run an application for test on a local computer.  
           * The command line submitting and remote submitting modes support both security and normal modes.  
           * The local submitting mode supports the normal mode only.  
           *   
           * A user can select only one mode for submitting a task. By default, the command line submitting mode is used. To use another mode, delete code comments.  
           */  

          submitTopology(builder, SubmitType.CMD); 

      }  

     private static void submitTopology(TopologyBuilder builder, SubmitType type) throws Exception 
     { 
         switch (type)  
         { 
             case CMD:  
             { 
                 cmdSubmit(builder, null); 
                 break; 
             } 
             case REMOTE: 
             { 
                 remoteSubmit(builder); 
                 break; 
             } 
             case LOCAL: 
             { 
                 localSubmit(builder); 
                 break; 
             } 
         } 
     } 

     /** 
      * Command line submitting mode 
      * The procedures are as follows: 
      * 1. Package a JAR file and then submit the task in the client CLI. 
      * 2. In remote submitting mode, package the JAR file of the application and other external dependency JAR files of users' applications into a big JAR file. External dependency JAR files are not provided by the sample project. 
      * 3. Run the storm -jar command on the Storm client to submit the task. 
      *  
      * In a security environment, before submitting the task in the client CLI, run the kinit command to perform login in security mode. 
      *  
      * Run the following command: 
      *./storm jar ../example/example.jar com.huawei.streaming.storm.example.WordCountTopology 
      */     
     private static void cmdSubmit(TopologyBuilder builder, Config conf) 
         throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException 
     { 
         if (conf == null) 
         { 
             conf = new Config(); 
         } 
         conf.setNumWorkers(1); 

         StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, conf, builder.createTopology()); 
     } 

      private static void localSubmit(TopologyBuilder builder)  
          throws InterruptedException  
      {  
          Config conf = new Config();  
          conf.setDebug(true);  
          conf.setMaxTaskParallelism(3);  
          LocalCluster cluster = new LocalCluster();  
          cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());  
          Thread.sleep(10000);  
          cluster.shutdown();  
      }  


     private static void remoteSubmit(TopologyBuilder builder) 
         throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, 
         IOException 
     { 
         Config config = createConf(); 

         String userJarFilePath = "User JAR file address"; 
         System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); 

         //Preparations to be made in security mode
         if (isSecurityModel()) 
         { 
             securityPrepare(config); 
         } 
         config.setNumWorkers(1); 
         StormSubmitter.submitTopologyWithProgressBar(TOPOLOGY_NAME, config, builder.createTopology()); 
     }  

        private static TopologyBuilder buildTopology()  
      {  
          TopologyBuilder builder = new TopologyBuilder();  
          builder.setSpout("spout", new RandomSentenceSpout(), 5);  
          builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout");  
          builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word"));  
          return builder;  
      }