更新时间:2024-06-14 GMT+08:00
分享

Flux开发指引

操作场景

本章节只适用于MRS产品中Storm组件使用Flux框架提交和部署拓扑的场景。本章中描述的jar包的具体版本信息请以实际情况为准。

Flux框架是Storm 0.10.0版本提供的提高拓扑部署易用性的框架。通过Flux框架,用户可以使用yaml文件来定义和部署拓扑,并且最终通过storm jar命令来提交拓扑的一种方式,极大地方便了拓扑的部署和提交,缩短了业务开发周期。

基本语法说明

使用Flux定义拓扑分为两种场景,定义新拓扑和定义已有拓扑。

  1. 使用Flux定义新拓扑

    使用Flux定义拓扑,即使用yaml文件来描述拓扑,一个完整的拓扑定义需要包含以下几个部分:

    • 拓扑名称
    • 定义拓扑时需要的组件列表
    • 拓扑的配置
    • 拓扑的定义,包含spout列表、bolt列表和stream列表

    定义拓扑名称:

    name: "yaml-topology"

    定义组件列表示例:

    #简单的component定义 
     components: 
     - id: "stringScheme" 
     className: "org.apache.storm.kafka.StringScheme" 
    
     #使用构造函数定义component 
     - id: "defaultTopicSelector" 
     className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" 
     constructorArgs: 
     - "output" 
    
     #构造函数入参使用引用,使用`ref`标志来说明引用 
     #在使用引用时请确保被引用对象在前面定义 
     - id: "stringMultiScheme" 
     className: "org.apache.storm.spout.SchemeAsMultiScheme" 
     constructorArgs: 
     - ref: "stringScheme" 
    
     #构造函数入参引用指定的properties文件中的配置项,使用`${}`标志来表示 
     #引用properties文件时,请在使用storm jar命令提交拓扑时使用--filter my-prop.properties的方式指明properties文件路径  
     - id: "zkHosts" 
     className: "org.apache.storm.kafka.ZkHosts" 
     constructorArgs:  
     - "${kafka.zookeeper.root.list}" 
    
     #构造函数入参引用环境变量,使用`${ENV-[NAME]}`方式来引用 
     #NAME必须是一个已经定义的环境变量 
     - id: "zkHosts" 
     className: "org.apache.storm.kafka.ZkHosts" 
     constructorArgs:  
     - "${ENV-ZK_HOSTS}" 
    
     #使用`properties`关键字初始化内部私有变量 
     - id: spoutConfig 
     className: "org.apache.storm.kafka.SpoutConfig" 
     constructorArgs: 
     - ref: "zkHosts" 
     - "input" 
     - "/kafka/input" 
     - "myId" 
     properties: 
     - name: "scheme" 
     ref: "stringMultiScheme"
    
    #定义KafkaBolt使用的properties
    - id: "kafkaProducerProps"
        className: "java.util.Properties"
        configMethods:
          - name: "put"
            args:
              - "bootstrap.servers"
              - "${metadata.broker.list}"
          - name: "put"
            args:
              - "acks"
              - "1"
          - name: "put"
            args:
              - "key.serializer"
              - "org.apache.kafka.common.serialization.StringSerializer"
          - name: "put"
            args:
              - "value.serializer"
              - "org.apache.kafka.common.serialization.StringSerializer"
    

    定义拓扑的配置示例:

    config: 
     #简单配置项 
     topology.workers: 1 
    
     #配置项值为列表,使用`[]`表示 
     topology.auto-credentials: ["class1","class2"] 
    
     #配置项值为map结构 
     kafka.broker.properties:  
     metadata.broker.list: "${metadata.broker.list}" 
     producer.type: "async" 
     request.required.acks: "0" 
     serializer.class: "kafka.serializer.StringEncoder"

    定义spout/bolt列表示例:

    #定义spout列表 
     spouts: 
     - id: "spout1" 
     className: "org.apache.storm.kafka.KafkaSpout" 
     constructorArgs: 
     - ref: "spoutConfig" 
     parallelism: 1 
    
     #定义bolt列表 
     bolts: 
     - id: "bolt1" 
     className: "com.huawei.storm.example.hbase.WordCounter" 
     parallelism: 1 
    
     #使用方法来初始化对象,关键字为`configMethods` 
     - id: "bolt2" 
     className: "org.apache.storm.hbase.bolt.HBaseBolt" 
     constructorArgs: 
     - "WordCount" 
     - ref: "mapper" 
     configMethods: 
     - name: "withConfigKey" 
     args: ["hbase.conf"] 
     parallelism: 1
    
    - id: "kafkaBolt"
      className: "org.apache.storm.kafka.bolt.KafkaBolt"
      configMethods:
        - name: "withTopicSelector"
          args: 
            - ref: "defaultTopicSelector"
        - name: "withProducerProperties"
          args: [ref: "kafkaProducerProps"]
        - name: "withTupleToKafkaMapper"
          args:
            - ref: "fieldNameBasedTupleToKafkaMapper"
    

    定义stream列表示例:

    #定义流式需要制定分组方式,关键字为`grouping`,当前提供的分组方式关键字有: 
     #`ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, 和 `NONE`. 
     #其中`CUSTOM`为用户自定义分组 
    
     #简单流定义,分组方式为SHUFFLE 
     streams: 
     - name: "spout1 --> bolt1" 
     from: "spout1" 
     to: "bolt1" 
     grouping:  
     type: SHUFFLE 
    
     #分组方式为FIELDS,需要传入参数 
     - name: "bolt1 --> bolt2" 
     from: "bolt1" 
     to: "bolt2" 
     grouping: 
     type: FIELDS 
     args: ["word"] 
    
     #分组方式为CUSTOM,需要指定用户自定义分组类 
     - name: "bolt-1 --> bolt2" 
     from: "bolt-1" 
     to: "bolt-2" 
     grouping: 
     type: CUSTOM 
     customClass: 
     className: "org.apache.storm.testing.NGrouping" 
     constructorArgs: 
     - 1
  2. 使用Flux定义已有拓扑

    如果已经拥有拓扑(例如已经使用java代码定义了拓扑),仍然可以使用Flux框架来提交和部署,这时需要在现有的拓扑定义(如MyTopology.java)中实现getTopology()方法,在java中定义如下:

    public StormTopology getTopology(Config config) 
     或者 
     public StormTopology getTopology(Map<String, Object> config)

    这时可以使用如下yaml文件来定义拓扑:

    name: "existing-topology"  #拓扑名可随意指定 
     topologySource: 
     className: "custom-class"  #请指定客户端类

    当然,仍然可以指定其他方法名来获得StormTopology(非getTopology()方法),yaml文件示例如下:

    name: "existing-topology" 
     topologySource: 
     className: "custom-class " 
     methodName: "getTopologyWithDifferentMethodName"

    指定的方法必须接受一个Map<String, Object>类型或者Config类型的入参,并且返回backtype.storm.generated.StormTopology类型的对象,和getTopology()方法相同。

应用开发操作步骤

  1. 确认Storm组件已经安装,并正常运行。如果业务需要连接其他组件,请同时安装该组件并运行。
  2. 将storm-examples导入到Eclipse开发环境,请参见准备Storm应用开发环境
  3. 参考storm-examples工程src/main/resources/flux-examples目录下的相关yaml应用示例,开发客户端业务。
  4. 获取相关配置文件。

    本步骤只适用于业务中有访问外部组件需求的场景,如HDFS、HBase等,获取方式请参见Storm-HDFS开发指引或者Storm-HBase开发指引。若业务无需获取相关配置文件,请忽略本步骤。

Flux配置文件样例

下面是一个完整的访问Kafka业务的yaml文件样例:

name: "simple_kafka" 

 components: 
 - id: "zkHosts"  #对象名称 
 className: "org.apache.storm.kafka.ZkHosts"  #完整的类名 
 constructorArgs:   #构造函数 
 - "${kafka.zookeeper.root.list}"  #构造函数的参数 

 - id: "stringScheme"  
 className: "org.apache.storm.kafka.StringScheme" 

 - id: "stringMultiScheme" 
 className: "org.apache.storm.spout.SchemeAsMultiScheme" 
 constructorArgs: 
 - ref: "stringScheme"  #使用了引用,值为前面定义的stringScheme 

 - id: spoutConfig 
 className: "org.apache.storm.kafka.SpoutConfig" 
 constructorArgs: 
 - ref: "zkHosts"  #使用了引用 
 - "input" 
 - "/kafka/input" 
 - "myId" 
 properties:  #使用properties来设置本对象中的名为“scheme”的私有变量 
 - name: "scheme" 
 ref: "stringMultiScheme" 

 - id: "defaultTopicSelector"  
 className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" 
 constructorArgs: 
 - "output" 

 - id: "fieldNameBasedTupleToKafkaMapper" 
 className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper" 
 constructorArgs: 
 - "words"  #构造函数中第一个入参 
 - "count"  #构造函数中第二个入参 

 config:  
 topology.workers: 1  #设置拓扑的worker数量为1 
 kafka.broker.properties:   #设置kafka相关的配置,值为map结构 
 metadata.broker.list: "${metadata.broker.list}" 
 producer.type: "async" 
 request.required.acks: "0" 
 serializer.class: "kafka.serializer.StringEncoder" 

 spouts: 
 - id: "kafkaSpout"  #spout名称 
 className: "storm.kafka.KafkaSpout"#spout的类名 
 constructorArgs:  #使用构造函数的方式初始化 
 - ref: "spoutConfig"  #构造函数的入参使用了引用 
 parallelism: 1  #该spout的并发设置为1 

 bolts:  
 - id: "splitBolt" 
 className: "com.huawei.storm.example.common.SplitSentenceBolt" 
 parallelism: 1 

 - id: "countBolt"  
 className: "com.huawei.storm.example.kafka.CountBolt" 
 parallelism: 1 

 - id: "kafkaBolt"  
 className: "org.apache.storm.kafka.bolt.KafkaBolt" 
 configMethods:  #使用调用对象内部方法的形式初始化对象 
 - name: "withTopicSelector"  #调用的内部方法名 
 args:   #内部方法需要的入参 
 - ref: "defaultTopicSelector"  #入参只有一个,使用了引用 
 - name: "withTupleToKafkaMapper"  #调用第二个内部方法 
 args: 
 - ref: "fieldNameBasedTupleToKafkaMapper" 

 #定义数据流 
 streams: 
 - name: "kafkaSpout --> splitBolt"  #第一个数据流名称,只作为展示 
 from: "kafkaSpout"  #数据流起点,值为spouts中定义的kafkaSpout 
 to: "splitBolt"  #数据流终点,值为bolts中定义的splitBolt 
 grouping:#定义分组方式 
 type: LOCAL_OR_SHUFFLE  #分组方式为local_or_shuffle 

 - name: "splitBolt --> countBolt"  #第二个数据流 
 from: "splitBolt" 
 to: "countBolt" 
 grouping:  
 type: FIELDS  #分组方式为fields 
 args: ["word"]  #fields方式需要传入参数 

 - name: "countBolt --> kafkaBolt"  #第三个数据流 
 from: "countBolt" 
 to: "kafkaBolt" 
 grouping: 
 type: SHUFFLE  #分组方式为shuffle,无需传入参数

部署运行及结果查看

  1. 使用如下命令打包:“mvn package”。执行成功后,将会在target目录生成storm-examples-1.0.jar。
  2. 将打好的jar包,以及开发好的yaml文件及相关的properties文件拷贝至storm客户端所在主机的任意目录下,如“/opt”。
  3. 执行命令提交拓扑。

    storm jar /opt/jartarget/storm-examples-1.0.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml

    如果设置业务以本地模式启动,则提交命令如下。

    storm jar /opt/jartarget/storm-examples-1.0.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml

    如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。

    如果使用了properties文件,则提交命令如下。

    storm jar /opt/jartarget/storm-examples-1.0.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties

  4. 拓扑提交成功后请自行登录storm UI查看。

相关文档