Updated on 2022-08-16 GMT+08:00

Storm-Kafka Development Guideline

Scenario

This topic describes how to use the Storm-Kafka toolkit to implement the interaction between Storm and Kafka. KafkaSpout and KafkaBolt are included. KafkaSpout enables Storm to read data from Kafka. KafkaBolt enables Storm to write data into Kafka.

The code example in this section is based on the new Kafka API and corresponds to com.huawei.storm.example.kafka.NewKafkaTopology.java in the IntelliJ IDEA project.

This topic applies only to the access between the Storm component and the Kafka component of MRS. Determine the versions of the jar packages described in this chapter based on the actual situation.

Procedure for Developing an Application

  1. Verify that the Storm and Kafka components have been installed and are running properly.
  2. Obtain the sample project folder storm-examples in the src\storm-examples directory where the sample code is decompressed. For details, see Obtaining Sample Projects from Huawei Mirrors. Import storm-examples to the IntelliJ IDEA development environment. For details, see Environment Preparation.
  3. Install the Storm client in Linux OS.

    For details about how to use the client on a Master or Core node in the cluster, see Using an MRS Client on Nodes Inside a Cluster. For details about how to install the client outside the MRS cluster, see Using an MRS Client on Nodes Outside a Cluster.

  4. If security services are enabled in the cluster, obtain a human-machine user from the administrator for login to the FusionInsight Manager platform and authentication, and obtain the keytab file of the user.

    • The obtained user must belong to the storm group and kafka group.
    • The default validity period of a user password is 90 days. Therefore, the validity period of the obtained keytab file is 90 days. To prolong the validity period of the keytab file, modify the user password policy and obtain the keytab file again.

  5. Download and install the Kafka client.

Code Sample

Create a topology. (Change the IP addresses and ports to the actual ones.)

public static void main(String[] args) throws Exception {

// Set the topology
Config conf = new Config();

// Configure the security plug-in.
setSecurityPlugin(conf);

if (args.length >= 2) {
// If the default keytab file name has been changed, configure the new keytab file name. 
conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
}

// Define KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
getKafkaSpoutConfig(getKafkaSpoutStreams()));

// CountBolt
CountBolt countBolt = new CountBolt();
//SplitBolt
SplitSentenceBolt splitBolt = new SplitSentenceBolt();

// KafkaBolt configuration information 
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
.withTupleToKafkaMapper(
new FieldNameBasedTupleToKafkaMapper("word", "count"));
kafkaBolt.withProducerProperties(getKafkaProducerProps());

// Define the topology. 
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 10);
builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
"split-bolt", new Fields("word"));
builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");

// Submit the topology. 
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}

If the cluster domain name is modified, set the Kerberos domain name in the Kafka consumer/producer attribute to the actual domain name of the cluster, for example, props.put(KERBEROS_DOMAIN_NAME, "hadoop.hadoop1.com").

Running the Application and Viewing Results

  1. Export the local JAR package. For details, see Packaging IntelliJ IDEA Code.
  2. Obtain the related configuration files. The methods are described as follows:

    • Security mode: See 4 to obtain the keytab file.
    • Normal mode: None

  3. Obtain the following JAR packages.

    Go to the Kafka/kafka/libs directory on the installed Kafka client, and obtain the following JAR packages:

    • kafka_<version>.jar
    • scala-library-<version>.jar
    • scala-logging_2.11-3.7.2.jar
    • metrics-core-<version>.jar
    • kafka-clients-<version>.jar
    • zookeeper-<version>.jar

    Obtain the following JAR file from the streaming-cql-<HD-Version>/lib directory on the Storm client:

    • storm-kafka-client-<version>.jar
    • storm-kafka-<version>.jar
    • slf4j-api-<version>.jar
    • guava-<version>.jar
    • json-simple-<version>.jar
    • curator-client-<version>.jar
    • curator-framework-<version>.jar
    • curator-recipes-<version>.jar

  4. Combine the JAR packages obtained in the preceding steps and export a complete service JAR package. For details, see Packaging Services.
  5. Go to the directory where the Kafka client locates in the Linux system, use the Kafka client to create the topic used by the topology in the Kafka/kafka/bin directory. Run the following command:

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    • The variable appended to --zookeeper specifies the ZooKeeper address. You must set the ZooKeeper address to the ZooKeeper address configured during cluster installation.
    • Safe mode requires Kafka administrator user to create Topic.

  6. Submit the topology on a Linux OS.

    The submission command example is as follows (the topology name is kafka-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.SimpleKafkaTopology kafka-test

    • Before submitting source.jar in security mode, ensure that the Kerberos security login is implemented, and the login user and the user of the uploaded keytab file are the same user in keytab mode.
    • In security mode, Kafka requires the user to have access permission on related topics. Therefore, you need to log in to the server from the cluster where Kafka is located as the Kafka administrator, and then run the kafka-acls.sh command to assign permission to the user. After successful permission assignment, you need to log in to the server as the user who submits the task and submit the topology. For details about Kafka user permission assignment, see Kafka Development Guide > More Information.

  7. After the topology is successfully submitted, send data to Kafka and check whether related information is generated.

    Go to the directory where the Kafka client locates in the Linux system, start the consumer in the Kafka/kafka/bin directory, and check whether data is generated. Run the following command:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties

    Go to the directory whether the Kafka client locates in the Linux system, start the producer in the Kafka/kafka/bin directory, and write data into Kafka. Run the following command:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties

    Write test data into input, and check whether related data is generated in output. If yes, the Storm-Kafka topology is executed successfully.