更新时间:2024-08-03 GMT+08:00

Storm-OBS开发指引

操作场景

本章节只适用于MRS产品中Storm和OBS交互的场景。本章中描述的jar包的具体版本信息请以实际情况为准。

应用开发操作步骤

  1. 确认Storm已经安装,并正常运行。
  2. 将storm-examples导入到Eclipse开发环境,请参见导入并配置Storm样例工程
  3. 下载并安装HDFS客户端,参见准备HDFS应用运行环境
  4. 获取相关配置文件。获取方法如下。

    在安装好的HDFS客户端目录下找到目录“/opt/client/HDFS/hadoop/etc/hadoop”,在该目录下获取到配置文件“core-site.xml”和“hdfs-site.xml”。将这些文件拷贝到示例工程的 src/main/resources目录。并在core-site.xml中增加如下配置项:

    <property>
    <name>fs.obs.connection.ssl.enabled</name>
    <value>true</value>
    </property>
    <property>
    <name>fs.obs.endpoint</name>
    <value></value>
    </property>
    <property>
    <name>fs.obs.access.key</name>
    <value></value>
    </property>
    <property>
    <name>fs.obs.secret.key</name>
    <value></value>
    </property>

    具体AK,SK等的获取,请参考OBS相关帮助。

Eclipse代码样例

创建Topology。

  
private static final String DEFAULT_FS_URL = "obs://mybucket";

public static void main(String[] args) throws Exception   
    {  
      TopologyBuilder builder = new TopologyBuilder();  

      // 分隔符格式,当前采用“|”代替默认的“,”对tuple中的field进行分隔  
      // HdfsBolt必选参数  
      RecordFormat format = new DelimitedRecordFormat()  
              .withFieldDelimiter("|");  

      // 同步策略,每1000个tuple对文件系统进行一次同步  
      // HdfsBolt必选参数  
      SyncPolicy syncPolicy = new CountSyncPolicy(1000);  

      // 文件大小循环策略,当文件大小到达5M时,从头开始写  
      // HdfsBolt必选参数  
      FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.KB);  

      // 写入HDFS的目的文件  
      // HdfsBolt必选参数  
      FileNameFormat fileNameFormat = new DefaultFileNameFormat()  
              .withPath("/user/foo/");  


      //创建HdfsBolt  
      HdfsBolt bolt = new HdfsBolt()  
              .withFsUrl(DEFAULT_FS_URL)
              .withFileNameFormat(fileNameFormat)  
              .withRecordFormat(format)  
              .withRotationPolicy(rotationPolicy)  
              .withSyncPolicy(syncPolicy);  

      //Spout生成随机语句  
      builder.setSpout("spout", new RandomSentenceSpout(), 1);   
      builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");  
      builder.setBolt("count", bolt, 1).fieldsGrouping("split", new Fields("word"));        

      Config conf = new Config();  
  

      //命令行提交拓扑  
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  

    }

部署运行及结果查看

  1. 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。
  2. 执行命令提交拓扑。

    提交命令示例(拓扑名为obs-test)。

    storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.obs.SimpleOBSTopology obs://my-bucket obs-test

  3. 拓扑提交成功后请登录OBS Browser查看。