Updated on 2024-03-05 GMT+08:00

Uploading Data

Sample Code

For details about how to obtain the AK, SK, and project ID, see Checking Authentication Information.
package com.huaweicloud.dis.demo.adapter;

import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.kafka.clients.producer.*;
import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
 
public class DISKafkaProducerDemo
{
    private static final Logger LOGGER = LoggerFactory.getLogger(DISKafkaProducerDemo.class);

 
    public static void main(String[] args)
    {
       
        // Hard-coding the AK/SK used for authentication into the code poses a security risk. For security, encrypt the AK/SK and store them in a configuration file or in environment variables.
        // In this example, the AK and SK are read from environment variables. Before running the program, set the environment variables HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK in your local environment.
        String ak = System.getenv("HUAWEICLOUD_SDK_AK");
        String sk = System.getenv("HUAWEICLOUD_SDK_SK");
        // Your project ID.
        String projectId = "YOU_PROJECT_ID";
        // Your DIS stream name.
        String streamName = "YOU_STREAM_NAME";
        // Your DIS region.
        String region = "your region";

        Properties props = new Properties();
        props.setProperty(DISConfig.PROPERTY_AK, ak);
        props.setProperty(DISConfig.PROPERTY_SK, sk);
        props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);
        props.setProperty(DISConfig.PROPERTY_REGION_ID, region);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // By default, the service is accessed through its domain name, and no endpoint needs to be configured. If an endpoint is required, uncomment the following line and set the endpoint:
        // props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");
 
        // Create the DIS producer.
        Producer<String, String> producer = new DISKafkaProducer<>(props);
 
        // Send data synchronously.
        synchronousSendDemo(producer, streamName);
 
        // Send data asynchronously.
        asynchronousSendDemo(producer, streamName);
 
        // Close the producer to prevent resource leakage.
        producer.close();
    }
 
    public static void synchronousSendDemo(Producer<String, String> producer, String streamName)
    {
        LOGGER.info("===== synchronous send =====");
        for (int i = 0; i < 5; i++)
        {
            // A random or null key distributes data evenly across all partitions.
            String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
            String value = "Hello world[sync]. " + i;
 
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(streamName, key, value));
 
            try
            {
                // future.get() blocks until the send completes.
                RecordMetadata recordMetadata = future.get();
                // The data was sent successfully.
                LOGGER.info("Successfully sent [{}], Partition [{}], Offset [{}].",
                        value, recordMetadata.partition(), recordMetadata.offset());
            }
            catch (Exception e)
            {
                // The data failed to be sent.
                LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
            }
        }
    }
 
    public static void asynchronousSendDemo(Producer<String, String> producer, String streamName)
    {
        LOGGER.info("===== asynchronous send =====");
        int totalSendCount = 5;
        CountDownLatch countDownLatch = new CountDownLatch(totalSendCount);
        for (int i = 0; i < totalSendCount; i++)
        {
            // A random or null key distributes data evenly across all partitions.
            String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
            String value = "Hello world[async]. " + i;
 
            try
            {
                // Send in callback mode; the call does not block.
                producer.send(new ProducerRecord<>(streamName, key, value), new Callback()
                {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e)
                    {
                        countDownLatch.countDown();
                        if (e == null)
                        {
                            // The data was sent successfully.
                            LOGGER.info("Successfully sent [{}], Partition [{}], Offset [{}].",
                                    value, recordMetadata.partition(), recordMetadata.offset());
                        }
                        else
                        {
                            // The data failed to be sent.
                            LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
                        }
                    }
                });
            }
            catch (Exception e)
            {
                countDownLatch.countDown();
                LOGGER.error(e.getMessage(), e);
            }
        }
 
        try
        {
            // Wait until all sends have completed.
            countDownLatch.await();
        }
        catch (InterruptedException e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

After the preceding program runs, logs similar to the following are generated if the data is sent successfully:

09:32:52.001 INFO  c.h.d.d.a.DISKafkaProducerDemo - ===== synchronous send =====
09:32:53.523 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[sync]. 0], Partition [0], Offset [114].
09:32:53.706 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[sync]. 1], Partition [0], Offset [115].
09:32:53.956 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[sync]. 2], Partition [0], Offset [116].
09:32:54.160 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[sync]. 3], Partition [0], Offset [117].
09:32:54.450 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[sync]. 4], Partition [0], Offset [118].
09:32:54.450 INFO  c.h.d.d.a.DISKafkaProducerDemo - ===== asynchronous send =====
09:32:54.673 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[async]. 0], Partition [0], Offset [119].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[async]. 1], Partition [0], Offset [120].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[async]. 2], Partition [0], Offset [121].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[async]. 3], Partition [0], Offset [122].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Successfully sent [Hello world[async]. 4], Partition [0], Offset [123].
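In the synchronous demo, future.get() blocks indefinitely until the send completes. If a bounded wait is preferred, the standard java.util.concurrent.Future API also offers a timed get(). The following minimal sketch shows this pattern; the sendWithTimeout helper and the 10-second timeout are illustrative assumptions, not part of the sample above.

import com.huaweicloud.dis.adapter.kafka.clients.producer.*;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedSendHelper
{
    // Illustrative helper: send one record and wait at most 10 seconds for the result.
    public static RecordMetadata sendWithTimeout(Producer<String, String> producer,
            String streamName, String key, String value) throws Exception
    {
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>(streamName, key, value));
        try
        {
            // The timed get() either returns the record metadata or throws TimeoutException.
            return future.get(10, TimeUnit.SECONDS);
        }
        catch (TimeoutException e)
        {
            // The send may still complete in the background; here we stop waiting for it.
            future.cancel(true);
            throw e;
        }
    }
}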

Adaptation to the Native KafkaProducer API

DISKafkaProducer is implemented differently from the native KafkaProducer: the DISKafkaProducer client communicates with the server through a REST API, whereas KafkaProducer uses the Kafka TCP protocol. Their API compatibility differences are as follows:

Table 1 Adaptation description

| Native KafkaProducer | Type | DISKafkaProducer | Description |
|---|---|---|---|
| Future<RecordMetadata> send(ProducerRecord<K, V> record) | API | Supported | Sends a single data record. |
| Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) | API | Supported | Sends a single data record and sets a callback function. |
| void close() | API | Supported | Closes the producer. |
| void close(long timeout, TimeUnit timeUnit) | API | Supported | Closes the producer with a timeout period. |
| List<PartitionInfo> partitionsFor(String topic) | API | Supported | Obtains the partition information of a stream. |
| void flush(long timeout, TimeUnit unit) | API | Not supported | Forcibly sends the currently cached data. |
| Map<MetricName, ? extends Metric> metrics() | API | Not supported | Obtains statistics. |
| key.serializer | Parameter | Supported | Same meaning as in Kafka, but the default value is StringSerializer; Kafka has no default and requires an explicit value. |
| value.serializer | Parameter | Supported | Same meaning as in Kafka, but the default value is StringSerializer; Kafka has no default and requires an explicit value. |
| linger.ms | Parameter | Supported | Same meaning as in Kafka, but the default value is 50 (0 in Kafka), which improves the upload efficiency of the REST API. |
| batch.size | Parameter | Supported | Same meaning as in Kafka, but the default value is 1 MB (16 KB in Kafka), to match DIS flow control. |
| buffer.memory | Parameter | Supported | Same as in Kafka; the default value is 32 MB. |
| max.in.flight.requests.per.connection | Parameter | Supported | Limits the maximum number of unacknowledged requests a client can send on a single connection. The default value is 100 (5 in Kafka), which improves sending performance but may change the data order. To preserve data order, set this parameter to 1. |
| block.on.buffer.full | Parameter | Supported | Same as in Kafka; the default value is false. If set to true, send() blocks without timing out when the sending buffer is full; if false, blocking is bounded by max.block.ms, after which an exception is thrown. |
| max.block.ms | Parameter | Supported | Same as in Kafka; the default value is 60000. When the sending buffer is full and block.on.buffer.full is false, this parameter controls how long (in ms) send() blocks. |
| retries | Parameter | Supported, but renamed to exception.retries | Number of retry attempts made when the network or server is abnormal. The default value is 8 (0 in Kafka). |
| Others | Parameter | Not supported | - |
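
To put the parameter rows above into practice, the following is a minimal sketch of a producer configured for strict data ordering. The property keys are those listed in Table 1; the specific values and the buildOrderedProducer helper are illustrative assumptions, not defaults.

import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.kafka.clients.producer.DISKafkaProducer;
import com.huaweicloud.dis.adapter.kafka.clients.producer.Producer;
import com.huaweicloud.dis.adapter.kafka.clients.producer.ProducerConfig;
import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class OrderedProducerDemo
{
    public static Producer<String, String> buildOrderedProducer(String ak, String sk,
            String projectId, String region)
    {
        Properties props = new Properties();
        props.setProperty(DISConfig.PROPERTY_AK, ak);
        props.setProperty(DISConfig.PROPERTY_SK, sk);
        props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);
        props.setProperty(DISConfig.PROPERTY_REGION_ID, region);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Allow only one unacknowledged request per connection so records arrive in send order
        // (the DIS default of 100 favors throughput over ordering).
        props.setProperty("max.in.flight.requests.per.connection", "1");
        // Batch records for up to 100 ms before each REST call (DIS default: 50 ms); illustrative value.
        props.setProperty("linger.ms", "100");
        // Kafka's "retries" is renamed "exception.retries" in DIS; retry 3 times on network/server errors (DIS default: 8).
        props.setProperty("exception.retries", "3");

        return new DISKafkaProducer<>(props);
    }
}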