
Setting Up the Java Development Environment

With the connection information collected in Collecting Connection Information and the network environment prepared for Kafka clients, you can configure the clients. This section describes how to configure a Kafka client to produce and consume messages.

Preparing Tools

The procedure below uses IntelliJ IDEA, the JDK, and Maven. Install and configure them before you start.

Procedure

  1. Download the demo package.

    Decompress the package to obtain the following files.

    Table 1 Files in the demo package

    File                           Directory                            Description
    DmsConsumer.java               .\src\main\java\com\dms\consumer     API for consuming messages
    DmsProducer.java               .\src\main\java\com\dms\producer     API for producing messages
    dms.sdk.consumer.properties    .\src\main\resources                 Configuration information for consuming messages
    dms.sdk.producer.properties    .\src\main\resources                 Configuration information for producing messages
    client.truststore.jks          .\src\main\resources                 SSL certificate, used for SASL_SSL connections
    DmsConsumerTest.java           .\src\test\java\com\dms\consumer     Test code for consuming messages
    DmsProducerTest.java           .\src\test\java\com\dms\producer     Test code for producing messages
    pom.xml                        .\                                   Maven configuration file, containing the Kafka client dependencies

  2. In IntelliJ IDEA, import the demo project.

    The demo project is a Java project built with Maven, so the JDK and the Maven plugin must be available in IDEA.
    Figure 1 Click Import Project.
    Figure 2 Choose Maven.
    Figure 3 Select the JDK.

    For the remaining options, retain the default settings or adjust them as needed, and then click Finish.

    The demo project has been imported.

  3. Configure Maven.

    Choose File > Settings, set Maven home directory to your local Maven installation, and select the settings.xml file to use.

  4. Specify Kafka configurations.

    The following is a configuration example for producing messages. Replace the instance-specific values (the broker addresses, SASL username and password, and the SSL certificate path) with the actual values. Two illustrative Java sketches follow the property list: one loads this configuration at runtime and one sets the same SASL_SSL options in code.

    #The instance-specific values below differ between Kafka instances and must be modified. Other parameters can also be added.
    #The topic name is set in the production and consumption code.
    #######################
    #Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Producer acknowledgement
    acks=all
    #Method of turning the key into bytes
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Method of turning the value into bytes
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Memory available to the producer for buffering
    buffer.memory=33554432
    #Number of retries
    retries=0
    #######################
    #Comment out the following parameters if SASL access is not enabled.
    #######################
    # Set the SASL authentication mechanism, username, and password.
    # sasl.mechanism indicates the SASL authentication mechanism. username and password indicate the SASL_SSL username and password. Obtain them by referring to "Collecting Connection Information."
    # Configure only one of the following two mechanisms and comment out the other.
    # If the SASL mechanism is PLAIN, the configuration is as follows:
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    # If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
        username="username" \
        password="password";
    
    # Set security.protocol.
    # If the security protocol is SASL_SSL, the configuration is as follows:
    security.protocol=SASL_SSL
    # ssl.truststore.location is the path for storing the SSL certificate. The following uses the Windows path format as an example. Change the path format based on the actual running environment.
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    # ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    # ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
    ssl.endpoint.identification.algorithm=
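
    To check how this configuration is used at runtime, the following is a minimal producer sketch. It is not the demo's DmsProducer or DmsProducerTest code: it assumes that dms.sdk.producer.properties is on the classpath (it sits under .\src\main\resources in the demo) and uses the placeholder topic name topic-0, which you must replace with a topic that exists on your instance.

    // Minimal producer sketch (illustrative only, not the demo source).
    import java.io.InputStream;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class ProducerQuickStart {

        public static void main(String[] args) throws Exception {
            // Load the configuration shown above from the classpath.
            Properties props = new Properties();
            try (InputStream in = ProducerQuickStart.class.getClassLoader()
                    .getResourceAsStream("dms.sdk.producer.properties")) {
                props.load(in);
            }

            // "topic-0" is a placeholder topic name.
            String topic = "topic-0";
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    String msg = "The msg is " + i;
                    System.out.println("produce msg:" + msg);
                    // send() is asynchronous; closing the producer flushes buffered records.
                    producer.send(new ProducerRecord<>(topic, msg));
                }
            }
        }
    }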
    
    
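    If you prefer to configure the client in code rather than in dms.sdk.producer.properties, the same settings can be expressed with the Kafka client's configuration constants, as sketched below. The values (broker addresses, username, password, and truststore path) are the placeholders from the example above and must be replaced with your own; keep only the SASL mechanism your instance uses.

    // Building the same SASL_SSL producer configuration in code (illustrative sketch).
    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.config.SslConfigs;

    public class SaslSslConfigExample {

        public static Properties build() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:port1,ip2:port2,ip3:port3");
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            // SASL_SSL settings; this example uses the PLAIN mechanism.
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG,
                    "org.apache.kafka.common.security.plain.PlainLoginModule required "
                            + "username=\"username\" password=\"password\";");
            props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "E:\\temp\\client.truststore.jks");
            props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "dms@kafka");
            // Leave blank to disable certificate domain name verification, as in the properties file.
            props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
            return props;
        }
    }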

  5. In the lower left corner of IDEA, click Terminal. In the terminal, run the mvn test command to check whether the demo project runs properly.

    Figure 4 Opening terminal in IDEA

    The following information is displayed for the producer:

    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.dms.producer.DmsProducerTest
    produce msg:The msg is 0
    produce msg:The msg is 1
    produce msg:The msg is 2
    produce msg:The msg is 3
    produce msg:The msg is 4
    produce msg:The msg is 5
    produce msg:The msg is 6
    produce msg:The msg is 7
    produce msg:The msg is 8
    produce msg:The msg is 9
    Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 138.877 sec

    The following information is displayed for the consumer (a minimal consumer sketch, for reference, follows this output):

    -------------------------------------------------------
     T E S T S
    -------------------------------------------------------
    Running com.dms.consumer.DmsConsumerTest
    the numbers of topic:0
    the numbers of topic:0
    the numbers of topic:6
    ConsumerRecord(topic = topic-0, partition = 2, offset = 0, CreateTime = 1557059377179, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 2)
    ConsumerRecord(topic = topic-0, partition = 2, offset = 1, CreateTime = 1557059377195, serialized key size = -1, serialized value size = 12, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = The msg is 5)
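
    The consumer output above comes from the demo's test code. As a reference, a minimal consumer along the following lines behaves similarly. It is not the demo's DmsConsumer or DmsConsumerTest code: it assumes that dms.sdk.consumer.properties is on the classpath and already contains group.id and the string deserializers, that the kafka-clients version supports poll(Duration) (2.0 or later), and that topic-0 is the topic the producer wrote to.

    // Minimal consumer sketch (illustrative only, not the demo source).
    import java.io.InputStream;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerQuickStart {

        public static void main(String[] args) throws Exception {
            // Load the consumer configuration from the classpath.
            Properties props = new Properties();
            try (InputStream in = ConsumerQuickStart.class.getClassLoader()
                    .getResourceAsStream("dms.sdk.consumer.properties")) {
                props.load(in);
            }

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // "topic-0" is a placeholder; subscribe to the topic your producer writes to.
                consumer.subscribe(Collections.singletonList("topic-0"));
                while (true) {
                    // Poll for up to one second and print whatever arrived, as the test output does.
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                    System.out.println("the numbers of topic:" + records.count());
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record);
                    }
                }
            }
        }
    }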