Uploading Data
Sample Code
```java
package com.huaweicloud.dis.demo.adapter;

import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.kafka.clients.producer.*;
import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

public class DISKafkaProducerDemo
{
    private static final Logger LOGGER = LoggerFactory.getLogger(DISKafkaProducerDemo.class);

    public static void main(String[] args)
    {
        // Hard-coding the AK/SK used for authentication into the code is a security risk. Store them encrypted in a configuration file or in environment variables.
        // This example reads the AK and SK from environment variables. Before running it, configure HUAWEICLOUD_SDK_AK and HUAWEICLOUD_SDK_SK in the local environment.
        String ak = System.getenv("HUAWEICLOUD_SDK_AK");
        String sk = System.getenv("HUAWEICLOUD_SDK_SK");
        // Your project ID.
        String projectId = "YOU_PROJECT_ID";
        // Your DIS stream name.
        String streamName = "YOU_STREAM_NAME";
        // Consumer group ID, used to record the offset.
        String groupId = "YOU_GROUP_ID";
        // DIS region.
        String region = "your region";

        Properties props = new Properties();
        props.setProperty(DISConfig.PROPERTY_AK, ak);
        props.setProperty(DISConfig.PROPERTY_SK, sk);
        props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);
        props.setProperty(DISConfig.PROPERTY_REGION_ID, region);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // By default, the domain name is used for access automatically and no endpoint needs to be configured.
        // If an endpoint is required, uncomment the following line and set the endpoint:
        // props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");

        // Create the DIS producer.
        Producer<String, String> producer = new DISKafkaProducer<>(props);

        // Send data synchronously.
        synchronousSendDemo(producer, streamName);

        // Send data asynchronously.
        asynchronousSendDemo(producer, streamName);

        // Close the producer to prevent resource leaks.
        producer.close();
    }

    public static void synchronousSendDemo(Producer<String, String> producer, String streamName)
    {
        LOGGER.info("===== synchronous send =====");
        for (int i = 0; i < 5; i++)
        {
            // If the key is random or null, data is evenly distributed to all partitions.
            String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
            String value = "Hello world[sync]. " + i;

            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(streamName, key, value));

            try
            {
                // Calling future.get() blocks until sending is complete.
                RecordMetadata recordMetadata = future.get();
                // Data sent successfully.
                LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].",
                        value, recordMetadata.partition(), recordMetadata.offset());
            }
            catch (Exception e)
            {
                // Data failed to be sent.
                LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
            }
        }
    }

    public static void asynchronousSendDemo(Producer<String, String> producer, String streamName)
    {
        LOGGER.info("===== asynchronous send =====");
        int totalSendCount = 5;
        CountDownLatch countDownLatch = new CountDownLatch(totalSendCount);
        for (int i = 0; i < totalSendCount; i++)
        {
            // If the key is random or null, data is evenly distributed to all partitions.
            String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
            String value = "Hello world[async]. " + i;

            try
            {
                // The record is sent with a callback; the call does not block.
                producer.send(new ProducerRecord<>(streamName, key, value), new Callback()
                {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e)
                    {
                        countDownLatch.countDown();
                        if (e == null)
                        {
                            // Data sent successfully.
                            LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].",
                                    value, recordMetadata.partition(), recordMetadata.offset());
                        }
                        else
                        {
                            // Data failed to be sent.
                            LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
                        }
                    }
                });
            }
            catch (Exception e)
            {
                countDownLatch.countDown();
                LOGGER.error(e.getMessage(), e);
            }
        }

        try
        {
            // Wait until all data has been sent.
            countDownLatch.await();
        }
        catch (InterruptedException e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
```
After DISKafkaProducerDemo runs, if the data is sent successfully, logs similar to the following are generated:
```
09:32:52.001 INFO c.h.d.d.a.DISKafkaProducerDemo - ===== synchronous send =====
09:32:53.523 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 0], Partition [0], Offset [114].
09:32:53.706 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 1], Partition [0], Offset [115].
09:32:53.956 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 2], Partition [0], Offset [116].
09:32:54.160 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 3], Partition [0], Offset [117].
09:32:54.450 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 4], Partition [0], Offset [118].
09:32:54.450 INFO c.h.d.d.a.DISKafkaProducerDemo - ===== asynchronous send =====
09:32:54.673 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 0], Partition [0], Offset [119].
09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 1], Partition [0], Offset [120].
09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 2], Partition [0], Offset [121].
09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 3], Partition [0], Offset [122].
09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 4], Partition [0], Offset [123].
```
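The demo above lets the adapter pick a partition from the record key, so records are spread evenly across the stream. When records must land on a specific partition (for example, to keep related records in order), the supported `partitionsFor` method (see the compatibility table below) can be combined with an explicit partition in the record. The following is a minimal sketch, assuming the adapter's `ProducerRecord` and `PartitionInfo` mirror the native Kafka constructor and package layout (the `PartitionInfo` import path is inferred from the adapter's package naming, not confirmed by this page):

```java
package com.huaweicloud.dis.demo.adapter;

import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.kafka.clients.producer.*;
import com.huaweicloud.dis.adapter.kafka.common.PartitionInfo;
import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer;

import java.util.List;
import java.util.Properties;

public class ExplicitPartitionDemo
{
    public static void main(String[] args)
    {
        Producer<String, String> producer = new DISKafkaProducer<>(buildProps());

        // partitionsFor returns one PartitionInfo per partition of the stream.
        List<PartitionInfo> partitions = producer.partitionsFor("YOU_STREAM_NAME");

        // Send to the first partition explicitly; records sent to the same
        // partition preserve their send order.
        int partition = partitions.get(0).partition();
        producer.send(new ProducerRecord<>("YOU_STREAM_NAME", partition, "key", "hello"));

        producer.close();
    }

    // Same Properties setup as in DISKafkaProducerDemo above.
    private static Properties buildProps()
    {
        Properties props = new Properties();
        props.setProperty(DISConfig.PROPERTY_AK, System.getenv("HUAWEICLOUD_SDK_AK"));
        props.setProperty(DISConfig.PROPERTY_SK, System.getenv("HUAWEICLOUD_SDK_SK"));
        props.setProperty(DISConfig.PROPERTY_PROJECT_ID, "YOU_PROJECT_ID");
        props.setProperty(DISConfig.PROPERTY_REGION_ID, "your region");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }
}
```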
Adaptation to the Native KafkaProducer API
DISKafkaProducer is implemented differently from the native KafkaProducer: the DISKafkaProducer client communicates with the server through a REST API, whereas KafkaProducer uses a TCP-based protocol. Their API compatibility differences are as follows (a configuration sketch follows the table):
| Native KafkaProducer | Type | DISKafkaProducer | Description |
|---|---|---|---|
| `Future<RecordMetadata> send(ProducerRecord<K, V> record)` | API | Supported | Sends a single data record. |
| `Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)` | API | Supported | Sends a single data record and sets a callback to process the result. |
| `void close()` | API | Supported | Closes the producer. |
| `void close(long timeout, TimeUnit timeUnit)` | API | Supported | Closes the producer with a timeout. |
| `List<PartitionInfo> partitionsFor(String topic)` | API | Supported | Obtains the partition information of a stream. |
| `void flush(long timeout, TimeUnit unit)` | API | Not supported | Forcibly sends the currently cached data. |
| `Map<MetricName, ? extends Metric> metrics()` | API | Not supported | Obtains statistics. |
| `key.serializer` | Parameter | Supported | Same meaning as in Kafka. The default value is `StringSerializer`; in Kafka this parameter has no default value and must be configured. |
| `value.serializer` | Parameter | Supported | Same meaning as in Kafka. The default value is `StringSerializer`; in Kafka this parameter has no default value and must be configured. |
| `linger.ms` | Parameter | Supported | Same meaning as in Kafka. The default value is 50 (0 in Kafka), which improves the upload efficiency of the REST API. |
| `batch.size` | Parameter | Supported | Same meaning as in Kafka. The default value is 1 MB (16 KB in Kafka), which matches the service-side flow control. |
| `buffer.memory` | Parameter | Supported | Same as the Kafka default, 32 MB. |
| `max.in.flight.requests.per.connection` | Parameter | Supported | Limits the maximum number of unacknowledged requests a client can have on a single connection. The default value is 100 (5 in Kafka), which improves sending performance but may deliver data out of order. Set this parameter to 1 to guarantee data order. |
| `block.on.buffer.full` | Parameter | Supported | Same as the Kafka default, false. |
| `max.block.ms` | Parameter | Supported | Same as the Kafka default, 60000. When the send buffer is full and `block.on.buffer.full` is false, this controls how long `send()` blocks, in milliseconds. |
| `retries` | Parameter | Supported, but renamed `exception.retries` | Number of retries made when the network or service is abnormal. The default value is 8 (0 in Kafka). |
| Others | Parameter | Not supported | - |
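Putting the parameter differences together: a producer that needs strict per-partition ordering can trade away the DIS defaults that favor throughput. The following is a minimal sketch that slots into the `main` method of the sample code above; it uses the literal parameter names from the table (including the renamed `exception.retries`), and the values shown are illustrative, not recommendations:

```java
Properties props = new Properties();
// AK/SK, project ID, region, and serializers as in the sample code above.

// DIS default is 50 ms (Kafka: 0); batching improves REST upload efficiency.
props.setProperty("linger.ms", "50");

// DIS default is 1 MB (Kafka: 16 KB), matching the service-side flow control.
props.setProperty("batch.size", String.valueOf(1024 * 1024));

// DIS default is 100 (Kafka: 5). Set to 1 to guarantee data order
// at the cost of sending performance.
props.setProperty("max.in.flight.requests.per.connection", "1");

// Kafka's "retries" is renamed "exception.retries" in DIS; the DIS default is 8.
props.setProperty("exception.retries", "8");

Producer<String, String> producer = new DISKafkaProducer<>(props);
```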