El contenido no se encuentra disponible en el idioma seleccionado. Estamos trabajando continuamente para agregar más idiomas. Gracias por su apoyo.

Compute
Elastic Cloud Server
Huawei Cloud Flexus
Bare Metal Server
Auto Scaling
Image Management Service
Dedicated Host
FunctionGraph
Cloud Phone Host
Huawei Cloud EulerOS
Networking
Virtual Private Cloud
Elastic IP
Elastic Load Balance
NAT Gateway
Direct Connect
Virtual Private Network
VPC Endpoint
Cloud Connect
Enterprise Router
Enterprise Switch
Global Accelerator
Management & Governance
Cloud Eye
Identity and Access Management
Cloud Trace Service
Resource Formation Service
Tag Management Service
Log Tank Service
Config
OneAccess
Resource Access Manager
Simple Message Notification
Application Performance Management
Application Operations Management
Organizations
Optimization Advisor
IAM Identity Center
Cloud Operations Center
Resource Governance Center
Migration
Server Migration Service
Object Storage Migration Service
Cloud Data Migration
Migration Center
Cloud Ecosystem
KooGallery
Partner Center
User Support
My Account
Billing Center
Cost Center
Resource Center
Enterprise Management
Service Tickets
HUAWEI CLOUD (International) FAQs
ICP Filing
Support Plans
My Credentials
Customer Operation Capabilities
Partner Support Plans
Professional Services
Analytics
MapReduce Service
Data Lake Insight
CloudTable Service
Cloud Search Service
Data Lake Visualization
Data Ingestion Service
GaussDB(DWS)
DataArts Studio
Data Lake Factory
DataArts Lake Formation
IoT
IoT Device Access
Others
Product Pricing Details
System Permissions
Console Quick Start
Common FAQs
Instructions for Associating with a HUAWEI CLOUD Partner
Message Center
Security & Compliance
Security Technologies and Applications
Web Application Firewall
Host Security Service
Cloud Firewall
SecMaster
Anti-DDoS Service
Data Encryption Workshop
Database Security Service
Cloud Bastion Host
Data Security Center
Cloud Certificate Manager
Edge Security
Managed Threat Detection
Blockchain
Blockchain Service
Web3 Node Engine Service
Media Services
Media Processing Center
Video On Demand
Live
SparkRTC
MetaStudio
Storage
Object Storage Service
Elastic Volume Service
Cloud Backup and Recovery
Storage Disaster Recovery Service
Scalable File Service Turbo
Scalable File Service
Volume Backup Service
Cloud Server Backup Service
Data Express Service
Dedicated Distributed Storage Service
Containers
Cloud Container Engine
SoftWare Repository for Container
Application Service Mesh
Ubiquitous Cloud Native Service
Cloud Container Instance
Databases
Relational Database Service
Document Database Service
Data Admin Service
Data Replication Service
GeminiDB
GaussDB
Distributed Database Middleware
Database and Application Migration UGO
TaurusDB
Middleware
Distributed Cache Service
API Gateway
Distributed Message Service for Kafka
Distributed Message Service for RabbitMQ
Distributed Message Service for RocketMQ
Cloud Service Engine
Multi-Site High Availability Service
EventGrid
Dedicated Cloud
Dedicated Computing Cluster
Business Applications
Workspace
ROMA Connect
Message & SMS
Domain Name Service
Edge Data Center Management
Meeting
AI
Face Recognition Service
Graph Engine Service
Content Moderation
Image Recognition
Optical Character Recognition
ModelArts
ImageSearch
Conversational Bot Service
Speech Interaction Service
Huawei HiLens
Video Intelligent Analysis Service
Developer Tools
SDK Developer Guide
API Request Signing Guide
Terraform
Koo Command Line Interface
Content Delivery & Edge Computing
Content Delivery Network
Intelligent EdgeFabric
CloudPond
Intelligent EdgeCloud
Solutions
SAP Cloud
High Performance Computing
Developer Services
ServiceStage
CodeArts
CodeArts PerfTest
CodeArts Req
CodeArts Pipeline
CodeArts Build
CodeArts Deploy
CodeArts Artifact
CodeArts TestPlan
CodeArts Check
CodeArts Repo
Cloud Application Engine
MacroVerse aPaaS
KooMessage
KooPhone
KooDrive

Storm-Kafka Development Guideline

Updated on 2024-08-12 GMT+08:00

Scenario

This topic describes how to use the Storm-Kafka toolkit to implement the interaction between Storm and Kafka. KafkaSpout and KafkaBolt are included. KafkaSpout enables Storm to read data from Kafka. KafkaBolt enables Storm to write data into Kafka.

The code example in this section is based on the new Kafka API and corresponds to com.huawei.storm.example.kafka.NewKafkaTopology.java in the IntelliJ IDEA project.

This topic applies only to the access between the Storm component and the Kafka component of MRS. Determine the versions of the jar packages described in this chapter based on the actual situation.

Procedure for Developing an Application

  1. Verify that the Storm and Kafka components have been installed and are running properly.
  2. Obtain the sample project folder storm-examples in the src\storm-examples directory where the sample code is decompressed. For details, see Obtaining the MRS Application Development Sample Project. Import storm-examples to the IntelliJ IDEA development environment. For details, see Environment Preparation.
  3. Install the Storm client in Linux OS.

    For details about how to use the client on a Master or Core node in the cluster, see Using an MRS Client on Nodes Inside a Cluster. For details about how to install the client outside the MRS cluster, see Using an MRS Client on Nodes Outside a Cluster.

  4. If security services are enabled in the cluster, obtain a human-machine user from the administrator for login to the FusionInsight Manager platform and authentication, and obtain the keytab file of the user.

    NOTE:
    • The obtained user must belong to the storm group and kafka group.
    • The default validity period of a user password is 90 days. Therefore, the validity period of the obtained keytab file is 90 days. To prolong the validity period of the keytab file, modify the user password policy and obtain the keytab file again.

  5. Download and install the Kafka client.

Code Sample

Create a topology. (Change the IP addresses and ports to the actual ones.)

public static void main(String[] args) throws Exception {

// Set the topology
Config conf = new Config();

// Configure the security plug-in.
setSecurityPlugin(conf);

if (args.length >= 2) {
// If the default keytab file name has been changed, configure the new keytab file name. 
conf.put(Config.TOPOLOGY_KEYTAB_FILE, args[1]);
}

// Define KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout<String, String>(
getKafkaSpoutConfig(getKafkaSpoutStreams()));

// CountBolt
CountBolt countBolt = new CountBolt();
//SplitBolt
SplitSentenceBolt splitBolt = new SplitSentenceBolt();

// KafkaBolt configuration information 
KafkaBolt<String, String> kafkaBolt = new KafkaBolt<String, String>();
kafkaBolt.withTopicSelector(new DefaultTopicSelector(OUTPUT_TOPIC))
.withTupleToKafkaMapper(
new FieldNameBasedTupleToKafkaMapper("word", "count"));
kafkaBolt.withProducerProperties(getKafkaProducerProps());

// Define the topology. 
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 10);
builder.setBolt("split-bolt", splitBolt,10).shuffleGrouping("kafka-spout", STREAMS[0]);
builder.setBolt("count-bolt", countBolt, 10).fieldsGrouping(
"split-bolt", new Fields("word"));
builder.setBolt("kafka-bolt", kafkaBolt, 10).shuffleGrouping("count-bolt");

// Submit the topology. 
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
NOTE:

If the cluster domain name is modified, set the Kerberos domain name in the Kafka consumer/producer attribute to the actual domain name of the cluster, for example, props.put(KERBEROS_DOMAIN_NAME, "hadoop.hadoop1.com").

Running the Application and Viewing Results

  1. Export the local JAR package. For details, see Packaging IntelliJ IDEA Code.
  2. Obtain the related configuration files. The methods are described as follows:

    • Security mode: See 4 to obtain the keytab file.
    • Normal mode: None

  3. Obtain the following JAR packages.

    Go to the Kafka/kafka/libs directory on the installed Kafka client, and obtain the following JAR packages:

    • kafka_<version>.jar
    • scala-library-<version>.jar
    • scala-logging_2.11-3.7.2.jar
    • metrics-core-<version>.jar
    • kafka-clients-<version>.jar
    • zookeeper-<version>.jar

    Obtain the following JAR file from the streaming-cql-<HD-Version>/lib directory on the Storm client:

    • storm-kafka-client-<version>.jar
    • storm-kafka-<version>.jar
    • slf4j-api-<version>.jar
    • guava-<version>.jar
    • json-simple-<version>.jar
    • curator-client-<version>.jar
    • curator-framework-<version>.jar
    • curator-recipes-<version>.jar

  4. Combine the JAR packages obtained in the preceding steps and export a complete service JAR package. For details, see Packaging Services.
  5. Go to the directory where the Kafka client locates in the Linux system, use the Kafka client to create the topic used by the topology in the Kafka/kafka/bin directory. Run the following command:

    ./kafka-topics.sh --create --topic input --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    ./kafka-topics.sh --create --topic output --partitions 2 --replication-factor 2 --zookeeper {ip:port}/kafka

    NOTE:
    • The variable appended to --zookeeper specifies the ZooKeeper address. You must set the ZooKeeper address to the ZooKeeper address configured during cluster installation.
    • Safe mode requires Kafka administrator user to create Topic.

  6. Submit the topology on a Linux OS.

    The submission command example is as follows (the topology name is kafka-test):

    storm jar /opt/jartarget/source.jar com.huawei.storm.example.kafka.SimpleKafkaTopology kafka-test

    NOTE:
    • Before submitting source.jar in security mode, ensure that the Kerberos security login is implemented, and the login user and the user of the uploaded keytab file are the same user in keytab mode.
    • In security mode, Kafka requires the user to have access permission on related topics. Therefore, you need to log in to the server from the cluster where Kafka is located as the Kafka administrator, and then run the kafka-acls.sh command to assign permission to the user. After successful permission assignment, you need to log in to the server as the user who submits the task and submit the topology. For details about Kafka user permission assignment, see Kafka Development Guide > More Information.

  7. After the topology is successfully submitted, send data to Kafka and check whether related information is generated.

    Go to the directory where the Kafka client locates in the Linux system, start the consumer in the Kafka/kafka/bin directory, and check whether data is generated. Run the following command:

    ./kafka-console-consumer.sh --bootstrap-server {ip:port} --topic output --consumer.config ../config/consumer.properties

    Go to the directory whether the Kafka client locates in the Linux system, start the producer in the Kafka/kafka/bin directory, and write data into Kafka. Run the following command:

    ./kafka-console-producer.sh --broker-list {ip:port} --topic input --producer.config ../config/producer.properties

    Write test data into input, and check whether related data is generated in output. If yes, the Storm-Kafka topology is executed successfully.

Utilizamos cookies para mejorar nuestro sitio y tu experiencia. Al continuar navegando en nuestro sitio, tú aceptas nuestra política de cookies. Descubre más

Feedback

Feedback

Feedback

0/500

Selected Content

Submit selected content with the feedback