Updated on 2022-06-01 GMT+08:00

Storm-Kafka Development Guideline

Scenario

This section describes how to use the Storm-Kafka toolkit to connect Storm and Kafka. The toolkit provides two components: KafkaSpout, which enables Storm to read data from Kafka, and KafkaBolt, which enables Storm to write data to Kafka.

The sample code uses the new Kafka APIs and corresponds to com.huawei.storm.example.kafka.NewKafkaTopology.java in the Eclipse project.

This section applies only to access between the Storm and Kafka components of MRS. Choose the versions of the JAR files mentioned in this section to match your actual environment.

Procedure for Developing an Application

  1. Verify that the Storm and Kafka components of MRS have been installed and are running properly.
  2. Ensure that a Storm sample code project has been set up. Import storm-examples to the Eclipse development environment. For details, see Configuring and Importing a Project.
  3. Use WinSCP to upload the Storm client installation package to the Linux environment and install the client. For details, see Preparing the Linux Client.
  4. If the security service is enabled on the cluster, obtain a human-machine user from the administrator for authentication, together with the keytab file of that user. Copy the keytab file to the src/main/resources directory of the sample project.

    • The obtained user must belong to both the storm and Kafka groups.

  5. Download and install the Kafka client. For details, see the Kafka Development Guide.

Sample Code

Create a topology.

public static void main(String[] args) throws Exception {

    // Create the topology configuration.
    Config conf = new Config();

    // Configure the security plug-in.
    setSecurityPlugin(conf);

    if (args.length >= 2) {
        // If the default keytab file name has been changed, configure the new keytab file name.
        conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
    }

    // Define KafkaSpout, which reads records from Kafka.
    KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(
            getKafkaSpoutConfig(getKafkaSpoutStreams()));

    // CountBolt
    CountBolt countBolt = new CountBolt();
    // SplitBolt
    SplitSentenceBolt splitBolt = new SplitSentenceBolt();

    // KafkaBolt configuration: the "word" field is the message key, the "count" field is the message value.
    conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, getKafkaProducerProps());
    KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
    kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
            .withTupleToKafkaMapper(
                    new FieldNameBasedTupleToKafkaMapper("word", "count"));

    // Define the topology: kafka-spout -> split-bolt -> count-bolt -> kafka-bolt.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("kafka-spout", kafkaSpout, 10);
    builder.setBolt("split-bolt", splitBolt, 10).shuffleGrouping("kafka-spout", STREAMS[0]);
    builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
            "split-bolt", new Fields("word"));
    builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");

    // Submit the topology under the name passed as the first argument.
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
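The sample calls getKafkaProducerProps(), whose body is not shown in this section. As a rough sketch only, a helper of that kind might build a java.util.Properties object like the one below. The class name, the method name buildProducerProps, and the broker address are illustrative assumptions and are not taken from the sample project. The three property keys are standard Kafka producer settings, and string serializers are assumed because the bolt is declared as KafkaBolt<String, String>. A security-mode cluster will probably need additional authentication properties that are not covered here.

```java
import java.util.Properties;

public class KafkaProducerPropsSketch {

    // Hypothetical stand-in for the sample's getKafkaProducerProps() helper.
    // The broker address is supplied by the caller; replace it with your own.
    public static Properties buildProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Address list of the Kafka brokers the producer connects to.
        props.put("bootstrap.servers", bootstrapServers);
        // Keys and values are assumed to be plain strings.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // "broker-host:9092" is a placeholder, not a real address.
        Properties props = buildProducerProps("broker-host:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In the sample, the resulting object is what gets stored in the topology configuration under KafkaBolt.KAFKA_BROKER_PROPERTIES.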

Running the Application and Viewing Results

  1. Obtain the related configuration file using the following method:

    • Security mode: Obtain the keytab file as described in step 4 of Procedure for Developing an Application.
    • Common mode: No configuration file is required.

  2. In the root directory of the Storm sample code, run the mvn package command. If the command succeeds, the storm-examples-1.0.jar file is generated in the target directory.
  3. Use the Kafka client to create the topics used by the topology by running the following commands:

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    • The value of --zookeeper is the ZooKeeper address. Set it to the ZooKeeper address that was configured when the cluster was installed.
    • In security mode, the topics must be created by the Kafka administrator.

  4. Submit the topology on Linux. The following example command uses kafka-test as the topology name:

    storm jar /opt/jartarget/storm-examples-1.0.jar com.huawei.storm.example.kafka.NewKafkaTopology kafka-test

    • In security mode, complete Kerberos security login before submitting the storm-examples-1.0.jar file. In keytab mode, the login user must be the same as the user to whom the uploaded keytab file belongs.
    • In security mode, the Kafka user must have permission to access the topics used by the topology. Grant this permission to the user before submitting the topology.

  5. After the topology is successfully submitted, send data to Kafka and check whether related information is generated.

    On the Linux system, go to the Kafka/kafka/bin directory under the Kafka client installation directory, start the consumer, and check whether data is generated. The command is as follows:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --new-consumer --consumer.config ../../../Kafka/kafka/config/consumer.properties

    On the Linux system, go to the Kafka/kafka/bin directory under the Kafka client installation directory, start the producer, and write data into Kafka. The command is as follows:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../../../Kafka/kafka/config/producer.properties

    Write test data into the input topic and check whether corresponding data appears in the output topic. If it does, the Storm-Kafka topology is running successfully.
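To know what to look for in the output topic, it can help to trace the split and count steps by hand. The plain-Java sketch below imitates them without any Storm dependency. It assumes that SplitSentenceBolt splits each record on whitespace and that CountBolt keeps a running total per word; neither bolt's source appears in this section, so treat both as assumptions. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Running totals per word, similar to the state a counting bolt would keep.
    private final Map<String, Integer> counts = new LinkedHashMap<>();

    // Split step (assumed behavior): break a sentence into words on whitespace.
    public static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    // Count step (assumed behavior): increment the total for the word and
    // return the word/count pair as "word=count".
    public String count(String word) {
        int total = counts.merge(word, 1, Integer::sum);
        return word + "=" + total;
    }

    // Pass one input record through both steps, in order.
    public List<String> process(String sentence) {
        List<String> emitted = new ArrayList<>();
        for (String word : split(sentence)) {
            emitted.add(count(word));
        }
        return emitted;
    }

    public static void main(String[] args) {
        WordCountSketch sketch = new WordCountSketch();
        System.out.println(sketch.process("hello storm")); // prints [hello=1, storm=1]
        System.out.println(sketch.process("hello kafka")); // prints [hello=2, kafka=1]
    }
}
```

In the topology, each such pair is written to Kafka with the word field as the message key and the count field as the message value, as set by FieldNameBasedTupleToKafkaMapper("word", "count").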