Updated on 2024-05-30 GMT+08:00

Getting Started with Kafka to Produce and Consume Messages

This section takes creating and connecting to a Kafka 2.7 instance with SASL enabled as an example to help you get started with Distributed Message Service (DMS) for Kafka.

You can also create a Kafka instance by calling an API and connect to the instance in your service code.

Process of Using DMS for Kafka

Figure 1 Procedure for using DMS for Kafka
  1. Prepare the environment.

    A Kafka instance runs in a Virtual Private Cloud (VPC). Before creating a Kafka instance, ensure that a VPC is available.

    After a Kafka instance is created, download and install the Kafka open-source client on your ECS before creating and retrieving messages.

  2. Create a Kafka instance.

    When creating a Kafka instance, select the specifications and quantity, and enable SASL. Enabling SASL encrypts data in transmission.

  3. (Optional) Create a topic.

    If automatic topic creation is not enabled during Kafka instance creation, you need to manually create a topic before producing and consuming messages.

  4. Connect to the instance.

    Before connecting to a Kafka instance with SASL enabled, download the certificate and configure the connection in the client configuration file.

For details about Kafka concepts, see Basic Concepts.

Step 1: Prepare the Environment

  1. To achieve fine-grained management of your cloud resources, create IAM user groups and users and grant specified permissions to the users. For details, see Creating a User and Granting DMS for Kafka Permissions.
  2. Before creating a Kafka instance, ensure that a VPC and a subnet are available.

    Configure the VPC and subnet for Kafka instances as required. You can use the current account's existing VPC and subnet, or create new ones. For details about how to create a VPC and a subnet, see Creating a VPC. Note that the VPC must be in the same region as the Kafka instance.

  3. Before creating a Kafka instance, ensure that a security group is available.

    Configure the security group for Kafka instances as required. You can use the current account's existing security groups, or create new ones. For details about how to create a security group, see Creating a Security Group.

    To connect to Kafka instances, add the security group rules described in Table 1. Other rules can be added based on site requirements.
    Table 1 Security group rules

    Direction | Protocol | Port | Source address | Description
    Inbound   | TCP      | 9093 | 0.0.0.0/0      | Access to a Kafka instance over a private network within a VPC (SSL enabled)

    After a security group is created, it has a default inbound rule that allows communication among ECSs within the security group and a default outbound rule that allows all outbound traffic. If you access your Kafka instance using the private network within a VPC, you do not need to add the rules described in Table 1.

  4. Before connecting to a Kafka instance, ensure that you have created an ECS, installed the JDK, configured environment variables, and downloaded an open-source Kafka client. The following steps use a Linux ECS as an example. For a Windows ECS, search the Internet for instructions on installing the JDK and configuring environment variables.

    1. Log in to the console, click the service list in the upper left corner, click Elastic Cloud Server under Computing, and then create an ECS.

      For details about how to create an ECS, see Creating an ECS. If you already have an available ECS, skip this step.

    2. Log in to the ECS.
    3. Install Java JDK or JRE and configure the environment variables JAVA_HOME and PATH.
      Add the following lines to the .bash_profile file in the home directory as an authorized user. In these lines, /opt/java/jdk1.8.0_151 is an example JDK installation path; change it to the path where your JDK or JRE is installed.
      export JAVA_HOME=/opt/java/jdk1.8.0_151 
      export PATH=$JAVA_HOME/bin:$PATH

      Run the source .bash_profile command for the modification to take effect.

      Use Oracle JDK instead of ECS's default JDK (for example, OpenJDK), because ECS's default JDK may not be suitable. Obtain Oracle JDK 1.8.111 or later from Oracle's official website.

    4. Download an open-source Kafka client.
      wget https://archive.apache.org/dist/kafka/2.7.2/kafka_2.12-2.7.2.tgz
    5. Run the following command to decompress the package:
      tar -zxf kafka_2.12-2.7.2.tgz
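
The JDK environment setup above can be sketched and verified as a script. A minimal sketch, assuming the example JDK path from this section; a temporary file stands in for .bash_profile so the sketch can run anywhere:

```shell
# Sketch: configure JAVA_HOME and PATH as in the steps above.
# /opt/java/jdk1.8.0_151 is an example path; replace it with your JDK location.
PROFILE=$(mktemp)          # stand-in for ~/.bash_profile on the ECS
cat >> "$PROFILE" <<'EOF'
export JAVA_HOME=/opt/java/jdk1.8.0_151
export PATH=$JAVA_HOME/bin:$PATH
EOF
. "$PROFILE"               # same effect as: source .bash_profile
echo "$JAVA_HOME"
```

On the ECS, append the two export lines to ~/.bash_profile instead and run source .bash_profile; running java -version should then print the installed JDK version.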

Step 2: Create a Kafka Instance

  1. Log in to the Kafka console, then click Buy Instance in the upper right corner of the page.
  2. Set Billing Mode to Pay-per-use.
  3. Select a region closest to your application to reduce latency and accelerate access.
  4. Select a project from the Project drop-down list.
  5. Select one AZ or at least three AZs.
  6. Specify the instance name and the enterprise project.
  7. Specify the instance settings. For details, see Table 2.

    Table 2 Setting an instance

    Parameter | Description
    Version | Select 2.7. The version is fixed once the instance is created. Use the same version as your client.
    CPU Architecture | Select x86.
    Broker Flavor | Select kafka.2u4g.cluster.
    Brokers | Enter 3.
    Storage Space per Broker | Select Ultra-high I/O and enter 100 GB. Total storage space = storage space per broker × number of brokers. The disk type cannot be changed after the instance is created.
    Capacity Threshold Policy | Select Automatically delete.

  8. Configure the instance network. For details, see Table 3.

    Table 3 Configuring instance network

    Parameter | Description
    VPC | Select the created VPC and subnet. They cannot be changed after the instance is created.
    Security Group | Select the created security group.

  9. Set the instance access mode. For details, see Table 4.

    Table 4 Setting instance access mode

    Parameter | Description
    Kafka SASL_SSL | SASL_SSL is supported. This setting is fixed once the instance is created.
    SASL PLAIN | SASL PLAIN is supported. If SASL/PLAIN is enabled, both the SCRAM-SHA-512 and PLAIN mechanisms are supported; select either of them to connect to Kafka instances on the client.
    Username | Username for the client to connect to Kafka instances.
    Password | Password for the client to connect to Kafka instances.

  10. Configure the username and password for logging in to Kafka Manager. The Kafka Manager username cannot be changed once an instance is created.

    Kafka Manager is an open-source tool for managing Kafka clusters. After a Kafka instance is created, you can go to the instance details page to obtain the address for logging in to Kafka Manager. In Kafka Manager, you can view the monitoring and broker information of your Kafka clusters.

  11. Click Advanced Settings. For more information, see Table 5.

    Table 5 Advanced settings

    Parameter | Description
    Automatic Topic Creation | Do not enable it.
    Tags | Skip it.
    Description | Skip it.

  12. Click Buy.
  13. Confirm the instance settings.
  14. Return to the DMS for Kafka page and check whether the instance has been created.

    It takes 3 to 15 minutes to create an instance. During this period, the instance status is Creating.

    • If the instance is created successfully, its status changes to Running.
    • If the instance is in the Creation failed state, delete it. Then create a new one. If the instance creation fails again, contact customer service.

      Instances that fail to be created do not occupy other resources.

(Optional) Step 3: Create a Topic

  1. On the DMS for Kafka page, click a Kafka instance.
  2. In the navigation pane, choose Topics.
  3. Click Create Topic.
  4. Enter the topic name, specify other parameters, and click OK.

    Table 6 Topic parameters

    Parameter | Description
    Topic Name | Enter "topic-01". The name cannot be changed once the topic is created.
    Partitions | Set it to 3. The more partitions, the higher the consumption concurrency.
    Replicas | Set it to 3. Kafka automatically backs up data on each replica; if one broker is faulty, data is still available. Reliability increases with the number of replicas of a topic. NOTE: If an instance node is faulty, an internal service error may be reported when you query messages in a topic with only one replica. You are advised not to use topics with only one replica.
    Aging Time (h) | Set it to 72. This is how long messages are preserved in the topic; messages older than this period are deleted and can no longer be consumed.
    Synchronous Replication | Do not enable it.
    Synchronous Flushing | Do not enable it.
    Message Timestamp | Select CreateTime.
    Max. Message Size (bytes) | Retain the default value.

Step 4: Connect to a Kafka Instance to Produce and Consume Messages

  1. Obtain the instance connection address.

    1. In the navigation pane, click Basic Information.
    2. In the Connection area, view the connection address.
      Figure 2 Kafka instance addresses (private network) for intra-VPC access

  2. Prepare the file for production and consumption configuration.

    1. Log in to a Linux ECS.
    2. Map host names to IP addresses in the /etc/hosts file on the ECS so that the client can quickly resolve the instance brokers.

      Set the IP addresses to the instance connection addresses obtained in 1. Assign each instance host a unique name.

      For example:

      10.154.48.120 server01

      10.154.48.121 server02

      10.154.48.122 server03

    3. Download the client.truststore.jks certificate.

      On the Kafka console, click the Kafka instance to go to the Basic Information page. Click Download next to SSL Certificate in the Connection area. Decompress the package to obtain the client certificate file client.truststore.jks.

    4. Add the following lines to both the consumer.properties and producer.properties files (PLAIN is used as an example).
      sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="**********" \
      password="**********";        
      sasl.mechanism=PLAIN
      
      security.protocol=SASL_SSL
      ssl.truststore.location={ssl_truststore_path}
      ssl.truststore.password=dms@kafka
      ssl.endpoint.identification.algorithm=

      Description:

      • username and password are specified when enabling SASL_SSL during instance creation.
      • ssl.truststore.location is the path for storing the certificate obtained in 2.c.
      • ssl.truststore.password is the server certificate password. It must be set to dms@kafka and cannot be changed.
      • ssl.endpoint.identification.algorithm determines whether to verify the certificate domain name. Leave this parameter blank to disable domain name verification.
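
      The configuration step above can be scripted. A minimal sketch: the username, password, and {ssl_truststore_path} are placeholders to replace with your own values, and a temporary directory stands in for the Kafka client's config directory so the sketch is self-contained:

```shell
# Sketch: append the SASL_SSL settings to both client configuration files.
# On the ECS, run this in the Kafka client's config/ directory instead of
# the temporary directory used here.
CONF_DIR=$(mktemp -d)
for f in producer.properties consumer.properties; do
  cat >> "$CONF_DIR/$f" <<'EOF'
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="**********" \
password="**********";
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
ssl.truststore.location={ssl_truststore_path}
ssl.truststore.password=dms@kafka
ssl.endpoint.identification.algorithm=
EOF
done
ls "$CONF_DIR"      # both files now carry the same SASL settings
```

      Appending the same settings to both files keeps the producer and consumer configurations consistent, so either client can authenticate against the instance.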

  3. Produce messages.

    Go to the /bin directory of the Kafka client file and run the following command:

    ./kafka-console-producer.sh --broker-list ${connection addr} --topic ${topic name} --producer.config ../config/producer.properties

    Description:

    • {connection addr}: the address obtained in 1.
    • {topic name}: the topic name obtained in 4.

    For example, 192.xxx.xxx.xxx:9093, 192.xxx.xxx.xxx:9093, 192.xxx.xxx.xxx:9093 are the connection addresses of the Kafka instance.

    After running the preceding command, you can send messages to the Kafka instance by entering them as prompted and pressing Enter. Each line is sent as a separate message.

    [root@ecs-kafka bin]#./kafka-console-producer.sh --broker-list 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093  --topic topic-demo --producer.config ../config/producer.properties
    >Hello
    >DMS
    >Kafka!
    >^C[root@ecs-kafka bin]# 

    Press Ctrl+C to cancel.

  4. Consume messages.

    Run the following command:

    ./kafka-console-consumer.sh --bootstrap-server ${connection addr} --topic ${topic name} --group ${consumer group name} --from-beginning  --consumer.config ../config/consumer.properties

    Description:

    • {connection addr}: the address obtained in 1.
    • {topic name}: the topic name obtained in 4.
    • {consumer group name}: the consumer group name, set as required. If a consumer group name has been specified in the configuration file, use the same name in the command line; otherwise, consumption may fail. If a consumer group name starts with a special character, such as an underscore (_) or a number sign (#), its monitoring data cannot be displayed.

    Sample:

    [root@ecs-kafka bin]#  ./kafka-console-consumer.sh --bootstrap-server 192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093 --topic topic-demo --group order-test --from-beginning --consumer.config ../config/consumer.properties
    Hello
    Kafka!
    DMS
    ^CProcessed a total of 3 messages
    [root@ecs-kafka bin]# 

    Press Ctrl+C to cancel.
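
    The interactive produce and consume steps above can also be run non-interactively. A minimal sketch with placeholder broker addresses; the Kafka scripts exist only in the client's bin/ directory on the ECS, so the guarded commands run only when the producer script is present:

```shell
# Non-interactive variant of the steps above: pipe messages into the producer
# instead of typing them, then read them back. Broker addresses are placeholders.
BROKERS="192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093,192.xxx.xxx.xxx:9093"
TOPIC="topic-01"
if [ -x ./kafka-console-producer.sh ]; then
  # Each piped line is sent as one message.
  printf 'Hello\nDMS\nKafka!\n' | ./kafka-console-producer.sh \
    --broker-list "$BROKERS" --topic "$TOPIC" \
    --producer.config ../config/producer.properties
  # --timeout-ms makes the consumer exit instead of waiting for new messages.
  ./kafka-console-consumer.sh --bootstrap-server "$BROKERS" --topic "$TOPIC" \
    --group order-test --from-beginning --timeout-ms 10000 \
    --consumer.config ../config/consumer.properties
else
  echo "Run this sketch from the Kafka client's bin/ directory on the ECS."
fi
```

    Piping messages this way is useful for smoke tests and scripts, since it needs no interactive terminal session.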