Storm-Kafka开发指引
操作场景
本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。
本章节代码样例基于Kafka新API,对应IntelliJ IDEA工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。
本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
应用开发操作步骤
- 确认Storm和Kafka组件已经安装,并正常运行。
 - 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。
 - 在Linux环境下安装Storm客户端。
    
    
集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
 - 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于登录FusionInsight Manager平台并通过认证,并且获取到该用户的keytab文件。
    
    
 
     - 获取的用户需要同时属于storm组和kafka组。
 - 默认情况下,用户的密码有效期是90天,所以获取的keytab文件的有效期是90天。如果需要延长该用户keytab的有效期,修改用户的密码策略并重新获取keytab。
 
 - 下载并安装Kafka客户端程序。
 
代码样例
创建拓扑:
public static void main(String[] args) throws Exception {
// 设置拓扑配置
Config conf = new Config();
// 配置安全插件
setSecurityPlugin(conf);
if (args.length >= 2) {
// 用户更改了默认的keytab文件名,这里需要将新的keytab文件名通过参数传入
conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
}
// 定义KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
getKafkaSpoutConfig(getKafkaSpoutStreams()));
// CountBolt
CountBolt countBolt = new CountBolt();
//SplitBolt
SplitSentenceBolt splitBolt = new SplitSentenceBolt();
// KafkaBolt配置信息
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
.withTupleToKafkaMapper(
new FieldNameBasedTupleToKafkaMapper("word", "count"));
kafkaBolt.withProducerProperties(getKafkaProducerProps());
// 定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 10);
builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
"split-bolt", new Fields("word"));
builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");
// 命令行提交拓扑
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
 
 
  如果修改了集群域名,在设置Kafka消费者/生产者属性中kerberos域名时,需要将其设置为集群实际域名,例如props.put(KERBEROS_DOMAIN_NAME , "hadoop.hadoop1.com")。
部署运行及结果查看
- 导出本地jar包,请参见打包Storm样例工程应用。
 - 获取相关配置文件,获取方式如下:
    
    
- 安全模式:参见4获取keytab文件。
 - 普通模式:无。
 
 - 获取下列jar包:
    
    
在安装好的Kafka客户端目录中进入“Kafka/kafka/libs”目录,获取如下jar包:
- kafka_<version>.jar
 - scala-library-<version>.jar
 - scala-logging_2.11-3.7.2.jar
 - metrics-core-<version>.jar
 - kafka-clients-<version>.jar
 - zookeeper-<version>.jar
 
在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
- storm-kafka-client-<version>.jar
 - storm-kafka-<version>.jar
 - slf4j-api-<version>.jar
 - guava-<version>.jar
 - json-simple-<version>.jar
 - curator-client-<version>.jar
 - curator-framework-<version>.jar
 - curator-recipes-<version>.jar
 
 - 将1、2和3中获取的jar包和配置文件合并统一打出完整的业务jar包,请参见打包Storm业务。
 - 进入Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下使用Kafka客户端创建拓扑中所用到的Topic,执行命令:
    
    
./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka
./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka
 
     - “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。
 - 安全模式下,需要Kafka管理员用户创建Topic。
 
 - 在Linux系统中完成拓扑的提交。
    
    
提交命令示例(拓扑名为kafka-test):
storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test
 
     - 安全模式下,在提交“source.jar”之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。
 - 安全模式下,Kafka需要用户有相应Topic的访问权限,因此首先需要在Kafka所在集群上使用Kafka管理员用户登录,之后使用kafka-acls.sh命令给用户赋权,成功之后再使用提交用户登录并提交拓扑。Kafka用户赋权详见“Kafka开发指南”的“更多信息”章节。
 
 - 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。
    
    
在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令:
./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties
同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令:
./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties
向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。