更新时间:2024-08-05 GMT+08:00

Storm-Kafka开发指引

操作场景

本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。

本章节代码样例基于Kafka新API,对应IntelliJ IDEA工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。

本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。

应用开发操作步骤

  1. 确认Storm和Kafka组件已经安装,并正常运行。
  2. 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境
  3. 在Linux环境下安装Storm客户端。

    集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端

  4. 如果集群启用了安全服务,需要从管理员处获取一个“人机”用户,用于登录FusionInsight Manager平台并通过认证,并且获取到该用户的keytab文件。

    • 获取的用户需要同时属于storm组和kafka组。
    • 默认情况下,用户的密码有效期是90天,所以获取的keytab文件的有效期是90天。如果需要延长该用户keytab的有效期,修改用户的密码策略并重新获取keytab。

  5. 下载并安装Kafka客户端程序。

代码样例

创建拓扑:

public static void main(String[] args) throws Exception {

// 设置拓扑配置
Config conf = new Config();

// 配置安全插件
setSecurityPlugin(conf);

if (args.length >= 2) {
// 用户更改了默认的keytab文件名,这里需要将新的keytab文件名通过参数传入
conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
}

// 定义KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
getKafkaSpoutConfig(getKafkaSpoutStreams()));

// CountBolt
CountBolt countBolt = new CountBolt();
//SplitBolt
SplitSentenceBolt splitBolt = new SplitSentenceBolt();

// KafkaBolt配置信息
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
.withTupleToKafkaMapper(
new FieldNameBasedTupleToKafkaMapper("word", "count"));
kafkaBolt.withProducerProperties(getKafkaProducerProps());

// 定义拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 10);
builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
"split-bolt", new Fields("word"));
builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");

// 命令行提交拓扑
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}

如果修改了集群域名,在设置Kafka消费者/生产者属性中kerberos域名时,需要将其设置为集群实际域名,例如props.put(KERBEROS_DOMAIN_NAME , "hadoop.hadoop1.com")。

部署运行及结果查看

  1. 导出本地jar包,请参见打包Storm样例工程应用
  2. 获取相关配置文件,获取方式如下:

    • 安全模式:参见4获取keytab文件。
    • 普通模式:无。

  3. 获取下列jar包:

    在安装好的Kafka客户端目录中进入“Kafka/kafka/libs”目录,获取如下jar包:

    • kafka_<version>.jar
    • scala-library-<version>.jar
    • scala-logging_2.11-3.7.2.jar
    • metrics-core-<version>.jar
    • kafka-clients-<version>.jar
    • zookeeper-<version>.jar

    在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:

    • storm-kafka-client-<version>.jar
    • storm-kafka-<version>.jar
    • slf4j-api-<version>.jar
    • guava-<version>.jar
    • json-simple-<version>.jar
    • curator-client-<version>.jar
    • curator-framework-<version>.jar
    • curator-recipes-<version>.jar

  4. 123中获取的jar包和配置文件合并统一打出完整的业务jar包,请参见打包Storm业务
  5. 进入Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下使用Kafka客户端创建拓扑中所用到的Topic,执行命令:

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    • “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。
    • 安全模式下,需要Kafka管理员用户创建Topic。

  6. 在Linux系统中完成拓扑的提交。

    提交命令示例(拓扑名为kafka-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test

    • 安全模式下,在提交“source.jar”之前,请确保已经进行kerberos安全登录,并且keytab方式下,登录用户和所上传keytab所属用户必须是同一个用户。
    • 安全模式下,Kafka需要用户有相应Topic的访问权限,因此首先需要在Kafka所在集群上使用Kafka管理员用户登录,之后使用kafka-acls.sh命令给用户赋权,成功之后再使用提交用户登录并提交拓扑。Kafka用户赋权详见“Kafka开发指南”的“更多信息”章节。

  7. 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。

    在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties

    同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties

    向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。