更新时间:2024-10-25 GMT+08:00

上传数据

代码样例

“ak”、“sk”和“projectId”信息的获取请参见检查认证信息
package com.huaweicloud.dis.demo.adapter;
import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.kafka.clients.producer.*;
import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
 
public class DISKafkaProducerDemo
{
    private static final Logger LOGGER = LoggerFactory.getLogger(DISKafkaProducerDemo.class);

 
    public static void main(String[] args)
    {
       
        // 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
       // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK    。
       String ak = System.getenv("HUAWEICLOUD_SDK_AK");
       String sk = System.getenv("HUAWEICLOUD_SDK_SK");
       // YOU ProjectId 
       String projectId = "YOU_PROJECT_ID";
      // YOU DIS Stream
       String streamName = "YOU_STREAM_NAME";
      // 消费组ID,用于记录offset
      String groupId = "YOU_GROUP_ID";
     // DIS region
      String region = "your region";

        Properties props = new Properties();
        props.setProperty(DISConfig.PROPERTY_AK, ak);
        props.setProperty(DISConfig.PROPERTY_SK, sk);
        props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);
        props.setProperty(DISConfig.PROPERTY_REGION_ID, region);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
 
        // 默认情况下不需要设置endpoint,会自动使用域名访问;如需使用指定的endpoint,解除如下注释并设置endpoint即可
        // props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");
 
        // Create dis producer
        Producer<String, String> producer = new DISKafkaProducer<>(props);
 
        // 同步发送
        synchronousSendDemo(producer, streamName);
 
        // 异步发送
        asynchronousSendDemo(producer, streamName);
 
        // 关闭producer,防止资源泄露
        producer.close();
    }
 
    public static void synchronousSendDemo(Producer<String, String> producer, String streamName)
    {
        LOGGER.info("===== synchronous send =====");
        for (int i = 0; i < 5; i++)
        {
            // key设置为随机(或者设置为null),数据会均匀分配到所有分区中
            String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
            String value = "Hello world[sync]. " + i;
 
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(streamName, key, value));
 
            try
            {
                // 调用future.get会阻塞等待,直到发送完成
                RecordMetadata recordMetadata = future.get();
                // 发送成功
                LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].",
                        value, recordMetadata.partition(), recordMetadata.offset());
            }
            catch (Exception e)
            {
                // 发送失败
                LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
            }
        }
    }
 
    public static void asynchronousSendDemo(Producer<String, String> producer, String streamName)
    {
        LOGGER.info("===== asynchronous send =====");
        int totalSendCount = 5;
        CountDownLatch countDownLatch = new CountDownLatch(totalSendCount);
        for (int i = 0; i < totalSendCount; i++)
        {
            // key设置为随机(或者设置为null),数据会均匀分配到所有分区中
            String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
            String value = "Hello world[async]. " + i;
 
            try
            {
                // 使用回调方式发送,不会阻塞
                producer.send(new ProducerRecord<>(streamName, key, value), new Callback()
                {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e)
                    {
                        countDownLatch.countDown();
                        if (e == null)
                        {
                            // 发送成功
                            LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].",
                                    value, recordMetadata.partition(), recordMetadata.offset());
                        }
                        else
                        {
                            // 发送失败
                            LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
                        }
                    }
                });
            }
            catch (Exception e)
            {
                countDownLatch.countDown();
                LOGGER.error(e.getMessage(), e);
            }
        }
 
        try
        {
            // 等待所有发送完成
            countDownLatch.await();
        }
        catch (InterruptedException e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

执行如上程序,发送数据成功会打印如下日志

09:32:52.001 INFO  c.h.d.d.a.DISKafkaProducerDemo - ===== synchronous send =====
09:32:53.523 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 0], Partition [0], Offset [114].
09:32:53.706 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 1], Partition [0], Offset [115].
09:32:53.956 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 2], Partition [0], Offset [116].
09:32:54.160 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 3], Partition [0], Offset [117].
09:32:54.450 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 4], Partition [0], Offset [118].
09:32:54.450 INFO  c.h.d.d.a.DISKafkaProducerDemo - ===== asynchronous send =====
09:32:54.673 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 0], Partition [0], Offset [119].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 1], Partition [0], Offset [120].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 2], Partition [0], Offset [121].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 3], Partition [0], Offset [122].
09:32:54.674 INFO  c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 4], Partition [0], Offset [123].

与原生KafkaProducer接口适配说明

DISKafkaProducer的实现与KafkaProducer的实现不同,DISKafkaProducer的客户端与服务端通过Rest API实现,而KafkaProducer是基于TCP协议实现,在接口兼容上有如下差异。

表1 适配说明

原生KafkaProducer

类型

DISKafkaProducer

说明

Future<RecordMetadata> send(ProducerRecord<K, V> record)

接口

支持

发送单条数据

Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

接口

支持

发送单条数据并设置回调处理函数

void close()

接口

支持

关闭Producer

void close(long timeout, TimeUnit timeUnit)

接口

支持

关闭Producer并设置超时时间

List<PartitionInfo> partitionsFor(String topic)

接口

支持

获取通道的分区信息

void flush(long timeout, TimeUnit unit)

接口

不支持

强制发送当前缓存数据,后续支持

Map<MetricName, ? extends Metric> metrics()

接口

不支持

获取统计信息

key.serializer

参数

支持

含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置)

value.serializer

参数

支持

含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置)

linger.ms

参数

支持

含义与kafka设置相同,但默认值为50(kafka是0),目的是提高Rest接口的上传效率

batch.size

参数

支持

含义与kafka设置相同,但默认值为1MB(kafka是16KB),目的是匹配流控的大小

buffer.memory

参数

支持

同kafka的默认设置(32MB)

max.in.flight.requests.per.connection

参数

支持

限制客户端在单个连接上能够发送的未响应请求的个数,默认值为100(Kafka默认为5)可提高发送性能,但可能出现数据顺序不一致的问题。如需严格保证顺序,建议此值设置为1

block.on.buffer.full

参数

支持

同Kafka默认设置(false)。

  • true表示当发送缓冲区满,send一直阻塞不超时;
  • false表示发送缓冲区满后根据max.block.ms的时间阻塞,超过时间则抛出异常。

max.block.ms

参数

支持

同Kafka默认设置(60000)。

当发送缓冲区满且block.on.buffer.full为false时,控制send()的阻塞时间(毫秒)。

retries

参数

支持,但是参数名改为exception.retries

Kafka默认设置为0,DIS默认设置为8。

出现网络/服务端异常的重试次数,尽量保证数据上传成功

其他参数

参数

不支持

-