云日志服务Java SDK
云日志服务SDK提供了Java语言上报日志功能。
传输协议
HTTPS
使用前提
- 参考注册华为账号并开通华为云中操作,完成注册。
- 确认云日志服务的区域,请用户根据所在区域,选择RegionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),详细步骤请参见API凭证。
- 获取需要上报到LTS的日志组ID和日志流ID。
- 已安装Java开发环境。云日志服务Java SDK支持JDK1.8及以上,您可以执行java –version命令检查您已安装的Java版本。如未安装,可以从Java官方网站下载安装包进行安装。
- 使用Java SDK进行跨云/本地上报日志(此功能仅适用于开发调测,对于上报性能不做保障):仅支持华北-北京四、华东-上海一、华南-广州、西南-贵阳一局点。
- 通过Java SDK上报日志到LTS的时间相距当前时间不超过2天,否则上报日志会被LTS删除。
日志上报方式
支持以下上报方式:标准日志、结构化日志(新版)、结构化日志。推荐您使用结构化日志(新版)上报,更加灵活可变,性能更好。
- 标准日志:
- 批量上报日志,一次可以上报多条日志。
- 字段log表示原始日志,即一条日志是一个字符串。
- 字段log_time_ns表示此条日志上报时间,单位ns纳秒,便于在LTS页面查看日志时按照时间排序。
- 可以将日志按照不同的labels标签进行分类上报。
上报一批日志结构体如下:[{ "contents": [{ "log": "log content1", "log_time_ns": 1737527157333902200 }, { "log": "log content2", "log_time_ns": 1737527157333914100 }], "labels": "{\"lts-test-count\":\"2\"}" }, { "contents": [{ "log": "log content3", "log_time_ns": 1737527157333986200 }, { "log": "log content4", "log_time_ns": 1737527157333987400 }], "labels": "{\"lts-test-count\":\"2\"}" }]
- 推荐使用结构化日志(新版),最低SDK版本为1.1.3。
- 支持批量日志上报、单条日志上报。
- 字段mContents表示一条日志。
- 字段mKey表示当前这条日志中的一个key。
- 字段mValue表示当前这条日志中的一个key对应的value值。
- 字段mLogTime表示此条日志上报时间,单位ms毫秒。
上报一批日志结构体如下:
[{ "mContents": [{ "mKey": "content_key_1", "mValue": "content_value_1" }, { "mKey": "content_key_2", "mValue": "content_value_2" }, { "mKey": "content", "mValue": "sdk-new-struct-log1" }], "mLogTime": 1744159440780 }, { "mContents": [{ "mKey": "content_key_1", "mValue": "content_value_1" }, { "mKey": "content_key_2", "mValue": "content_value_2" }, { "mKey": "content", // 选填,云日志服务内部字段,此字段代表原始日志 "mValue": "sdk-new-struct-log2" }], "mLogTime": 1744159440780 }]
- 结构化日志:
- 批量上报日志,一次可以上报多条日志。
- 字段contents表示某几条日志。单条日志使用K-V结构,即一条日志是一个JSON体。content为LTS保留字段,用来表示原始日志,可以不上报。
- 字段time表示某几条日志上报时间,单位ms毫秒,LTS会将这几条日志在ms毫秒的基础上拓展为ns纳秒保存。这样会丢失这几条日志的先后顺序,可能会影响到您在LTS页面查看日志的先后顺序。
- 字段labels表示这批日志公共的标签。
- 字段path表示这批日志的路径。
- 字段source表示这批日志的来源。
上报一批日志结构体如下:
{ "labels": { "label": "label" }, "logs": [{ "contents": [{ "k1": "v1", "k2": "v2", "content": "log content1" }, { "k3": "v3", "k4": "v4", "content": "log content2" }], "time": 1721784021037 }, { "contents": [{ "k5": "v5", "k6": "v6", "content": "log content3" }], "time": 1721784021038 }, ], "path": "path", "source": "source" }
步骤一:安装SDK
在Maven项目中加入依赖项。
- 使用Maven构建项目时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
<mirror> <id>huaweicloud</id> <mirrorOf>*</mirrorOf> <url>https://repo.huaweicloud.com/repository/maven/</url> </mirror>
- 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
- 在Maven工程中使用云日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。在<dependencies>中加入如下内容:
推荐使用最新版本version:1.1.3,第2步中查看SDK版本。
<dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-common</artifactId> <version>version</version> </dependency> <dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-java</artifactId> <version>version</version> </dependency>
步骤二:创建Producer
由于Producer会消耗用户的ECS资源,请将Producer变为公共资源(单例)。发送日志时,仅需要调用producer.send()方法即可。appender支持配置永久AK、SK。
创建Producer的示例参考如下,修改示例代码时参考表1。
import com.huaweicloud.lts.appender.JavaSDKAppender; import com.huaweicloud.lts.producer.Producer; public class ProducerUtil { private static final Producer producer; static { /* 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全; 本示例以ak和sk保存在环境变量中为例, 运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK. */ String ak = System.getenv("HUAWEICLOUD_SDK_AK"); String sk = System.getenv("HUAWEICLOUD_SDK_SK"); // 构建appender JavaSDKAppender appender = JavaSDKAppender.custom() // 华为云账号的项目ID(project id) .setProjectId("xxx") // 华为云账号的AK .setAccessKeyId(ak) // 华为云账号的SK .setAccessKeySecret(sk) // 华为云STS-Token, 若使用临时STS-Token, 则AK、SK需要填写与STS-Token配套的临时AK、SK // .setSecurityToken("") // 云日志服务的区域 .setRegionName("xxx") // 单个Appender能缓存的日志大小上限 .setTotalSizeInBytes(104857600) // producer发送日志时阻塞时间 .setMaxBlockMs(0L) // 执行日志发送任务的线程池大小 .setIoThreadCount(8) // producer发送单批日志量上限 .setBatchSizeThresholdInBytes(524288) // producer发送单批日志条数上限 .setBatchCountThreshold(4096) // producer发送单批日志等待时间 .setLingerMs(2000) // producer发送日志失败后重试次数 .setRetries(5) // 首次重试的退避时间 .setBaseRetryBackoffMs(500L) // 重试的最大退避时间 .setMaxRetryBackoffMs(500L) // 默认false, true: 可以跨云上报日志, false: 仅能在华为云ecs主机上报日志 // .setEnableLocalTest(false) .builder(); // 获取发送日志的producer producer = appender.getProducer(); } public static Producer getProducer() { return producer; } }
// 如果使用临时AK、SK、STS-Token,需要定期(在过期前)创建新的ProjectConfig,然后调用Producer.putProjectConfig()方法进行更新。 ProjectConfig projectConfig = new ProjectConfig("projectId", "regionName", "accessKeyId", "accessKeySecret", "securityToken"); producer.putProjectConfig(projectConfig);
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
---|---|---|---|---|
projectId |
华为云账号的项目ID(project id)。 |
String |
必填 |
- |
accessKeyId |
华为云账号的AK。认证用的AK硬编码到代码中或者明文存储都有很大的安全风险,建议密文存放,使用时解密,确保安全。 |
String |
必填 |
- |
accessKeySecret |
华为云账号的SK。认证用的SK硬编码到代码中或者明文存储都有很大的安全风险,建议密文存放,使用时解密,确保安全。 |
String |
必填 |
- |
securityToken |
用户可以通过华为云IAM服务来获取临时访问密钥(临时AK/SK)和securitytoken。如果要使用临时访问密钥,则临时AK、SK、STS-Token为一套访问密钥,必须配合使用。 |
String |
选填 |
- |
regionName |
云日志服务的区域。 |
String |
必填 |
- |
totalSizeInBytes |
单个producer实例能缓存的日志大小上限。 |
int |
选填 |
104857600(即100MB) |
maxBlockMs |
如果producer可用空间不足,调用者在send方法上的最大阻塞时间,单位为毫秒。默认为60000毫秒,建议为0毫秒。
|
long |
选填 |
60000毫秒 |
ioThreadCount |
执行日志发送任务的线程池大小。 |
int |
选填 |
客户机器空闲CPU数量,但一定大于1 |
batchSizeThresholdInBytes |
当一个ProducerBatch中缓存的日志大小大于等于batchSizeThresholdInBytes时,该ProducerBatch缓存的日志将被发送到LTS。 |
int |
选填 |
524288(即0.5MB) |
batchCountThreshold |
当一个ProducerBatch中缓存的日志条数大于等于batchCountThreshold时,该ProducerBatch缓存的日志将被发送到LTS。 |
int |
选填 |
4096 |
lingerMs |
一个ProducerBatch从创建到可发送的逗留时间。 |
int |
选填 |
2s |
retries |
如果某个ProducerBatch首次发送失败,能够对其重试的次数,建议为5次。如果retries小于等于0,该ProducerBatch首次发送失败后将直接进入失败队列。 |
int |
选填 |
10 |
baseRetryBackoffMs |
首次重试的退避时间。 |
long |
选填 |
0.1s |
maxRetryBackoffMs |
重试的最大退避时间。 |
long |
选填 |
50s |
enableLocalTest |
是否开启跨云上报日志。(此功能仅适用于开发调测,对于上报性能不做保障)
|
boolean |
选填 |
false |
proxyIp |
使用自定义ip端口上报日志。 |
String |
选填 |
- |
步骤三:构建发送日志结构体
- 标准日志和结构化日志使用的日志结构体:
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSONObject; import com.huaweicloud.lts.producer.model.log.LogContent; import com.huaweicloud.lts.producer.model.log.LogItem; import com.huaweicloud.lts.producer.model.log.StructLog; import com.huaweicloud.lts.producer.model.log.StructLogItem; public class LogUtil { private static final Logger LOGGER = LoggerFactory.getLogger(ProducerUtil.class); public static void doSomething() throws InterruptedException { LOGGER.info("Before doSomething"); Thread.sleep(30000); LOGGER.info("After doSomething"); } // 标准日志模型 public static List<LogItem> getLogItems(int size) { List<LogItem> logItemList = new ArrayList<>(); logItemList.add(getLogItem(size)); return logItemList; } private static LogItem getLogItem(int count) { LogItem logItem = new LogItem(); // 客户可以自行定义日志标签, 如果不需要请传入一个空Map Map<String, String> labels = new HashMap<>(); labels.put("labels-k", "labels-v"); logItem.setLabels(JSONObject.toJSONString(labels)); List<LogContent> contents = new ArrayList<>(); for (int i = 1; i <= count; i++) { LogContent logContent = new LogContent(); // 日志产生时间, 当前时间ns(必须为ns纳秒) long logTimeNs = System.currentTimeMillis() * 1000000L + System.nanoTime() % 1000000L; logContent.setLogTimeNs(logTimeNs); // 日志内容 logContent.setLog("java-sdk-log-" + i); contents.add(logContent); } logItem.setContents(contents); return logItem; } // 结构化日志模型 public static StructLogItem getStructLog(int size) { // 构建日志结构体 StructLogItem structLogItem = new StructLogItem(); // 设置此批日志公共标签 structLogItem.setLabels(getPostLabels()); // 添加日志List structLogItem.setLogs(getLogs(size)); // 设置此批日志公共路径 structLogItem.setPath("test-path"); // 设置此批日志公共来源 structLogItem.setSource("test-source"); return structLogItem; } private static JSONObject getPostLabels() { JSONObject jsonObject = new JSONObject(); jsonObject.put("common-k", "common-v"); return jsonObject; } private static List<StructLog> getLogs(int size) { List<StructLog> logs = new ArrayList<>(); for (int i = 1; i <= size; i++) { logs.add(getLog(size)); } return logs; } private static StructLog getLog(int size) { StructLog structLog = new StructLog(); // 日志产生时间, 当前时间ms值(必须为ms毫秒) long time = System.currentTimeMillis(); structLog.setTime(time); List<JSONObject> contents = new ArrayList<>(); for (int i = 1; i <= size; i++) { // 创建一条日志 JSONObject jsonLog = new JSONObject(); jsonLog.put("log-k", "log-v"); // LTS保留字段"content", 指原始日志。 jsonLog.put("content", "java-struct-log-" + i); contents.add(jsonLog); } structLog.setContents(contents); return structLog; } }
- 结构化日志(新版)的结构体如下,SDK版本1.1.3及以上才支持此功能。
// 批量日志 public static List<LtsStructLog> getNewBatchStructLog(int size) { List<LtsStructLog> batchStructLog = new ArrayList<>(); for (int i = 1; i <= size; i++) { // 创建一条日志 LtsStructLog ltsStructLog = new LtsStructLog(); ltsStructLog.pushBack("content_key_1", "content_value_1"); ltsStructLog.pushBack("content_key_2", "content_value_2"); // LTS保留字段content, 指原始日志。 ltsStructLog.pushBack("content", "sdk_new_struct_log"); // 日志产生时间, 当前时间ms值(必须为ms毫秒) ltsStructLog.setLogTime(System.currentTimeMillis()); batchStructLog.add(ltsStructLog); } return batchStructLog; } // 单条日志 public static LtsStructLog getNewStructLog() { // 创建一条日志 LtsStructLog ltsStructLog = new LtsStructLog(); ltsStructLog.pushBack("content_key_1", "content_value_1"); ltsStructLog.pushBack("content_key_2", "content_value_2"); // LTS保留字段content, 指原始日志。 ltsStructLog.pushBack("content", "sdk_new_struct_log"); // 日志产生时间, 当前时间ms值(必须为ms毫秒) ltsStructLog.setLogTime(System.currentTimeMillis()); return ltsStructLog; }
步骤四:发送日志
由于Producer是异步发送日志,会有本地缓存,所以单个Producer只能选择标准日志或者结构化日志的一种上报,不能同时上报两种日志格式。
- 异步发送日志,无需返回值进行额外操作。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.huaweicloud.lts.producer.Producer; import com.huaweicloud.lts.producer.exception.ProducerException; public class SamplePerformance { private static final Logger LOGGER = LoggerFactory.getLogger(SamplePerformance.class); public static void main(String[] args) throws InterruptedException { // 获取日志发送的producer Producer producer = ProducerUtil.getProducer(); int sendThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1); ExecutorService executorService = Executors.newFixedThreadPool(sendThreadCount); final CountDownLatch latch = new CountDownLatch(sendThreadCount); LOGGER.info("Test started."); long t1 = System.currentTimeMillis(); int n = 100; for (int i = 1; i <= sendThreadCount; ++i) { executorService.submit(new Runnable() { @Override public void run() { try { for (int i = 0; i < n; ++i) { // 上报标准日志 producer.send("log_group_id", "log_stream_id", LogUtil.getLogItems(1)); // 使用日志组名称,日志流名称发送日志。SDK版本最低必须为1.1.0支持此功能。 // producer.send("log_group_name", "log_stream_name", true, logItemList); // 注意:方法中第3个参数,请填true,代表使用名称发送日志。如果是false,则是ID发送日志。 // 上报结构化日志 // producer.send("log_group_id", "log_stream_id", LogUtil.getStructLog(1)); // 使用日志组名称,日志流名称发送日志。SDK版本最低必须为1.1.0支持此功能。 // producer.send("log_group_name", "log_stream_name", true, structLog); // 注意:方法中第3个参数,请填true,代表使用名称发送日志。如果是false,则是ID发送日志。 // 上报新版结构化日志 // 备注:上报参数中的"log_group" 默认表示日志组Id、"log_stream" 默认表示日志流Id。 // 如果需要日志组名称和日志流名称上报日志,只需要在初始化appender时,设置参数:setLogGroupStreamByName(true) // 上报单条日志 // producer.sendStructLog("log_group", "log_stream", LogUtil.getNewStructLog()); // 上报一批日志 // producer.sendStructLog("log_group", "log_stream", LogUtil.getNewBatchStructLog(1)); } } catch (Exception e) { LOGGER.error("Failed to send log, e=", e); } finally { latch.countDown(); } } }); } latch.await(); Thread.sleep(30000); long t2 = System.currentTimeMillis(); LOGGER.info("Test end."); LOGGER.info("======Summary======"); LOGGER.info("Total count " + sendThreadCount * n + "."); long timeCost = t2 - t1; LOGGER.info("Time cost " + timeCost + " millis"); try { producer.close(); } catch (ProducerException e) { LOGGER.error("Failed to close producer, e=", e); } executorService.shutdown(); } }
- 通过Future来执行额外操作。
Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许您注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行。
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.huaweicloud.lts.producer.Producer; import com.huaweicloud.lts.producer.Result; import com.huaweicloud.lts.producer.exception.ProducerException; import com.huaweicloud.lts.producer.exception.ResultFailedException; import com.huaweicloud.lts.producer.model.log.LogItem; import com.huaweicloud.lts.producer.model.log.StructLogItem; public class SampleProducerWithFuture { private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class); private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool( Math.max(Runtime.getRuntime().availableProcessors(), 1)); public static void main(String[] args) throws InterruptedException { Producer producer = ProducerUtil.getProducer(); int n = 100; int size = 20; final AtomicLong completed = new AtomicLong(0); for (int i = 0; i < n; ++i) { try { // 上报标准日志 List<LogItem> logItems = LogUtil.getLogItems(size); ListenableFuture<Result> f = producer.send("log_group_id", "log_stream_id", logItems); Futures.addCallback(f, new SampleFutureCallback("log_group_id", "log_stream_id", logItems, completed), EXECUTOR_SERVICE); // 上报结构化日志 // StructLogItem structLog = LogUtil.getStructLog(size); // ListenableFuture<Result> f = producer.send("log_group_id", "log_stream_id", structLog); // Futures.addCallback(f, new SampleFutureCallback("log_group_id", "log_stream_id", structLog, completed), // EXECUTOR_SERVICE); // 上报新版结构化日志 // List<LtsStructLog> newBatchStructLog = LogUtil.getNewBatchStructLog(1); // ListenableFuture<Result> f = producer.sendStructLog("log_group", "log_stream", newBatchStructLog); // Futures.addCallback(f, new SampleFutureCallback("log_group", "log_stream", completed, newBatchStructLog), // EXECUTOR_SERVICE); } catch (Exception e) { LOGGER.error("Failed to send logs, e=", e); } } LogUtil.doSomething(); try { producer.close(); } catch (ProducerException e) { LOGGER.info("Failed to close producer, e=", e); } EXECUTOR_SERVICE.shutdown(); while (!EXECUTOR_SERVICE.isTerminated()) { EXECUTOR_SERVICE.awaitTermination(100, TimeUnit.MILLISECONDS); } LOGGER.info("All log complete, completed={}", completed.get()); } private static final class SampleFutureCallback implements FutureCallback<Result> { private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class); private final String logGroupId; private final String logStreamId; private List<LogItem> logItems = null; private StructLogItem structLog = null; private List<LtsStructLog> newBatchStructLog = null; private final AtomicLong completed; SampleFutureCallback(String logGroupId, String logStreamId, List<LogItem> logItems, AtomicLong completed) { this.logGroupId = logGroupId; this.logStreamId = logStreamId; this.logItems = logItems; this.completed = completed; } SampleFutureCallback(String logGroupId, String logStreamId, StructLogItem structLog, AtomicLong completed) { this.logGroupId = logGroupId; this.logStreamId = logStreamId; this.structLog = structLog; this.completed = completed; } SampleFutureCallback(String logGroupId, String logStreamId, AtomicLong completed, List<LtsStructLog> newBatchStructLog) { this.logGroupId = logGroupId; this.logStreamId = logStreamId; this.newBatchStructLog = newBatchStructLog; this.completed = completed; } @Override public void onSuccess(Result result) { LOGGER.info("Send logs successfully."); completed.getAndIncrement(); } @Override public void onFailure(Throwable t) { if (t instanceof ResultFailedException) { Result result = ((ResultFailedException) t).getResult(); LOGGER.error("Failed to send logs, logGroupId={}, logStreamId={}, result={}", logGroupId, logStreamId, result); } else { LOGGER.error("Failed to send log, e=", t); } completed.getAndIncrement(); } } }
- 通过Callback来执行额外操作。
Callback由producer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。不建议在callback中调用send方法进行重试。SDK自带重试功能。
import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.huaweicloud.lts.producer.Callback; import com.huaweicloud.lts.producer.Producer; import com.huaweicloud.lts.producer.Result; import com.huaweicloud.lts.producer.exception.ProducerException; import com.huaweicloud.lts.producer.model.log.LogItem; import com.huaweicloud.lts.producer.model.log.StructLogItem; public class SampleProducerWithCallback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class); private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool( Math.max(Runtime.getRuntime().availableProcessors(), 1)); public static void main(String[] args) throws InterruptedException { Producer producer = ProducerUtil.getProducer(); int nTask = 100; int size = 20; final AtomicLong completed = new AtomicLong(0); final CountDownLatch latch = new CountDownLatch(nTask); for (int i = 0; i < nTask; ++i) { EXECUTOR_SERVICE.submit(() -> { try { // 上报标准日志 List<LogItem> logItems = LogUtil.getLogItems(size); producer.send("log_group_id", "log_stream_id", logItems, new SampleCallback("log_group_id", "log_stream_id", logItems, completed)); // 上报结构化日志 // StructLogItem structLog = LogUtil.getStructLog(size); // producer.send("log_group_id", "log_stream_id", structLog, // new SampleCallback("log_group_id", "log_stream_id", structLog, completed)); // 上报新版结构化日志 // List<LtsStructLog> newBatchStructLog = LogUtil.getNewBatchStructLog(1); // producer.sendStructLog("log_group", "log_stream", newBatchStructLog, // new SampleCallback("log_group", "log_stream", completed, newBatchStructLog)); } catch (Exception e) { LOGGER.error("Failed to send log, e=", e); } finally { latch.countDown(); } }); } latch.await(); EXECUTOR_SERVICE.shutdown(); LogUtil.doSomething(); try { producer.close(); } catch (InterruptedException e) { LOGGER.warn("The current thread has been interrupted from close."); } catch (ProducerException e) { LOGGER.info("Failed to close producer, e=", e); } LOGGER.info("All log complete, completed={}", completed.get()); } private static final class SampleCallback implements Callback { private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class); private final String logGroupId; private final String logStreamId; private List<LogItem> logItems = null; private StructLogItem structLog = null; private List<LtsStructLog> newBatchStructLog = null; private final AtomicLong completed; SampleCallback(String logGroupId, String logStreamId, List<LogItem> logItems, AtomicLong completed) { this.logGroupId = logGroupId; this.logStreamId = logStreamId; this.logItems = logItems; this.completed = completed; } SampleCallback(String logGroupId, String logStreamId, StructLogItem structLog, AtomicLong completed) { this.logGroupId = logGroupId; this.logStreamId = logStreamId; this.structLog = structLog; this.completed = completed; } SampleCallback(String logGroupId, String logStreamId, AtomicLong completed, List<LtsStructLog> newBatchStructLog) { this.logGroupId = logGroupId; this.logStreamId = logStreamId; this.completed = completed; this.newBatchStructLog = newBatchStructLog; } @Override public void onCompletion(Result result) { try { if (result.isSuccessful()) { LOGGER.info("Send log successfully."); } else { LOGGER.error("Failed to send logs, logGroupId={}, logStreamId={}, result={}", logGroupId, logStreamId, result); } } finally { completed.getAndIncrement(); } } } }
步骤五:关闭Producer
当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。
- 安全关闭(推荐):建议您使用安全关闭。安全关闭对应的方法是close(),SDK会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行、返回future全部被设置后才会关闭Producer。
- 有限关闭:如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。
Producer性能基线
上报日志时,请参考如下参数的测试性能基线,若超出基线值,可能会导致日志上报异常。
ECS虚拟机配置参考如下:
- 实例规格:通用计算增强型c7.xlarge.2
- CPU:4 vCPU
- 内存:8 GB
- 基准带宽:100 Mbit/s
- OS:Huawei Cloud EulerOS release 2.0
- JVM:OpenJDK 64-Bit Server VM (build 17.0.7+7, mixed mode)
测试程序说明(单个producer):
- totalSizeInBytes: 104857600
- maxBlockMs:0
- batchSizeThresholdInBytes: 1048576
- batchCountThreshold:40960
- lingerMs:2000
- ioThreadCount:具体用例中调整
- JVM初始堆/最大堆大小:1GB
- 发送日志总条数:100,000,000
- 发送日志总大小:约50GB
按照参数基线值设置后,使用华为云ECS机器作为日志上报环境,通过华为云内网服务入口进行上报。
标准日志SDK上报性能基线测试结果参考:
- 上报日志格式:测试上报一批日志,包含4条日志,总大小约为2.2KB。为了模拟数据的随机性,测试使用的日志数据为随机字符串,单条日志大小约为510字节。
[ { "contents" : [{ "log" : "随机字符串510字节", "log_time_ns" : 1637527157333902200 }, { "log" : "随机字符串510字节", "log_time_ns" : 1637527157333902200 } ] }, { "contents" : [{ "log" : "随机字符串510字节", "log_time_ns" : 1637527157333902200 }, { "log" : "随机字符串510字节", "log_time_ns" : 1737527157333987400 } ] } ]
- 性能基线参考如下:
表2 标准日志SDK上报性能基线 IO 线程数量
数据吞吐量
数据吞吐速率
CPU 使用率
2
7.8 MB/S
1.5W 条/S
9 %
4
15.4 MB/S
3.1W 条/S
19 %
6
19.7 MB/S
3.9W 条/S
27 %
结构化日志SDK上报性能基线测试结果参考:
- 上报日志格式:测试上报一批日志,包含4条日志,总大小约为2.2KB。 单条日志包含10个键值对以及content、time字段。 为了模拟数据的随机性,测试使用的数据为随机字符串,单条日志大小约为550字节。
{ "logs" : [ { "contents" : [ { "content": "sdk-log-new-struct-1", "content_key_1": "XmGFubcemwrceBWbZYRBTgohfxfFih", "..." : "...", "content_key_10": "amchrqwPdigHopmAkNLvJtNxgiPUzh" },{ "content": "sdk-log-new-struct-2", "content_key_1": "zOPDFRCNYsVznSgtnFejWFbaxklkMQ", "..." : "...", "content_key_10": "mLzpbYcumXsIgYtQIbzizoACLtUgwS" } ], "time": 1645374890235 },{ "contents" : [ { "content": "sdk-log-new-struct-3", "content_key_1": "SaGsfDrQskJaHlciNAUXFyxiqCAqXe", "..." : "...", "content_key_10": "wMQNuoVWonxVSsRsocQoDkEjcjiPio" },{ "content": "sdk-log-new-struct-4", "content_key_1": "bHDjNmAvdiLAvWdxoETANqCYxhVMMk", "..." : "...", "content_key_10": "scsxtrXrPUFYVARzOvbCxSofYZBsFV" } ], "time": 1645374890235 } ] }
- 性能基线参考如下:
表3 结构化日志SDK上报性能基线 IO 线程数量
数据吞吐量
数据吞吐速率
CPU 使用率
2
18.7 MB/S
3.5W 条/S
13 %
4
34.6 MB/S
6.4W 条/S
25 %
6
42.7 MB/S
7.8W 条/S
32 %
新版本结构化日志SDK上报性能基线测试结果参考:
- 上报日志格式:测试中使用的日志包含10个键值对以及content、logTime 字段。为了模拟数据的随机性,测试使用的数据为随机字符串。 单条日志大小约为540字节。
content: sdk-log-new-struct-1 content_key_1: sshyaqKCfrAPCMpdlxroPuCedeuJ content_key_2: RVFtqFBjBtTAHVvdHBYQsDsoogJc ... content_key_9: zLKeuxDnzGtupeZrQKKIlkQemXvX content_key_10: fYxmtYxKNfBhRfqMbZEOfimlsAIo logTime: 1645390242169
- 性能基线参考如下:
表4 新版本结构化日志SDK上报性能基线 IO 线程数量
数据吞吐量
数据吞吐速率
CPU 使用率
2
23.7MB/S
4.5W条/s
11%
4
45.6MB/S
8.7W条/s
23%
6
54.7MB/S
10.3W条/s
30%
总结如下:
- CPU时间主要花费在对象的序列化和压缩上,在吞吐量较高的情况下CPU使用率比较高。但在日常环境中,单机数据流量均值为100KB/S,因此造成的CPU消耗几乎可以忽略不计。
- 增加IO线程数量可以显著提高吞吐量,尤其是当IO线程数量少于可用处理器个数时。
- 调整totalSizeInBytes对吞吐量影响不够显著,增加totalSizeInBytes会造成更多的CPU消耗,建议使用默认值。
- 当日志上报量超过单个producer时:
- 建议拆分日志流,使用多个producer上报日志,分摊流量,以保障SDK处于正常上报状态。
- 如果maxBlockMs为0时,SDK处于非阻塞状态,会触发保护机制自动降级,可能会对部分日志做丢弃处理。
- 如果maxBlockMs大于0时,SDK处于阻塞状态,阻塞时间为maxBlockMs,可能会造成producer.send()发送日志方法处于阻塞状态。