更新时间:2024-08-03 GMT+08:00

Storm-Kafka开发指引

操作场景

本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。

本章节代码样例基于Kafka新API,对应Eclipse工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。

本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。

应用开发操作步骤

  1. 确认MRS产品Storm和Kafka组件已经安装,并正常运行。
  2. 已搭建Storm示例代码工程,将storm-examples导入到Eclipse开发环境,参见导入并配置Storm样例工程
  3. 用WinScp工具将Storm客户端安装包导入Linux环境并安装客户端,参见准备Linux客户端环境
  4. 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于认证,并且获取到该用户的keytab文件。将获取到的文件拷贝到示例工程的 src/main/resources目录。

    • 获取的用户需要同时属于storm组和kafka组。

  5. 下载并安装Kafka客户端程序,参见《Kafka应用开发》。

代码样例

创建拓扑:

public static void main(String[] args) throws Exception {

// 设置拓扑配置
Config conf = new Config();

// 配置安全插件
setSecurityPlugin(conf);

if (args.length >= 2) {
// 用户更改了默认的keytab文件名,这里需要将新的keytab文件名通过参数传入
conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
}

// 定义KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
getKafkaSpoutConfig(getKafkaSpoutStreams()));

// CountBolt
CountBolt countBolt = new CountBolt();
//SplitBolt
SplitSentenceBolt splitBolt = new SplitSentenceBolt();

// KafkaBolt配置信息
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, getKafkaProducerProps());
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
.withTupleToKafkaMapper(
new FieldNameBasedTupleToKafkaMapper("word", "count"));

// 定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 10);
builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
"split-bolt", new Fields("word"));
builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");

// 命令行提交拓扑
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}

部署运行及结果查看

  1. 获取相关配置文件,获取方式如下。

    • 安全模式:参见4获取keytab文件。
    • 普通模式:无。

  2. 在Storm示例代码根目录执行如下命令打包:"mvn package"。执行成功后,将会在target目录生成storm-examples-1.0.jar。
  3. 使用Kafka客户端创建拓扑中所用到的Topic,执行命令。

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    • “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。
    • 安全模式下,需要kafka管理员用户创建Topic。

  4. 在Linux系统中完成拓扑的提交。提交命令示例(拓扑名为kafka-test)。

    storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test

    • 安全模式下,在提交storm-examples-1.0.jar之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。
    • 安全模式下,kafka需要用户有相应Topic的访问权限,因此首先需要给用户赋权,再提交拓扑。

  5. 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。

    在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --new-consumer --consumer.config ../../../Kafka/kafka/config/consumer.properties

    同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../../../Kafka/kafka/config/producer.properties

    向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。