Updated on 2022-06-01 GMT+08:00

Storm-OBS Development Guideline

Scenario

This topic applies only to the interaction between Storm and OBS. Determine the versions of the JAR files described in this section based on your actual environment.

Procedure for Developing an Application

  1. Verify that the Storm component has been installed and is running correctly.
  2. Import storm-examples to the Eclipse development environment. For details, see Configuring and Importing a Project.
  3. Download and install the HDFS client. For details, see Preparing a Linux Client Operating Environment.
  4. Obtain the related configuration files by performing the following operations:

    Go to the /opt/client/HDFS/hadoop/etc/hadoop directory on the installed HDFS client, and obtain the configuration files core-site.xml and hdfs-site.xml. Copy the obtained files to the src/main/resources directory of the sample project. Add the following configuration items to core-site.xml:

    <property>
      <name>fs.obs.connection.ssl.enabled</name>
      <value>true</value>
    </property>
    <property>
      <name>fs.obs.endpoint</name>
      <value></value>
    </property>
    <property>
      <name>fs.obs.access.key</name>
      <value></value>
    </property>
    <property>
      <name>fs.obs.secret.key</name>
      <value></value>
    </property>

    For details about how to obtain the AK and SK, see the OBS documentation.
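Before packaging the project, you can sanity-check that the copied core-site.xml actually contains the OBS properties listed above. The following JDK-only sketch is illustrative (the CoreSiteCheck class and the inline sample XML are not part of the sample project); it parses Hadoop-style `<property>` entries and reports which `fs.obs.*` keys are present:

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

public class CoreSiteCheck {
    // Parses Hadoop-style <property><name>/<value> pairs from a configuration XML string.
    static Map<String, String> parseProperties(String xml) throws Exception {
        Map<String, String> props = new LinkedHashMap<>();
        Document doc = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
        NodeList nodes = doc.getElementsByTagName("property");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element p = (Element) nodes.item(i);
            String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
            String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
            props.put(name, value);
        }
        return props;
    }

    public static void main(String[] args) throws Exception {
        // Inline sample; in practice, read the contents of src/main/resources/core-site.xml.
        String xml = "<configuration>"
                + "<property><name>fs.obs.endpoint</name><value>obs.example.com</value></property>"
                + "<property><name>fs.obs.access.key</name><value>AK</value></property>"
                + "</configuration>";
        Map<String, String> props = parseProperties(xml);
        for (String key : new String[]{"fs.obs.endpoint", "fs.obs.access.key", "fs.obs.secret.key"}) {
            System.out.println(key + " present: " + props.containsKey(key));
        }
    }
}
```

A missing or empty `fs.obs.endpoint`, `fs.obs.access.key`, or `fs.obs.secret.key` will otherwise surface only at runtime when the bolt first writes to OBS.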

Eclipse Sample Code

Create a topology.

  
// The imports below assume the Apache Storm 1.x and storm-hdfs package names.
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

private static final String DEFAULT_FS_URL = "obs://mybucket";

public static void main(String[] args) throws Exception {
    // args[0]: target OBS path, for example obs://my-bucket
    // args[1]: topology name
    String fsUrl = (args.length > 0) ? args[0] : DEFAULT_FS_URL;

    TopologyBuilder builder = new TopologyBuilder();

    // Field delimiter. Use | instead of the default comma (,) to separate fields in a tuple.
    // Mandatory HdfsBolt parameter
    RecordFormat format = new DelimitedRecordFormat()
            .withFieldDelimiter("|");

    // Synchronization policy. Sync to the file system after every 1000 tuples.
    // Mandatory HdfsBolt parameter
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);

    // File rotation policy. When a file reaches 5 MB, writing continues in a new file.
    // Mandatory HdfsBolt parameter
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

    // Name format of the target files written to OBS
    // Mandatory HdfsBolt parameter
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/user/foo/");

    // Create the HdfsBolt.
    HdfsBolt bolt = new HdfsBolt()
            .withFsUrl(fsUrl)
            .withFileNameFormat(fileNameFormat)
            .withRecordFormat(format)
            .withRotationPolicy(rotationPolicy)
            .withSyncPolicy(syncPolicy);

    // The spout generates random sentences.
    builder.setSpout("spout", new RandomSentenceSpout(), 1);
    builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
    builder.setBolt("count", bolt, 1).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();

    // Submit the topology.
    StormSubmitter.submitTopology(args[1], conf, builder.createTopology());
}
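With the | delimiter configured above, HdfsBolt writes each tuple as a single delimited line to the target file. The following JDK-only sketch illustrates that record layout (it mimics, but is not, the actual DelimitedRecordFormat implementation, whose default record delimiter is a newline):

```java
import java.util.Arrays;
import java.util.List;

public class DelimiterDemo {
    // Joins tuple fields with the configured delimiter and terminates the record
    // with a newline, matching the line-per-tuple layout written to OBS.
    static String formatRecord(List<String> fields, String delimiter) {
        return String.join(delimiter, fields) + "\n";
    }

    public static void main(String[] args) {
        // A ("hello", 3) word-count tuple becomes the line: hello|3
        System.out.print(formatRecord(Arrays.asList("hello", "3"), "|"));
    }
}
```

Using | instead of the default comma avoids ambiguity when the tuple fields themselves may contain commas.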

Running the Application and Viewing Results

  1. In the root directory of Storm sample code, run the mvn package command. After the command is executed successfully, the storm-examples-1.0.jar file is generated in the target directory.
  2. Run the related command to submit the topology.

    The submission command example is as follows (the topology name is obs-test):

    storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.obs.SimpleOBSTopology obs://my-bucket obs-test

  3. After the topology is submitted successfully, log in to OBS Browser to view the data written by the topology to the target bucket.