Updated on 2022-08-16 GMT+08:00

Creating a Topology

Function Description

A topology is a directed acyclic graph (DAG) consisting of Spouts and Bolts.
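To make the DAG idea concrete, the following minimal sketch models the word count topology from this section as a plain directed graph and checks that it has no cycles. It uses only the Java standard library, not Storm classes. The class TopologyDagSketch and its methods are hypothetical names introduced for illustration; only the component IDs "spout", "split", and "count" come from the sample code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: plain Java, no Storm classes involved.
public class TopologyDagSketch {
    // Maps each component ID to the components that consume its output.
    private final Map<String, List<String>> edges = new LinkedHashMap<>();

    public void addComponent(String name) {
        edges.putIfAbsent(name, new ArrayList<>());
    }

    // Adds a directed edge: "to" subscribes to the output of "from".
    public void connect(String from, String to) {
        addComponent(from);
        addComponent(to);
        edges.get(from).add(to);
    }

    // Kahn's algorithm: returns a topological order of the components,
    // or throws IllegalStateException if the graph contains a cycle.
    public List<String> topologicalOrder() {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String node : edges.keySet()) {
            inDegree.put(node, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.poll();
            order.add(node);
            for (String target : edges.get(node)) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != edges.size()) {
            throw new IllegalStateException("cycle detected: not a DAG");
        }
        return order;
    }

    public static void main(String[] args) {
        TopologyDagSketch dag = new TopologyDagSketch();
        dag.connect("spout", "split"); // the spout feeds the split bolt
        dag.connect("split", "count"); // the split bolt feeds the count bolt
        System.out.println(dag.topologicalOrder());
    }
}
```

In this sketch the spout is the only node without incoming edges, which matches its role as the data source of the topology.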

Applications are submitted with the storm jar command. The main function must therefore call the function that creates the topology, and the class that contains the main function must be specified as a storm jar parameter.

Code Sample

The following code snippets are from the WordCountTopology class in the com.huawei.storm.example.wordcount package. They create the topology and submit the application.

     public static void main(String[] args) 
         throws Exception 
     { 
         TopologyBuilder builder = buildTopology(); 

         /* 
          * Tasks can be submitted in one of the following three modes: 
          * 1. Command-line submission. Copy the application JAR package to a client and run the related commands on the client. 
          * 2. Remote submission. Package the application into a JAR package and run the main method in IntelliJ IDEA. 
          * 3. Local submission. Run the application on a local computer for testing. 
          * Command-line and remote submission support both security mode and normal mode. 
          * Local submission supports normal mode only. 
          *  
          * Only one mode can be used at a time. Command-line submission is the default. To use another mode, comment out the cmdSubmit call below and uncomment the call for that mode. 
          */ 

         cmdSubmit(builder, null); 

         //Remote submitting mode 
         //remoteSubmit(builder); 

         //Local submitting mode, used for test 
         //localSubmit(builder); 
     } 

     private static void cmdSubmit(TopologyBuilder builder, Config conf) 
         throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException 
     { 
         if (conf == null) 
         { 
             conf = new Config(); 
         } 
         /* 
          * Command-line submission mode. 
          * Procedure: 
          * 1. Package the application into a JAR package. 
          *    For remote submission, package the application JAR and all external dependency JARs into a single JAR package. External dependencies are JARs that the user program depends on and that the example project does not provide. 
          * 2. Run the storm jar command on the Storm client to submit the task. 
          *  
          * In a security environment, run the kinit command to log in before submitting the task from the client CLI. 
          *  
          * Example command: 
          * ./storm jar ../example/example.jar com.huawei.storm.example.WordCountTopology 
          */ 
         conf.setNumWorkers(1); 
         StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); 
     } 

     private static void localSubmit(TopologyBuilder builder) 
         throws InterruptedException 
     { 
         Config conf = new Config(); 
         conf.setDebug(true); 
         conf.setMaxTaskParallelism(3); 
         LocalCluster cluster = new LocalCluster(); 
         cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology()); 
         Thread.sleep(10000); 
         cluster.shutdown(); 
     } 


     private static void remoteSubmit(TopologyBuilder builder) 
         throws AlreadyAliveException, InvalidTopologyException, NotALeaderException, AuthorizationException, 
         IOException 
     { 
         Config config = createConf(); 

         //Preparations in security mode 
         if (isSecurityModel()) 
         { 
             securityPrepare(); 
         } 
         String userJarFilePath = "User JAR package address"; 
         System.setProperty(STORM_SUBMIT_JAR_PROPERTY, userJarFilePath); 

         cmdSubmit(builder, config); 
     } 

     private static TopologyBuilder buildTopology() 
     { 
         TopologyBuilder builder = new TopologyBuilder(); 
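         // Register the spout with ID "spout" and a parallelism hint of 5. 
         builder.setSpout("spout", new RandomSentenceSpout(), 5); 
         // "split" subscribes to "spout"; shuffle grouping spreads tuples randomly across its tasks. 
         builder.setBolt("split", new SplitSentenceBolt(), 8).shuffleGrouping("spout"); 
         // "count" subscribes to "split"; fields grouping sends tuples with the same "word" value to the same task. 
         builder.setBolt("count", new WordCountBolt(), 12).fieldsGrouping("split", new Fields("word")); 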
         return builder; 
     } 
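
The fieldsGrouping call in buildTopology is what makes per-word counting work: every tuple with the same value in the "word" field reaches the same task of the count bolt. The following sketch shows that idea with a simple hash-and-modulo rule in plain Java. It is an assumption for illustration and does not reproduce the hashing that Storm uses internally; FieldsGroupingSketch and targetTask are hypothetical names.

```java
import java.util.Objects;

// Hypothetical illustration of fields grouping; not Storm's actual implementation.
public class FieldsGroupingSketch {
    // Picks a task index in [0, numTasks) from the grouping field value.
    // Equal words always map to the same index.
    public static int targetTask(String word, int numTasks) {
        if (numTasks < 1) {
            throw new IllegalArgumentException("numTasks must be at least 1");
        }
        return Math.floorMod(Objects.hashCode(word), numTasks);
    }

    public static void main(String[] args) {
        int countBoltTasks = 12; // matches the parallelism hint of the "count" bolt
        for (String word : "the cow jumped over the moon".split(" ")) {
            System.out.println(word + " -> task " + targetTask(word, countBoltTasks));
        }
    }
}
```

Both occurrences of "the" in the example sentence map to the same task index, so a single task sees every occurrence of that word and can keep its running count locally.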

If acknowledgment (ack) is enabled for the topology, set the number of ackers to a value less than or equal to the configured number of workers.
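
The acker recommendation can be sketched as a small helper that caps the requested acker count at the worker count. The sketch uses a plain Map so that it runs without Storm on the classpath; the keys "topology.workers" and "topology.acker.executors" are the Storm configuration keys for these two settings. AckerCountSketch and withAckers are hypothetical names. In a real topology, the same values would normally be set on the Storm Config object, for example with conf.setNumWorkers and conf.setNumAckers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds a configuration map in which the acker count
// never exceeds the worker count.
public class AckerCountSketch {
    static final String TOPOLOGY_WORKERS = "topology.workers";
    static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";

    public static Map<String, Object> withAckers(int workers, int requestedAckers) {
        if (workers < 1 || requestedAckers < 0) {
            throw new IllegalArgumentException("workers must be >= 1 and ackers >= 0");
        }
        Map<String, Object> conf = new HashMap<>();
        conf.put(TOPOLOGY_WORKERS, workers);
        // Cap the acker count at the number of workers.
        conf.put(TOPOLOGY_ACKER_EXECUTORS, Math.min(requestedAckers, workers));
        return conf;
    }

    public static void main(String[] args) {
        // One worker, as in cmdSubmit; a request for 3 ackers is capped.
        System.out.println(withAckers(1, 3));
    }
}
```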