Storm Flux开发指引
操作场景
本章节只适用于MRS产品中Storm组件使用Flux框架提交和部署拓扑的场景。本章中描述的jar包的具体版本信息请以实际情况为准。
Flux框架是Storm提供的提高拓扑部署易用性的框架。通过Flux框架,用户可以使用yaml文件来定义和部署拓扑,并且最终通过storm jar命令来提交拓扑的一种方式,极大地方便了拓扑的部署和提交,缩短了业务开发周期。
基本语法说明
使用Flux定义拓扑分为两种场景,定义新拓扑和定义已有拓扑。
- 使用Flux定义新拓扑
    
使用Flux定义拓扑,即使用yaml文件来描述拓扑,一个完整的拓扑定义需要包含以下几个部分:
- 拓扑名称
 - 定义拓扑时需要的组件列表
 - 拓扑的配置
 - 拓扑的定义,包含spout列表、bolt列表和stream列表
 
定义拓扑名称:
name: "yaml-topology"
定义组件列表示例:
#简单的component定义 components: - id: "stringScheme" className: "org.apache.storm.kafka.StringScheme" #使用构造函数定义component - id: "defaultTopicSelector" className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" constructorArgs: - "output" #构造函数入参使用引用,使用`ref`标志来说明引用 #在使用引用时请确保被引用对象在前面定义 - id: "stringMultiScheme" className: "org.apache.storm.spout.SchemeAsMultiScheme" constructorArgs: - ref: "stringScheme" #构造函数入参引用指定的properties文件中的配置项,使用`${}`标志来表示 #引用properties文件时,请在使用storm jar命令提交拓扑时使用--filter my-prop.properties的方式指明properties文件路径 - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "${kafka.zookeeper.root.list}" #构造函数入参引用环境变量,使用`${ENV-[NAME]}`方式来引用 #NAME必须是一个已经定义的环境变量 - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "${ENV-ZK_HOSTS}" #使用`properties`关键字初始化内部私有变量 - id: spoutConfig className: "org.apache.storm.kafka.SpoutConfig" constructorArgs: - ref: "zkHosts" - "input" - "/kafka/input" - "myId" properties: - name: "scheme" ref: "stringMultiScheme"定义拓扑的配置示例:
config: #简单配置项 topology.workers: 1 #配置项值为列表,使用`[]`表示 topology.auto-credentials: ["class1","class2"] #配置项值为map结构 kafka.broker.properties: metadata.broker.list: "${metadata.broker.list}" producer.type: "async" request.required.acks: "0" serializer.class: "kafka.serializer.StringEncoder"定义spout/bolt列表示例:
#定义spout列表 spouts: - id: "spout1" className: "org.apache.storm.kafka.KafkaSpout" constructorArgs: - ref: "spoutConfig" parallelism: 1 #定义bolt列表 bolts: - id: "bolt1" className: "com.huawei.storm.example.hbase.WordCounter" parallelism: 1 #使用方法来初始化对象,关键字为`configMethods` - id: "bolt2" className: "org.apache.storm.hbase.bolt.HBaseBolt" constructorArgs: - "WordCount" - ref: "mapper" configMethods: - name: "withConfigKey" args: ["hbase.conf"] parallelism: 1
定义stream列表示例:
#定义流式需要制定分组方式,关键字为`grouping`,当前提供的分组方式关键字有: #`ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, 和 `NONE`. #其中`CUSTOM`为用户自定义分组 #简单流定义,分组方式为SHUFFLE streams: - name: "spout1 --> bolt1" from: "spout1" to: "bolt1" grouping: type: SHUFFLE #分组方式为FIELDS,需要传入参数 - name: "bolt1 --> bolt2" from: "bolt1" to: "bolt2" grouping: type: FIELDS args: ["word"] #分组方式为CUSTOM,需要指定用户自定义分组类 - name: "bolt-1 --> bolt2" from: "bolt-1" to: "bolt-2" grouping: type: CUSTOM customClass: className: "org.apache.storm.testing.NGrouping" constructorArgs: - 1
 - 使用Flux定义已有拓扑
    
如果已经拥有拓扑(例如已经使用java代码定义了拓扑),仍然可以使用Flux框架来提交和部署,这时需要在现有的拓扑定义(如MyTopology.java)中实现getTopology()方法,在java中定义如下:
public StormTopology getTopology(Config config) 或者 public StormTopology getTopology(Map<String, Object> config)
这时可以使用如下yaml文件来定义拓扑:
name: "existing-topology" #拓扑名可随意指定 topologySource: className: "custom-class" #请指定客户端类
当然,仍然可以指定其他方法名来获得StormTopology(非getTopology()方法),yaml文件示例如下:
name: "existing-topology" topologySource: className: "custom-class " methodName: "getTopologyWithDifferentMethodName"
 
     指定的方法必须接受一个Map<String, Object>类型或者Config类型的入参,并且返回org.apache.storm.generated.StormTopology类型的对象,和getTopology()方法相同。
 
应用开发操作步骤
- 确认Storm组件已经安装,并正常运行。如果业务需要连接其他组件,请同时安装该组件并运行。
 - 将storm-examples导入到IntelliJ IDEA开发环境,请参见导入并配置Storm样例工程。
 - 参考storm-examples工程src/main/resources/flux-examples目录下的相关yaml应用示例,开发客户端业务。
 - 获取相关配置文件。
    
    
 
     本步骤只适用于业务中有访问外部组件需求的场景,如HDFS、HBase等,获取方式请参见Storm-HDFS开发指引或者Storm-HBase开发指引。若业务无需获取相关配置文件,请忽略本步骤。
 - 获取相关jar包,获取方法如下:
    
    
- 在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
      
flux-core-<version>.jar
flux-wrappers-<version>.jar
 - 获取业务相关其他jar包,如访问HDFS时需要获取的jar包请参见6,其他场景类似。
 
 - 在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
      
 
Flux配置文件样例
下面是一个完整的访问Kafka业务的yaml文件样例:
name: "simple_kafka" 
 components: 
 - id: "zkHosts"  #对象名称 
 className: "org.apache.storm.kafka.ZkHosts"  #完整的类名 
 constructorArgs:   #构造函数 
 - "${kafka.zookeeper.root.list}"  #构造函数的参数 
 - id: "stringScheme"  
 className: "org.apache.storm.kafka.StringScheme" 
 - id: "stringMultiScheme" 
 className: "org.apache.storm.spout.SchemeAsMultiScheme" 
 constructorArgs: 
 - ref: "stringScheme"  #使用了引用,值为前面定义的stringScheme 
 - id: spoutConfig 
 className: "org.apache.storm.kafka.SpoutConfig" 
 constructorArgs: 
 - ref: "zkHosts"  #使用了引用 
 - "input" 
 - "/kafka/input" 
 - "myId" 
 properties:  #使用properties来设置本对象中的名为“scheme”的私有变量 
 - name: "scheme" 
 ref: "stringMultiScheme" 
 - id: "defaultTopicSelector"  
 className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" 
 constructorArgs: 
 - "output" 
 - id: "fieldNameBasedTupleToKafkaMapper" 
 className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper" 
 constructorArgs: 
 - "words"  #构造函数中第一个入参 
 - "count"  #构造函数中第二个入参 
 config:  
 topology.workers: 1  #设置拓扑的worker数量为1 
 kafka.broker.properties:   #设置kafka相关的配置,值为map结构 
 metadata.broker.list: "${metadata.broker.list}" 
 producer.type: "async" 
 request.required.acks: "0" 
 serializer.class: "kafka.serializer.StringEncoder" 
 spouts: 
 - id: "kafkaSpout"  #spout名称 
 className: "org.apache.storm.kafka.KafkaSpout"#spout的类名 
 constructorArgs:  #使用构造函数的方式初始化 
 - ref: "spoutConfig"  #构造函数的入参使用了引用 
 parallelism: 1  #该spout的并发设置为1 
 bolts:  
 - id: "splitBolt" 
 className: "com.huawei.storm.example.common.SplitSentenceBolt" 
 parallelism: 1 
 - id: "countBolt"  
 className: "com.huawei.storm.example.kafka.CountBolt" 
 parallelism: 1 
 - id: "kafkaBolt"  
 className: "org.apache.storm.kafka.bolt.KafkaBolt" 
 configMethods:  #使用调用对象内部方法的形式初始化对象 
 - name: "withTopicSelector"  #调用的内部方法名 
 args:   #内部方法需要的入参 
 - ref: "defaultTopicSelector"  #入参只有一个,使用了引用 
 - name: "withTupleToKafkaMapper"  #调用第二个内部方法 
 args: 
 - ref: "fieldNameBasedTupleToKafkaMapper" 
 #定义数据流 
 streams: 
 - name: "kafkaSpout --> splitBolt"  #第一个数据流名称,只作为展示 
 from: "kafkaSpout"  #数据流起点,值为spouts中定义的kafkaSpout 
 to: "splitBolt"  #数据流终点,值为bolts中定义的splitBolt 
 grouping:#定义分组方式 
 type: LOCAL_OR_SHUFFLE  #分组方式为local_or_shuffle 
 - name: "splitBolt --> countBolt"  #第二个数据流 
 from: "splitBolt" 
 to: "countBolt" 
 grouping:  
 type: FIELDS  #分组方式为fields 
 args: ["word"]  #fields方式需要传入参数 
 - name: "countBolt --> kafkaBolt"  #第三个数据流 
 from: "countBolt" 
 to: "kafkaBolt" 
 grouping: 
 type: SHUFFLE  #分组方式为shuffle,无需传入参数
 部署运行及结果查看
- 导出本地jar包,请参见打包Storm样例工程应用。
 - 将4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm业务。
 - 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。
 - 执行命令提交拓扑。
    
    
storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml
如果设置业务以本地模式启动,则提交命令如下:
storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml
 
     如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。
如果使用了properties文件,则提交命令如下:
storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties
 - 拓扑提交成功后请自行登录storm UI查看。