Updated on 2022-09-14 GMT+08:00

Storm-Kafka Development Guideline

Scenario

This topic describes how to use the Storm-Kafka toolkit to implement the interaction between Storm and Kafka. KafkaSpout and KafkaBolt are included. KafkaSpout enables Storm to read data from Kafka. KafkaBolt enables Storm to write data into Kafka.

The code example in this section is based on the new Kafka API and corresponds to com.huawei.storm.example.kafka.NewKafkaTopology.java in the IntelliJ IDEA project.

This topic applies only to the access between the Storm component and the Kafka component of MRS. Determine the versions of the jar packages described in this chapter based on the actual situation.

Procedure for Developing an Application

  1. Verify that the Storm and Kafka components of Huawei MRS have been installed and are running properly.
  2. Obtain the sample project folder storm-examples in the src\storm-examples directory where the sample code is decompressed. For details, see Obtaining Sample Projects from Huawei Mirrors. Import storm-examples to the IntelliJ IDEA development environment. For details, see Environment Preparation.
  3. Install the Storm client in Linux OS.

    For details about how to use the client on a Master or Core node in the cluster, see Using an MRS Client on Nodes Inside a Cluster. For details about how to install the client outside the MRS cluster, see Using an MRS Client on Nodes Outside a Cluster.

  4. Download and install the Kafka client.

Code Sample

  1. Create a topology
    public static void main(String[] args) throws Exception {
    
    // Set the topology
    Config conf = new Config();
    
    // Configure the security plug-in.
    setSecurityPlugin(conf);
    
    if (args.length >= 2) {
    // If the default keytab file name has been changed, configure the new keytab file name. 
    conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
    }
    
    // Define KafkaSpout
    KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
    getKafkaSpoutConfig(getKafkaSpoutStreams()));
    
    // CountBolt
    CountBolt countBolt = new CountBolt();
    //SplitBolt
    SplitSentenceBolt splitBolt = new SplitSentenceBolt();
    
    // KafkaBolt configuration information 
    KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
    kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
    .withTupleToKafkaMapper(
    new FieldNameBasedTupleToKafkaMapper("word", "count"));
    kafkaBolt.withProducerProperties(getKafkaProducerProps());
    
    // Define the topology. 
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", kafkaSpout, 10);
    builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
    builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
    "split-bolt", new Fields("word"));
    builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");
    
    // Submit the topology. 
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
  2. getKafkaConsumerProps() in the NewKafkaTopology class
    private static Map<String, Object> getKafkaConsumerProps() throws Exception {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(GROUP_ID, DEFAULT_GROUP_ID);
        props.put(SASL_KERBEROS_SERVICE_NAME, DEFAULT_SERVICE_NAME);
        props.put(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL);
        props.put(KEY_DESERIALIZER, DEFAULT_DESERIALIZER);
        props.put(VALUE_DESERIALIZER, DEFAULT_DESERIALIZER);
        //props.put(KERBEROS_DOMAIN_NAME, "hadoop." + KerberosUtil.getDefaultRealm().toLowerCase());
        return props;
    }
  3. getKafkaProducerProps() in the NewKafkaTopology class
    private static Properties getKafkaProducerProps() throws Exception {
        Properties props = new Properties();
        props.put(BOOTSTRAP_SERVERS, KAFKA_BROKER_LIST);
        props.put(SECURITY_PROTOCOL, DEFAULT_SECURITY_PROTOCOL);
        props.put(KEY_SERIALIZER, DEFAULT_SERIALIZER);
        props.put(VALUE_SERIALIZER, DEFAULT_SERIALIZER);
        props.put(SASL_KERBEROS_SERVICE_NAME, DEFAULT_SERVICE_NAME);
        //props.put(KERBEROS_DOMAIN_NAME, "hadoop." + KerberosUtil.getDefaultRealm().toLowerCase());
        return props;
    }

If the cluster domain name is modified, set the Kerberos domain name in the Kafka consumer/producer attribute to the actual domain name of the cluster, for example, props.put(KERBEROS_DOMAIN_NAME, "hadoop.hadoop1.com").

Running the Application and Viewing Results

  1. Export the local JAR package. For details, see Packaging IntelliJ IDEA Code.
  2. Obtain the following JAR packages.

    Go to the Kafka/kafka/libs directory on the installed Kafka client, and obtain the following JAR packages:

    • kafka_<version>.jar
    • scala-library-<version>.jar
    • scala-logging_2.11-3.7.2.jar
    • metrics-core-<version>.jar
    • kafka-clients-<version>.jar
    • zookeeper-<version>.jar

    Obtain the following JAR file from the streaming-cql-<HD-Version>/lib directory on the Storm client:

    • storm-kafka-client-<version>.jar
    • storm-kafka-<version>.jar
    • slf4j-api-<version>.jar
    • guava-<version>.jar
    • json-simple-<version>.jar
    • curator-client-<version>.jar
    • curator-framework-<version>.jar
    • curator-recipes-<version>.jar

  3. Combine the JAR packages obtained in the preceding steps and export a complete service JAR package. For details, see Packaging Services.
  4. Go to the directory where the Kafka client locates in the Linux system, use the Kafka client to create the topic used by the topology in the Kafka/kafka/bin directory. Run the following command:

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    The variable appended to --zookeeper specifies the ZooKeeper address. You must set the ZooKeeper address to the ZooKeeper address configured during cluster installation.

  5. Submit the topology on a Linux OS. The submission command example is as follows (the topology name is kafka-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.SimpleKafkaTopology kafka-test

  6. After the topology is successfully submitted, send data to Kafka and check whether related information is generated.

    Go to the directory where the Kafka client locates in the Linux system, start the consumer in the Kafka/kafka/bin directory, and check whether data is generated. Run the following command:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties

    Go to the directory whether the Kafka client locates in the Linux system, start the producer in the Kafka/kafka/bin directory, and write data into Kafka. Run the following command:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties

    Write test data into input, and check whether related data is generated in output. If yes, the Storm-Kafka topology is executed successfully.