Flux开发指引
操作场景
本章节只适用于MRS产品中Storm组件使用Flux框架提交和部署拓扑的场景。本章中描述的jar包的具体版本信息请以实际情况为准。
Flux框架是Storm 0.10.0版本提供的提高拓扑部署易用性的框架。通过Flux框架,用户可以使用yaml文件来定义和部署拓扑,并且最终通过storm jar命令来提交拓扑的一种方式,极大地方便了拓扑的部署和提交,缩短了业务开发周期。
基本语法说明
使用Flux定义拓扑分为两种场景,定义新拓扑和定义已有拓扑。
- 使用Flux定义新拓扑
使用Flux定义拓扑,即使用yaml文件来描述拓扑,一个完整的拓扑定义需要包含以下几个部分:
- 拓扑名称
- 定义拓扑时需要的组件列表
- 拓扑的配置
- 拓扑的定义,包含spout列表、bolt列表和stream列表
定义拓扑名称:
name: "yaml-topology"
定义组件列表示例:
#简单的component定义 components: - id: "stringScheme" className: "org.apache.storm.kafka.StringScheme" #使用构造函数定义component - id: "defaultTopicSelector" className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" constructorArgs: - "output" #构造函数入参使用引用,使用`ref`标志来说明引用 #在使用引用时请确保被引用对象在前面定义 - id: "stringMultiScheme" className: "org.apache.storm.spout.SchemeAsMultiScheme" constructorArgs: - ref: "stringScheme" #构造函数入参引用指定的properties文件中的配置项,使用`${}`标志来表示 #引用properties文件时,请在使用storm jar命令提交拓扑时使用--filter my-prop.properties的方式指明properties文件路径 - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "${kafka.zookeeper.root.list}" #构造函数入参引用环境变量,使用`${ENV-[NAME]}`方式来引用 #NAME必须是一个已经定义的环境变量 - id: "zkHosts" className: "org.apache.storm.kafka.ZkHosts" constructorArgs: - "${ENV-ZK_HOSTS}" #使用`properties`关键字初始化内部私有变量 - id: spoutConfig className: "org.apache.storm.kafka.SpoutConfig" constructorArgs: - ref: "zkHosts" - "input" - "/kafka/input" - "myId" properties: - name: "scheme" ref: "stringMultiScheme" #定义KafkaBolt使用的properties - id: "kafkaProducerProps" className: "java.util.Properties" configMethods: - name: "put" args: - "bootstrap.servers" - "${metadata.broker.list}" - name: "put" args: - "acks" - "1" - name: "put" args: - "key.serializer" - "org.apache.kafka.common.serialization.StringSerializer" - name: "put" args: - "value.serializer" - "org.apache.kafka.common.serialization.StringSerializer"
定义拓扑的配置示例:
config: #简单配置项 topology.workers: 1 #配置项值为列表,使用`[]`表示 topology.auto-credentials: ["class1","class2"] #配置项值为map结构 kafka.broker.properties: metadata.broker.list: "${metadata.broker.list}" producer.type: "async" request.required.acks: "0" serializer.class: "kafka.serializer.StringEncoder"
定义spout/bolt列表示例:
#定义spout列表 spouts: - id: "spout1" className: "org.apache.storm.kafka.KafkaSpout" constructorArgs: - ref: "spoutConfig" parallelism: 1 #定义bolt列表 bolts: - id: "bolt1" className: "com.huawei.storm.example.hbase.WordCounter" parallelism: 1 #使用方法来初始化对象,关键字为`configMethods` - id: "bolt2" className: "org.apache.storm.hbase.bolt.HBaseBolt" constructorArgs: - "WordCount" - ref: "mapper" configMethods: - name: "withConfigKey" args: ["hbase.conf"] parallelism: 1 - id: "kafkaBolt" className: "org.apache.storm.kafka.bolt.KafkaBolt" configMethods: - name: "withTopicSelector" args: - ref: "defaultTopicSelector" - name: "withProducerProperties" args: [ref: "kafkaProducerProps"] - name: "withTupleToKafkaMapper" args: - ref: "fieldNameBasedTupleToKafkaMapper"
定义stream列表示例:
#定义流式需要制定分组方式,关键字为`grouping`,当前提供的分组方式关键字有: #`ALL`,`CUSTOM`,`DIRECT`,`SHUFFLE`,`LOCAL_OR_SHUFFLE`,`FIELDS`,`GLOBAL`, 和 `NONE`. #其中`CUSTOM`为用户自定义分组 #简单流定义,分组方式为SHUFFLE streams: - name: "spout1 --> bolt1" from: "spout1" to: "bolt1" grouping: type: SHUFFLE #分组方式为FIELDS,需要传入参数 - name: "bolt1 --> bolt2" from: "bolt1" to: "bolt2" grouping: type: FIELDS args: ["word"] #分组方式为CUSTOM,需要指定用户自定义分组类 - name: "bolt-1 --> bolt2" from: "bolt-1" to: "bolt-2" grouping: type: CUSTOM customClass: className: "org.apache.storm.testing.NGrouping" constructorArgs: - 1
- 使用Flux定义已有拓扑
如果已经拥有拓扑(例如已经使用java代码定义了拓扑),仍然可以使用Flux框架来提交和部署,这时需要在现有的拓扑定义(如MyTopology.java)中实现getTopology()方法,在java中定义如下:
public StormTopology getTopology(Config config) 或者 public StormTopology getTopology(Map<String, Object> config)
这时可以使用如下yaml文件来定义拓扑:
name: "existing-topology" #拓扑名可随意指定 topologySource: className: "custom-class" #请指定客户端类
当然,仍然可以指定其他方法名来获得StormTopology(非getTopology()方法),yaml文件示例如下:
name: "existing-topology" topologySource: className: "custom-class " methodName: "getTopologyWithDifferentMethodName"
指定的方法必须接受一个Map<String, Object>类型或者Config类型的入参,并且返回backtype.storm.generated.StormTopology类型的对象,和getTopology()方法相同。
应用开发操作步骤
- 确认Storm组件已经安装,并正常运行。如果业务需要连接其他组件,请同时安装该组件并运行。
- 将storm-examples导入到Eclipse开发环境,请参见准备Storm应用开发环境。
- 参考storm-examples工程src/main/resources/flux-examples目录下的相关yaml应用示例,开发客户端业务。
- 获取相关配置文件。
本步骤只适用于业务中有访问外部组件需求的场景,如HDFS、HBase等,获取方式请参见Storm-HDFS开发指引或者Storm-HBase开发指引。若业务无需获取相关配置文件,请忽略本步骤。
Flux配置文件样例
下面是一个完整的访问Kafka业务的yaml文件样例:
name: "simple_kafka" components: - id: "zkHosts" #对象名称 className: "org.apache.storm.kafka.ZkHosts" #完整的类名 constructorArgs: #构造函数 - "${kafka.zookeeper.root.list}" #构造函数的参数 - id: "stringScheme" className: "org.apache.storm.kafka.StringScheme" - id: "stringMultiScheme" className: "org.apache.storm.spout.SchemeAsMultiScheme" constructorArgs: - ref: "stringScheme" #使用了引用,值为前面定义的stringScheme - id: spoutConfig className: "org.apache.storm.kafka.SpoutConfig" constructorArgs: - ref: "zkHosts" #使用了引用 - "input" - "/kafka/input" - "myId" properties: #使用properties来设置本对象中的名为“scheme”的私有变量 - name: "scheme" ref: "stringMultiScheme" - id: "defaultTopicSelector" className: "org.apache.storm.kafka.bolt.selector.DefaultTopicSelector" constructorArgs: - "output" - id: "fieldNameBasedTupleToKafkaMapper" className: "org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper" constructorArgs: - "words" #构造函数中第一个入参 - "count" #构造函数中第二个入参 config: topology.workers: 1 #设置拓扑的worker数量为1 kafka.broker.properties: #设置kafka相关的配置,值为map结构 metadata.broker.list: "${metadata.broker.list}" producer.type: "async" request.required.acks: "0" serializer.class: "kafka.serializer.StringEncoder" spouts: - id: "kafkaSpout" #spout名称 className: "storm.kafka.KafkaSpout"#spout的类名 constructorArgs: #使用构造函数的方式初始化 - ref: "spoutConfig" #构造函数的入参使用了引用 parallelism: 1 #该spout的并发设置为1 bolts: - id: "splitBolt" className: "com.huawei.storm.example.common.SplitSentenceBolt" parallelism: 1 - id: "countBolt" className: "com.huawei.storm.example.kafka.CountBolt" parallelism: 1 - id: "kafkaBolt" className: "org.apache.storm.kafka.bolt.KafkaBolt" configMethods: #使用调用对象内部方法的形式初始化对象 - name: "withTopicSelector" #调用的内部方法名 args: #内部方法需要的入参 - ref: "defaultTopicSelector" #入参只有一个,使用了引用 - name: "withTupleToKafkaMapper" #调用第二个内部方法 args: - ref: "fieldNameBasedTupleToKafkaMapper" #定义数据流 streams: - name: "kafkaSpout --> splitBolt" #第一个数据流名称,只作为展示 from: "kafkaSpout" #数据流起点,值为spouts中定义的kafkaSpout to: "splitBolt" #数据流终点,值为bolts中定义的splitBolt grouping:#定义分组方式 type: LOCAL_OR_SHUFFLE #分组方式为local_or_shuffle - name: "splitBolt --> countBolt" #第二个数据流 from: "splitBolt" to: "countBolt" grouping: type: FIELDS #分组方式为fields args: ["word"] #fields方式需要传入参数 - name: "countBolt --> kafkaBolt" #第三个数据流 from: "countBolt" to: "kafkaBolt" grouping: type: SHUFFLE #分组方式为shuffle,无需传入参数
部署运行及结果查看
- 使用如下命令打包:“mvn package”。执行成功后,将会在target目录生成storm-examples-1.0.jar。
- 将打好的jar包,以及开发好的yaml文件及相关的properties文件拷贝至storm客户端所在主机的任意目录下,如“/opt”。
- 执行命令提交拓扑。
storm jar /opt/jartarget/storm-examples-1.0.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml
如果设置业务以本地模式启动,则提交命令如下。
storm jar /opt/jartarget/storm-examples-1.0.jar org.apache.storm.flux.Flux --local /opt/my-topology.yaml
如果业务设置为本地模式,请确保提交环境为普通模式环境,当前不支持安全环境下使用命令提交本地模式的业务。
如果使用了properties文件,则提交命令如下。
storm jar /opt/jartarget/storm-examples-1.0.jar org.apache.storm.flux.Flux --remote /opt/my-topology.yaml --filter /opt/my-prop.properties
- 拓扑提交成功后请自行登录storm UI查看。