上传数据
代码样例
package com.huaweicloud.dis.demo.adapter;
import com.huaweicloud.dis.DISConfig;
import com.huaweicloud.dis.adapter.kafka.clients.producer.*;
import com.huaweicloud.dis.adapter.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
public class DISKafkaProducerDemo
{
private static final Logger LOGGER = LoggerFactory.getLogger(DISKafkaProducerDemo.class);
public static void main(String[] args)
{
// 认证用的ak和sk直接写到代码中有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
// 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK 。
String ak = System.getenv("HUAWEICLOUD_SDK_AK");
String sk = System.getenv("HUAWEICLOUD_SDK_SK");
// YOU ProjectId
String projectId = "YOU_PROJECT_ID";
// YOU DIS Stream
String streamName = "YOU_STREAM_NAME";
// 消费组ID,用于记录offset
String groupId = "YOU_GROUP_ID";
// DIS region
String region = "your region";
Properties props = new Properties();
props.setProperty(DISConfig.PROPERTY_AK, ak);
props.setProperty(DISConfig.PROPERTY_SK, sk);
props.setProperty(DISConfig.PROPERTY_PROJECT_ID, projectId);
props.setProperty(DISConfig.PROPERTY_REGION_ID, region);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 默认情况下不需要设置endpoint,会自动使用域名访问;如需使用指定的endpoint,解除如下注释并设置endpoint即可
// props.setProperty(DISConfig.PROPERTY_ENDPOINT, "https://dis-${region}.myhuaweicloud.com");
// Create dis producer
Producer<String, String> producer = new DISKafkaProducer<>(props);
// 同步发送
synchronousSendDemo(producer, streamName);
// 异步发送
asynchronousSendDemo(producer, streamName);
// 关闭producer,防止资源泄露
producer.close();
}
public static void synchronousSendDemo(Producer<String, String> producer, String streamName)
{
LOGGER.info("===== synchronous send =====");
for (int i = 0; i < 5; i++)
{
// key设置为随机(或者设置为null),数据会均匀分配到所有分区中
String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
String value = "Hello world[sync]. " + i;
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(streamName, key, value));
try
{
// 调用future.get会阻塞等待,直到发送完成
RecordMetadata recordMetadata = future.get();
// 发送成功
LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].",
value, recordMetadata.partition(), recordMetadata.offset());
}
catch (Exception e)
{
// 发送失败
LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
}
}
}
public static void asynchronousSendDemo(Producer<String, String> producer, String streamName)
{
LOGGER.info("===== asynchronous send =====");
int totalSendCount = 5;
CountDownLatch countDownLatch = new CountDownLatch(totalSendCount);
for (int i = 0; i < totalSendCount; i++)
{
// key设置为随机(或者设置为null),数据会均匀分配到所有分区中
String key = String.valueOf(ThreadLocalRandom.current().nextInt(1000000));
String value = "Hello world[async]. " + i;
try
{
// 使用回调方式发送,不会阻塞
producer.send(new ProducerRecord<>(streamName, key, value), new Callback()
{
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e)
{
countDownLatch.countDown();
if (e == null)
{
// 发送成功
LOGGER.info("Success to send [{}], Partition [{}], Offset [{}].",
value, recordMetadata.partition(), recordMetadata.offset());
}
else
{
// 发送失败
LOGGER.error("Failed to send [{}], Error [{}]", value, e.getMessage(), e);
}
}
});
}
catch (Exception e)
{
countDownLatch.countDown();
LOGGER.error(e.getMessage(), e);
}
}
try
{
// 等待所有发送完成
countDownLatch.await();
}
catch (InterruptedException e)
{
LOGGER.error(e.getMessage(), e);
}
}
}
执行如上程序,发送数据成功会打印如下日志
09:32:52.001 INFO c.h.d.d.a.DISKafkaProducerDemo - ===== synchronous send ===== 09:32:53.523 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 0], Partition [0], Offset [114]. 09:32:53.706 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 1], Partition [0], Offset [115]. 09:32:53.956 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 2], Partition [0], Offset [116]. 09:32:54.160 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 3], Partition [0], Offset [117]. 09:32:54.450 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[sync]. 4], Partition [0], Offset [118]. 09:32:54.450 INFO c.h.d.d.a.DISKafkaProducerDemo - ===== asynchronous send ===== 09:32:54.673 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 0], Partition [0], Offset [119]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 1], Partition [0], Offset [120]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 2], Partition [0], Offset [121]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 3], Partition [0], Offset [122]. 09:32:54.674 INFO c.h.d.d.a.DISKafkaProducerDemo - Success to send [Hello world[async]. 4], Partition [0], Offset [123].
与原生KafkaProducer接口适配说明
DISKafkaProducer的实现与KafkaProducer的实现不同,DISKafkaProducer的客户端与服务端通过Rest API实现,而KafkaProducer是基于TCP协议实现,在接口兼容上有如下差异。
|
原生KafkaProducer |
类型 |
DISKafkaProducer |
说明 |
|---|---|---|---|
|
Future<RecordMetadata> send(ProducerRecord<K, V> record) |
接口 |
支持 |
发送单条数据 |
|
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) |
接口 |
支持 |
发送单条数据并设置回调处理函数 |
|
void close() |
接口 |
支持 |
关闭Producer |
|
void close(long timeout, TimeUnit timeUnit) |
接口 |
支持 |
关闭Producer并设置超时时间 |
|
List<PartitionInfo> partitionsFor(String topic) |
接口 |
支持 |
获取通道的分区信息 |
|
void flush(long timeout, TimeUnit unit) |
接口 |
不支持 |
强制发送当前缓存数据,后续支持 |
|
Map<MetricName, ? extends Metric> metrics() |
接口 |
不支持 |
获取统计信息 |
|
key.serializer |
参数 |
支持 |
含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置) |
|
value.serializer |
参数 |
支持 |
含义与kafka设置相同,但默认值为StringSerializer (kafka必须配置) |
|
linger.ms |
参数 |
支持 |
含义与kafka设置相同,但默认值为50(kafka是0),目的是提高Rest接口的上传效率 |
|
batch.size |
参数 |
支持 |
含义与kafka设置相同,但默认值为1MB(kafka是16KB),目的是匹配流控的大小 |
|
buffer.memory |
参数 |
支持 |
同kafka的默认设置(32MB) |
|
max.in.flight.requests.per.connection |
参数 |
支持 |
限制客户端在单个连接上能够发送的未响应请求的个数,默认值为100(Kafka默认为5)可提高发送性能,但可能出现数据顺序不一致的问题。如需严格保证顺序,建议此值设置为1 |
|
block.on.buffer.full |
参数 |
支持 |
同Kafka默认设置(false)。
|
|
max.block.ms |
参数 |
支持 |
同Kafka默认设置(60000)。 当发送缓冲区满且block.on.buffer.full为false时,控制send()的阻塞时间(毫秒)。 |
|
retries |
参数 |
支持,但是参数名改为exception.retries |
Kafka默认设置为0,DIS默认设置为8。 出现网络/服务端异常的重试次数,尽量保证数据上传成功 |
|
其他参数 |
参数 |
不支持 |
- |