更新时间:2024-08-05 GMT+08:00

Storm Flux开发指引

操作场景

本章节只适用于MRS产品中Storm组件使用Flux框架提交和部署拓扑的场景。本章中描述的jar包的具体版本信息请以实际情况为准。

Flux框架是Storm提供的提高拓扑部署易用性的框架。通过Flux框架,用户可以使用yaml文件来定义和部署拓扑,并且最终通过storm jar命令来提交拓扑的一种方式,极大地方便了拓扑的部署和提交,缩短了业务开发周期。

基本语法说明

使用Flux定义拓扑分为两种场景,定义新拓扑和定义已有拓扑。

  1. 使用Flux定义新拓扑

    使用Flux定义拓扑,即使用yaml文件来描述拓扑,一个完整的拓扑定义需要包含以下几个部分:

    • 拓扑名称
    • 定义拓扑时需要的组件列表
    • 拓扑的配置
    • 拓扑的定义,包含spout列表、bolt列表和stream列表

    定义拓扑名称:

    name: "yaml-topology"

    定义组件列表示例:

    #简单的component定义 
     components: 
     - id: "stringScheme" 
     className: "org.apache.storm.kafka.StringScheme" 
    
     #使用构造函数定义component 
     - id: "defaultTopicSelector" 
     className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" 
     constructorArgs: 
     - "output" 
    
     #构造函数入参使用引用,使用`ref`标志来说明引用 
     #在使用引用时请确保被引用对象在前面定义 
     - id: "stringMultiScheme" 
     className: "org.apache.storm.spout.SchemeAsMultiScheme" 
     constructorArgs: 
     - ref: "stringScheme" 
    
     #构造函数入参引用指定的properties文件中的配置项,使用`${}`标志来表示 
     #引用properties文件时,请在使用storm jar命令提交拓扑时使用--filter my-prop.properties的方式指明properties文件路径  
     - id: "zkHosts" 
     className: "org.apache.storm.kafka.ZkHosts" 
     constructorArgs:  
     - "${kafka.zookeeper.root.list}" 
    
     #构造函数入参引用环境变量,使用`${ENV-[NAME]}`方式来引用 
     #NAME必须是一个已经定义的环境变量 
     - id: "zkHosts" 
     className: "org.apache.storm.kafka.ZkHosts" 
     constructorArgs:  
     - "${ENV-ZK_HOSTS}" 
    
     #使用`properties`关键字初始化内部私有变量 
     - id: spoutConfig 
     className: "org.apache.storm.kafka.SpoutConfig" 
     constructorArgs: 
     - ref: "zkHosts" 
     - "input" 
     - "/kafka/input" 
     - "myId" 
     properties: 
     - name: "scheme" 
     ref: "stringMultiScheme"

    定义拓扑的配置示例:

    config: 
     #简单配置项 
     topology.workers: 1 
    
     #配置项值为列表,使用`[]`表示 
     topology.auto-credentials: ["class1","class2"] 
    
     #配置项值为map结构 
     kafka.broker.properties:  
     metadata.broker.list: "${metadata.broker.list}" 
     producer.type: "async" 
     request.required.acks: "0" 
     serializer.class: "kafka.serializer.StringEncoder"

    定义spout/bolt列表示例:

    #定义spout列表 
     spouts: 
     - id: "spout1" 
     className: "org.apache.storm.kafka.KafkaSpout" 
     constructorArgs: 
     - ref: "spoutConfig" 
     parallelism: 1 
    
     #定义bolt列表 
     bolts: 
     - id: "bolt1" 
     className: "com.huawei.storm.example.hbase.WordCounter" 
     parallelism: 1 
    
     #使用方法来初始化对象,关键字为`configMethods` 
     - id: "bolt2" 
     className: "org.apache.storm.hbase.bolt.HBaseBolt" 
     constructorArgs: 
     - "WordCount" 
     - ref: "mapper" 
     configMethods: 
     - name: "withConfigKey" 
     args: ["hbase.conf"] 
     parallelism: 1

    定义stream列表示例:

    #定义流式需要制定分组方式,关键字为`grouping`,当前提供的分组方式关键字有: 
     #`ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, 和 `NONE`. 
     #其中`CUSTOM`为用户自定义分组 
    
     #简单流定义,分组方式为SHUFFLE 
     streams: 
     - name: "spout1 --> bolt1" 
     from: "spout1" 
     to: "bolt1" 
     grouping:  
     type: SHUFFLE 
    
     #分组方式为FIELDS,需要传入参数 
     - name: "bolt1 --> bolt2" 
     from: "bolt1" 
     to: "bolt2" 
     grouping: 
     type: FIELDS 
     args: ["word"] 
    
     #分组方式为CUSTOM,需要指定用户自定义分组类 
     - name: "bolt-1 --> bolt2" 
     from: "bolt-1" 
     to: "bolt-2" 
     grouping: 
     type: CUSTOM 
     customClass: 
     className: "org.apache.storm.testing.NGrouping" 
     constructorArgs: 
     - 1
  2. 使用Flux定义已有拓扑

    如果已经拥有拓扑(例如已经使用java代码定义了拓扑),仍然可以使用Flux框架来提交和部署,这时需要在现有的拓扑定义(如MyTopology.java)中实现getTopology()方法,在java中定义如下:

    public StormTopology getTopology(Config config) 
     或者 
     public StormTopology getTopology(Map<String, Object> config)

    这时可以使用如下yaml文件来定义拓扑:

    name: "existing-topology"  #拓扑名可随意指定 
     topologySource: 
     className: "custom-class"  #请指定客户端类

    当然,仍然可以指定其他方法名来获得StormTopology(非getTopology()方法),yaml文件示例如下:

    name: "existing-topology" 
     topologySource: 
     className: "custom-class " 
     methodName: "getTopologyWithDifferentMethodName"

    指定的方法必须接受一个Map<String, Object>类型或者Config类型的入参,并且返回org.apache.storm.generated.StormTopology类型的对象,和getTopology()方法相同。

应用开发操作步骤

  1. 确认Storm组件已经安装,并正常运行。如果业务需要连接其他组件,请同时安装该组件并运行。
  2. 将storm-examples导入到IntelliJ IDEA开发环境,请参见导入并配置Storm样例工程
  3. 参考storm-examples工程src/main/resources/flux-examples目录下的相关yaml应用示例,开发客户端业务。
  4. 获取相关配置文件。

    本步骤只适用于业务中有访问外部组件需求的场景,如HDFS、HBase等,获取方式请参见Storm-HDFS开发指引或者Storm-HBase开发指引。若业务无需获取相关配置文件,请忽略本步骤。

  5. 获取相关jar包,获取方法如下:

    • 在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:

      flux-core-<version>.jar

      flux-wrappers-<version>.jar

    • 获取业务相关其他jar包,如访问HDFS时需要获取的jar包请参见6,其他场景类似。

Flux配置文件样例

下面是一个完整的访问Kafka业务的yaml文件样例:

name: "simple_kafka" 

 components: 
 - id: "zkHosts"  #对象名称 
 className: "org.apache.storm.kafka.ZkHosts"  #完整的类名 
 constructorArgs:   #构造函数 
 - "${kafka.zookeeper.root.list}"  #构造函数的参数 

 - id: "stringScheme"  
 className: "org.apache.storm.kafka.StringScheme" 

 - id: "stringMultiScheme" 
 className: "org.apache.storm.spout.SchemeAsMultiScheme" 
 constructorArgs: 
 - ref: "stringScheme"  #使用了引用,值为前面定义的stringScheme 

 - id: spoutConfig 
 className: "org.apache.storm.kafka.SpoutConfig" 
 constructorArgs: 
 - ref: "zkHosts"  #使用了引用 
 - "input" 
 - "/kafka/input" 
 - "myId" 
 properties:  #使用properties来设置本对象中的名为“scheme”的私有变量 
 - name: "scheme" 
 ref: "stringMultiScheme" 

 - id: "defaultTopicSelector"  
 className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" 
 constructorArgs: 
 - "output" 

 - id: "fieldNameBasedTupleToKafkaMapper" 
 className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper" 
 constructorArgs: 
 - "words"  #构造函数中第一个入参 
 - "count"  #构造函数中第二个入参 

 config:  
 topology.workers: 1  #设置拓扑的worker数量为1 
 kafka.broker.properties:   #设置kafka相关的配置,值为map结构 
 metadata.broker.list: "${metadata.broker.list}" 
 producer.type: "async" 
 request.required.acks: "0" 
 serializer.class: "kafka.serializer.StringEncoder" 

 spouts: 
 - id: "kafkaSpout"  #spout名称 
 className: "org.apache.storm.kafka.KafkaSpout"#spout的类名 
 constructorArgs:  #使用构造函数的方式初始化 
 - ref: "spoutConfig"  #构造函数的入参使用了引用 
 parallelism: 1  #该spout的并发设置为1 

 bolts:  
 - id: "splitBolt" 
 className: "com.huawei.storm.example.common.SplitSentenceBolt" 
 parallelism: 1 

 - id: "countBolt"  
 className: "com.huawei.storm.example.kafka.CountBolt" 
 parallelism: 1 

 - id: "kafkaBolt"  
 className: "org.apache.storm.kafka.bolt.KafkaBolt" 
 configMethods:  #使用调用对象内部方法的形式初始化对象 
 - name: "withTopicSelector"  #调用的内部方法名 
 args:   #内部方法需要的入参 
 - ref: "defaultTopicSelector"  #入参只有一个,使用了引用 
 - name: "withTupleToKafkaMapper"  #调用第二个内部方法 
 args: 
 - ref: "fieldNameBasedTupleToKafkaMapper" 

 #定义数据流 
 streams: 
 - name: "kafkaSpout --> splitBolt"  #第一个数据流名称,只作为展示 
 from: "kafkaSpout"  #数据流起点,值为spouts中定义的kafkaSpout 
 to: "splitBolt"  #数据流终点,值为bolts中定义的splitBolt 
 grouping:#定义分组方式 
 type: LOCAL_OR_SHUFFLE  #分组方式为local_or_shuffle 

 - name: "splitBolt --> countBolt"  #第二个数据流 
 from: "splitBolt" 
 to: "countBolt" 
 grouping:  
 type: FIELDS  #分组方式为fields 
 args: ["word"]  #fields方式需要传入参数 

 - name: "countBolt --> kafkaBolt"  #第三个数据流 
 from: "countBolt" 
 to: "kafkaBolt" 
 grouping: 
 type: SHUFFLE  #分组方式为shuffle,无需传入参数

部署运行及结果查看

  1. 导出本地jar包,请参见打包Storm样例工程应用
  2. 4中获取的配置文件和5中获取的jar包合并统一打出完整的业务jar包,请参见打包Storm业务
  3. 将开发好的yaml文件及相关的properties文件复制至storm客户端所在主机的任意目录下,如“/opt”。
  4. 执行命令提交拓扑。

    storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml

    如果设置业务以本地模式启动,则提交命令如下:

    storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml

    如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。

    如果使用了properties文件,则提交命令如下:

    storm jar /opt/jartarget/source.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties

  5. 拓扑提交成功后请自行登录storm UI查看。