Storm-Kafka开发指引
操作场景
本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。
本章节代码样例基于Kafka新API,对应Eclipse工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。
本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
应用开发操作步骤
- 确认MRS产品Storm和Kafka组件已经安装,并正常运行。
- 已搭建Storm示例代码工程,将storm-examples导入到Eclipse开发环境,参见导入并配置Storm样例工程。
- 用WinScp工具将Storm客户端安装包导入Linux环境并安装客户端,参见准备Linux客户端环境。
- 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于认证,并且获取到该用户的keytab文件。将获取到的文件拷贝到示例工程的 src/main/resources目录。
- 获取的用户需要同时属于storm组和kafka组。
- 下载并安装Kafka客户端程序,参见《Kafka应用开发》。
代码样例
创建拓扑:
public static void main(String[] args) throws Exception { // 设置拓扑配置 Config conf = new Config(); // 配置安全插件 setSecurityPlugin(conf); if (args.length >= 2) { // 用户更改了默认的keytab文件名,这里需要将新的keytab文件名通过参数传入 conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]); } // 定义KafkaSpout KafkaSpout kafkaSpout = new KafkaSpout<String, String>( getKafkaSpoutConfig(getKafkaSpoutStreams())); // CountBolt CountBolt countBolt = new CountBolt(); //SplitBolt SplitSentenceBolt splitBolt = new SplitSentenceBolt(); // KafkaBolt配置信息 conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, getKafkaProducerProps()); KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>(); kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC)) .withTupleToKafkaMapper( new FieldNameBasedTupleToKafkaMapper("word", "count")); // 定义拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", kafkaSpout, 10); builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]); builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping( "split-bolt", new Fields("word")); builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt"); // 命令行提交拓扑 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }
部署运行及结果查看
- 获取相关配置文件,获取方式如下。
- 安全模式:参见4获取keytab文件。
- 普通模式:无。
- 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。
- 使用Kafka客户端创建拓扑中所用到的Topic,执行命令。
./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka
./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka
- “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。
- 安全模式下,需要kafka管理员用户创建Topic。
- 在Linux系统中完成拓扑的提交。提交命令示例(拓扑名为kafka-test)。
storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test
- 安全模式下,在提交storm-examples-1.0.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。
- 安全模式下,kafka需要用户有相应Topic的访问权限,因此首先需要给用户赋权,再提交拓扑。
- 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。
在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令:
./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --new-consumer --consumer.config ../../../Kafka/kafka/config/consumer.properties
同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令:
./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../../../Kafka/kafka/config/producer.properties
向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。