更新时间:2024-08-05 GMT+08:00

Storm-Kafka开发指引

操作场景

本文档主要说明如何使用Storm-Kafka工具包,完成Storm和Kafka之间的交互。包含KafkaSpout和KafkaBolt两部分。KafkaSpout主要完成Storm从Kafka中读取数据的功能;KafkaBolt主要完成Storm向Kafka中写入数据的功能。

本章节代码样例基于Kafka新API,对应IntelliJ IDEA工程中com.huawei.storm.example.kafka.NewKafkaTopology.java。

本章节只适用于MRS产品Storm与Kafka组件间的访问。本章中描述的jar包的具体版本信息请以实际情况为准。

应用开发操作步骤

  1. 确认华为MRS产品Storm和Kafka组件已经安装,并正常运行。
  2. 参考获取MRS应用开发样例工程,获取样例代码解压目录中“src\storm-examples”目录下的样例工程文件夹storm-examples并将storm-examples导入到IntelliJ IDEA开发环境,参见准备Storm应用开发环境
  3. 在Linux环境下安装Storm客户端。

    集群的Master节点或者Core节点使用客户端可参考集群内节点使用MRS客户端,MRS集群外客户端的安装操作可参考集群外节点使用MRS客户端

  4. 下载并安装Kafka客户端程序。

代码样例

  1. 创建拓扑
    public static void main(String[] args) throws Exception {
    
    // 设置拓扑配置
    Config conf = new Config();
    
    // 配置安全插件
    //setSecurityPlugin(conf);
    
    if (args.length >= 2) {
    // 用户更改了默认的keytab文件名,这里需要将新的keytab文件名通过参数传入
    conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
    }
    
    // 定义KafkaSpout
    KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
    getKafkaSpoutConfig(getKafkaSpoutStreams()));
    
    // CountBolt
    CountBolt countBolt = new CountBolt();
    //SplitBolt
    SplitSentenceBolt splitBolt = new SplitSentenceBolt();
    
    // KafkaBolt配置信息
    KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
    kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
    .withTupleToKafkaMapper(
    new FieldNameBasedTupleToKafkaMapper("word", "count"));
    kafkaBolt.withProducerProperties(getKafkaProducerProps());
    
    // 定义拓扑
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", kafkaSpout, 10);
    builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
    builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
    "split-bolt", new Fields("word"));
    builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");
    
    // 命令行提交拓扑
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
  2. NewKafkaTopology类里的getKafkaConsumerProps()
    private static Map<String, Object> getKafkaConsumerProps() throws Exception {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(GROUP_ID, DEFAULT_GROUP_ID);
        props.put(SASL_KERBEROS_SERVICE_NAME, DEFAULT_SERVICE_NAME);
        props.put(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL);
        props.put(KEY_DESERIALIZER, DEFAULT_DESERIALIZER);
        props.put(VALUE_DESERIALIZER, DEFAULT_DESERIALIZER);
        //props.put(KERBEROS_DOMAIN_NAME, "hadoop." + KerberosUtil.getDefaultRealm().toLowerCase());
        return props;
    }
  3. NewKafkaTopology类里的getKafkaProducerProps()
    private static Properties getKafkaProducerProps() throws Exception {
        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS, KAFKA_BROKER_LIST);
        props.put(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL);
        props.put(KEY_SERIALIZER, DEFAULT_SERIALIZER);
        props.put(VALUE_SERIALIZER, DEFAULT_SERIALIZER);
        props.put(SASL_KERBEROS_SERVICE_NAME, DEFAULT_SERVICE_NAME);
        //props.put(KERBEROS_DOMAIN_NAME, "hadoop." + KerberosUtil.getDefaultRealm().toLowerCase());
        return props;
    }

如果修改了集群域名,在设置Kafka消费者/生产者属性中kerberos域名时,需要将其设置为集群实际域名,例如props.put(KERBEROS_DOMAIN_NAME , "hadoop.hadoop1.com")。

部署运行及结果查看

  1. 导出本地jar包,请参见打包Strom样例工程应用
  2. 获取下列jar包:

    在安装好的Kafka客户端目录中进入Kafka/kafka/libs目录,获取如下jar包:

    • kafka_<version>.jar
    • scala-library-<version>.jar
    • scala-logging_2.11-3.7.2.jar
    • metrics-core-<version>.jar
    • kafka-clients-<version>.jar
    • zookeeper-<version>.jar

    在Storm客户端的“streaming-cql-<HD-Version>/lib”目录中获取如下jar包:

    • storm-kafka-client-<version>.jar
    • storm-kafka-<version>.jar
    • slf4j-api-<version>.jar
    • guava-<version>.jar
    • json-simple-<version>.jar
    • curator-client-<version>.jar
    • curator-framework-<version>.jar
    • curator-recipes-<version>.jar

  3. 12中获取的jar包和配置文件合并统一打出完整的业务jar包,请参见打包Strom应用业务
  4. 进入Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下使用Kafka客户端创建拓扑中所用到的Topic,执行命令:

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    “--zookeeper”后面填写的是ZooKeeper地址,需要改为安装集群时配置的ZooKeeper地址。

  5. 在Linux系统中完成拓扑的提交。提交命令示例(拓扑名为kafka-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test

  6. 拓扑提交成功后,可以向Kafka中发送数据,观察是否有相关信息生成。

    在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动consumer观察数据是否生成。执行命令:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties

    同时在Linux系统中进入Kafka客户端所在目录,在Kafka/kafka/bin目录下启动producer,向Kafka中写入数据。执行命令:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties

    向input中写入测试数据,可以观察到output中有对应的数据产生,则说明Storm-Kafka拓扑运行成功。