上传数据
代码样例
package com.huaweicloud.dis.demo.adapter; import com.huaweicloud.dis.DISConfig; import com.huaweicloud.dis.adapter.kafka.clients.producer.*; import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; public class DISKafkaProducerDemo { private static final Logger LOGGER = LoggerFactory.getLogger(DISKafkaProducerDemo.class); public static void main(String[] args) { // 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK 。 String ak = System.getenv("HUAWEICLOUD_SDK_AK"); String sk = System.getenv("HUAWEICLOUD_SDK_SK"); // YOU ProjectId String projectId = "YOU_PROJECT_ID"; // YOU DIS Stream String streamName = "YOU_STREAM_NAME"; // 消费组ID,用于记录offset String groupId = "YOU_GROUP_ID"; // DIS region String region = "your region"; Properties props = new Properties(); props.setProperty(DISConfig.PROPERTY_AK, ak); props.setProperty(DISConfig.PROPERTY_SK, sk); props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId); props.setProperty(DISConfig.PROPERTY_REGION_ID, region); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 默认情况下不需要设置endpoint,会自动使用域名访问;如需使用指定的endpoint,解除如下注释并设置endpoint即可 // props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com"); // Create dis producer Producer<String, String> producer = new DISKafkaProducer<>(props); // 同步发送 synchronousSendDemo(producer, streamName); // 异步发送 asynchronousSendDemo(producer, streamName); // 关闭producer,防止资源泄露 producer.close(); } public static void synchronousSendDemo(Producer<String, String> producer, String streamName) { LOGGER.info("===== synchronous send ====="); for (int i = 0; i < 5; i++) { // key设置为随机(或者设置为null),数据会均匀分配到所有分区中 String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000)); String value = "Hello world[sync]. " + i; Future<RecordMetadata> future = producer.send(new ProducerRecord<>(streamName, key, value)); try { // 调用future.get会阻塞等待,直到发送完成 RecordMetadata recordMetadata = future.get(); // 发送成功 LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].", value, recordMetadata.partition(), recordMetadata.offset()); } catch (Exception e) { // 发送失败 LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e); } } } public static void asynchronousSendDemo(Producer<String, String> producer, String streamName) { LOGGER.info("===== asynchronous send ====="); int totalSendCount = 5; CountDownLatch countDownLatch = new CountDownLatch(totalSendCount); for (int i = 0; i < totalSendCount; i++) { // key设置为随机(或者设置为null),数据会均匀分配到所有分区中 String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000)); String value = "Hello world[async]. " + i; try { // 使用回调方式发送,不会阻塞 producer.send(new ProducerRecord<>(streamName, key, value), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { countDownLatch.countDown(); if (e == null) { // 发送成功 LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].", value, recordMetadata.partition(), recordMetadata.offset()); } else { // 发送失败 LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e); } } }); } catch (Exception e) { countDownLatch.countDown(); LOGGER.error(e.getMessage(), e); } } try { // 等待所有发送完成 countDownLatch.await(); } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } } }
执行如上程序,发送数据成功会打印如下日志
09:32:52.001 INFO c.h.d.d.a.DISKafkaProducerDemo - ===== synchronous send ===== 09:32:53.523 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 0], Partition [0], Offset [114]. 09:32:53.706 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 1], Partition [0], Offset [115]. 09:32:53.956 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 2], Partition [0], Offset [116]. 09:32:54.160 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 3], Partition [0], Offset [117]. 09:32:54.450 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 4], Partition [0], Offset [118]. 09:32:54.450 INFO c.h.d.d.a.DISKafkaProducerDemo - ===== asynchronous send ===== 09:32:54.673 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 0], Partition [0], Offset [119]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 1], Partition [0], Offset [120]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 2], Partition [0], Offset [121]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 3], Partition [0], Offset [122]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 4], Partition [0], Offset [123].
与原生KafkaProducer接口适配说明
DISKafkaProducer的实现与KafkaProducer的实现不同,DISKafkaProducer的客户端与服务端通过Rest API实现,而KafkaProducer是基于TCP协议实现,在接口兼容上有如下差异。
原生KafkaProducer |
类型 |
DISKafkaProducer |
说明 |
---|---|---|---|
Future<RecordMetadata> send(ProducerRecord<K, V> record) |
接口 |
支持 |
发送单条数据 |
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) |
接口 |
支持 |
发送单条数据并设置回调处理函数 |
void close() |
接口 |
支持 |
关闭Producer |
void close(long timeout, TimeUnit timeUnit) |
接口 |
支持 |
关闭Producer并设置超时时间 |
List<PartitionInfo> partitionsFor(String topic) |
接口 |
支持 |
获取通道的分区信息 |
void flush(long timeout, TimeUnit unit) |
接口 |
不支持 |
强制发送当前缓存数据,后续支持 |
Map<MetricName, ? extends Metric> metrics() |
接口 |
不支持 |
获取统计信息 |
key.serializer |
参数 |
支持 |
含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置) |
value.serializer |
参数 |
支持 |
含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置) |
linger.ms |
参数 |
支持 |
含义与kafka设置相同,但默认值为50(kafka是0),目的是提高Rest接口的上传效率 |
batch.size |
参数 |
支持 |
含义与kafka设置相同,但默认值为1MB(kafka是16KB),目的是匹配流控的大小 |
buffer.memory |
参数 |
支持 |
同kafka的默认设置(32MB) |
max.in.flight.requests.per.connection |
参数 |
支持 |
限制客户端在单个连接上能够发送的未响应请求的个数,默认值为100(Kafka默认为5)可提高发送性能,但可能出现数据顺序不一致的问题。如需严格保证顺序,建议此值设置为1 |
block.on.buffer.full |
参数 |
支持 |
同Kafka默认设置(false)。
|
max.block.ms |
参数 |
支持 |
同Kafka默认设置(60000)。 当发送缓冲区满且block.on.buffer.full为false时,控制send()的阻塞时间(毫秒)。 |
retries |
参数 |
支持,但是参数名改为exception.retries |
Kafka默认设置为0,DIS默认设置为8。 出现网络/服务端异常的重试次数,尽量保证数据上传成功 |
其他参数 |
参数 |
不支持 |
- |