Storm-Kafka开发指引
操作场景
本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。
本章节代码样例基于Kafka新API,对应IntelliJ IDEA工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。
本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。
应用开发操作步骤
- 确认华为MRS产品Storm和Kafka组件已经安装,并正常运行。
- 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境。
- 在Linux环境下安装Storm客户端。
集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端。
- 下载并安装Kafka客户端程序。
代码样例
- 创建拓扑
public static void main(String[] args) throws Exception { // 设置拓扑配置 Config conf = new Config(); // 配置安全插件 //setSecurityPlugin(conf); if (args.length >= 2) { // 用户更改了默认的keytab文件名,这里需要将新的keytab文件名通过参数传入 conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]); } // 定义KafkaSpout KafkaSpout kafkaSpout = new KafkaSpout<String, String>( getKafkaSpoutConfig(getKafkaSpoutStreams())); // CountBolt CountBolt countBolt = new CountBolt(); //SplitBolt SplitSentenceBolt splitBolt = new SplitSentenceBolt(); // KafkaBolt配置信息 KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>(); kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC)) .withTupleToKafkaMapper( new FieldNameBasedTupleToKafkaMapper("word", "count")); kafkaBolt.withProducerProperties(getKafkaProducerProps()); // 定义拓扑 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", kafkaSpout, 10); builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]); builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping( "split-bolt", new Fields("word")); builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt"); // 命令行提交拓扑 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); }
- NewKafkaTopology类里的getKafkaConsumerProps()
private static Map<String, Object> getKafkaConsumerProps() throws Exception { Map<String, Object> props = new HashMap<String, Object>(); props.put(GROUP_ID, DEFAULT_GROUP_ID); props.put(SASL_KERBEROS_SERVICE_NAME, DEFAULT_SERVICE_NAME); props.put(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); props.put(KEY_DESERIALIZER, DEFAULT_DESERIALIZER); props.put(VALUE_DESERIALIZER, DEFAULT_DESERIALIZER); //props.put(KERBEROS_DOMAIN_NAME, "hadoop." + KerberosUtil.getDefaultRealm().toLowerCase()); return props; }
- NewKafkaTopology类里的getKafkaProducerProps()
private static Properties getKafkaProducerProps() throws Exception { Properties props = new Properties(); props.put(BOOTSTRAP_SERVERS, KAFKA_BROKER_LIST); props.put(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL); props.put(KEY_SERIALIZER, DEFAULT_SERIALIZER); props.put(VALUE_SERIALIZER, DEFAULT_SERIALIZER); props.put(SASL_KERBEROS_SERVICE_NAME, DEFAULT_SERVICE_NAME); //props.put(KERBEROS_DOMAIN_NAME, "hadoop." + KerberosUtil.getDefaultRealm().toLowerCase()); return props; }
如果修改了集群域名,在设置Kafka消费者/生产者属性中kerberos域名时,需要将其设置为集群实际域名,例如props.put(KERBEROS_DOMAIN_NAME , "hadoop.hadoop1.com")。
部署运行及结果查看
- 导出本地jar包,请参见打包Strom样例工程应用。
- 获取下列jar包:
在安装好的Kafka客户端目录中进入Kafka/kafka/libs目录,获取如下jar包:
- kafka_<version>.jar
- scala-library-<version>.jar
- scala-logging_2.11-3.7.2.jar
- metrics-core-<version>.jar
- kafka-clients-<version>.jar
- zookeeper-<version>.jar
在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:
- storm-kafka-client-<version>.jar
- storm-kafka-<version>.jar
- slf4j-api-<version>.jar
- guava-<version>.jar
- json-simple-<version>.jar
- curator-client-<version>.jar
- curator-framework-<version>.jar
- curator-recipes-<version>.jar
- 将1和2中获取的jar包和配置文件合并统一打出完整的业务jar包,请参见打包Strom应用业务。
- 进入Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下使用Kafka客户端创建拓扑中所用到的Topic,执行命令:
./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka
./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka
“--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。
- 在Linux系统中完成拓扑的提交。提交命令示例(拓扑名为kafka-test):
storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test
- 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。
在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令:
./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties
同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令:
./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties
向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。