更新时间:2025-06-18 GMT+08:00
分享

云日志服务Java SDK

云日志服务SDK提供了Java语言上报日志功能。

传输协议

HTTPS

使用前提

  • 参考注册华为账号并开通华为云中操作,完成注册。
  • 确认云日志服务的区域,请用户根据所在区域,选择RegionName
  • 获取华为账号的AK/SK
  • 获取华为云账号的项目ID(project id),详细步骤请参见API凭证
  • 获取需要上报到LTS的日志组ID日志流ID
  • 已安装Java开发环境。日志服务Java SDK支持JDK1.8及以上,您可以执行java –version命令检查您已安装的Java版本。如未安装,可以从Java官方网站下载安装包进行安装。
  • 使用Java SDK进行跨云/本地上报日志(此功能仅适用于开发调测,对于上报性能不做保障):仅支持华北-北京四、华东-上海一、华南-广州、西南-贵阳一局点。
  • 通过Java SDK上报日志到LTS的时间相距当前时间不超过2天,否则上报日志会被LTS删除。

日志上报方式

支持以下上报方式:标准日志、结构化日志(新版)、结构化日志。推荐您使用结构化日志(新版)上报,更加灵活可变,性能更好。

  1. 标准日志:
    • 批量上报日志,一次可以上报多条日志。
    • 字段log表示原始日志,即一条日志是一个字符串。
    • 字段log_time_ns表示此条日志上报时间,单位ns纳秒,便于在LTS页面查看日志时按照时间排序。
    • 可以将日志按照不同的labels标签进行分类上报。
    上报一批日志结构体如下:
    [{
    	"contents": [{
    		"log": "log content1",
    		"log_time_ns": 1737527157333902200
    	}, {
    		"log": "log content2",
    		"log_time_ns": 1737527157333914100
    	}],
    	"labels": "{\"lts-test-count\":\"2\"}"
    }, {
    	"contents": [{
    		"log": "log content3",
    		"log_time_ns": 1737527157333986200
    	}, {
    		"log": "log content4",
    		"log_time_ns": 1737527157333987400
    	}],
    	"labels": "{\"lts-test-count\":\"2\"}"
    }]
  2. 推荐使用结构化日志(新版)最低SDK版本为1.1.3。

    目前该方式仅支持白名单用户使用,如有需要,请提交工单申请开通。

    • 支持批量日志上报、单条日志上报。
    • 字段mContents表示一条日志。
    • 字段mKey表示当前这条日志中的一个key。
    • 字段mValue表示当前这条日志中的一个key对应的value值。
    • 字段mLogTime表示此条日志上报时间,单位ms毫秒。

    上报一批日志结构体如下:

    [{
    	"mContents": [{
    		"mKey": "content_key_1",
    		"mValue": "content_value_1"
    	}, {
    		"mKey": "content_key_2",
    		"mValue": "content_value_2"
    	}, {
    		"mKey": "content",
    		"mValue": "sdk-new-struct-log1"
    	}],
    	"mLogTime": 1744159440780
    }, {
    	"mContents": [{
    		"mKey": "content_key_1",
    		"mValue": "content_value_1"
    	}, {
    		"mKey": "content_key_2",
    		"mValue": "content_value_2"
    	}, {
    		"mKey": "content",   // 选填,云日志服务内部字段,此字段代表原始日志
    		"mValue": "sdk-new-struct-log2"
    	}],
    	"mLogTime": 1744159440780
    }]
  3. 结构化日志:

    目前该方式仅支持白名单用户使用,如有需要,请提交工单申请开通。

    • 批量上报日志,一次可以上报多条日志。
    • 字段contents表示某几条日志。单条日志使用K-V结构,即一条日志是一个JSON体。content为LTS保留字段,用来表示原始日志,可以不上报。
    • 字段time表示某几条日志上报时间,单位ms毫秒,LTS会将这几条日志在ms毫秒的基础上拓展为ns纳秒保存。这样会丢失这几条日志的先后顺序,可能会影响到您在LTS页面查看日志的先后顺序。
    • 字段labels表示这批日志公共的标签。
    • 字段path表示这批日志的路径。
    • 字段source表示这批日志的来源。

    上报一批日志结构体如下:

    {
    	"labels": {
    		"label": "label"
    	},
    	"logs": [{
    		"contents": [{
    			"k1": "v1",
    			"k2": "v2",
    			"content": "log content1"
    		}, {
    			"k3": "v3",
    			"k4": "v4",
    			"content": "log content2"
    		}],
    		"time": 1721784021037
    	}, {
    		"contents": [{
    			"k5": "v5",
    			"k6": "v6",
    			"content": "log content3"
    		}],
    		"time": 1721784021038
    	}, ],
    	"path": "path",
    	"source": "source"
    }

步骤一:安装SDK

在Maven项目中加入依赖项。

  1. 使用Maven构建项目时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
    <mirror>
        <id>huaweicloud</id>
        <mirrorOf>*</mirrorOf>
        <url>https://repo.huaweicloud.com/repository/maven/</url>
    </mirror>
  2. 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
  3. 在Maven工程中使用云日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。在<dependencies>中加入如下内容:
    推荐使用最新版本version:1.1.3,第2步中查看SDK版本。
    <dependency> 
         <groupId>io.github.huaweicloud</groupId> 
         <artifactId>lts-sdk-common</artifactId> 
         <version>version</version> 
     </dependency> 
     <dependency> 
         <groupId>io.github.huaweicloud</groupId> 
         <artifactId>lts-sdk-java</artifactId> 
         <version>version</version> 
     </dependency>

步骤二:创建Producer

由于Producer会消耗用户的ECS资源,请将Producer变为公共资源(单例)。发送日志时,仅需要调用producer.send()方法即可。appender支持配置永久AK、SK。

创建Producer的示例参考如下,修改示例代码时参考表1

import com.huaweicloud.lts.appender.JavaSDKAppender;
import com.huaweicloud.lts.producer.Producer;

public class ProducerUtil {

    private static final Producer producer;

    static {
         /* 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全;
         本示例以ak和sk保存在环境变量中为例, 运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK. */
        String ak = System.getenv("HUAWEICLOUD_SDK_AK");
        String sk = System.getenv("HUAWEICLOUD_SDK_SK");
        // 构建appender
        JavaSDKAppender appender = JavaSDKAppender.custom()
            // 华为云账号的项目ID(project id)
            .setProjectId("xxx")
            // 华为云账号的AK
            .setAccessKeyId(ak)
            // 华为云账号的SK
            .setAccessKeySecret(sk)
            // 华为云STS-Token, 若使用临时STS-Token, 则AK、SK需要填写与STS-Token配套的临时AK、SK
            // .setSecurityToken("")
            // 云日志服务的区域
            .setRegionName("xxx")
            // 单个Appender能缓存的日志大小上限
            .setTotalSizeInBytes(104857600)
            // producer发送日志时阻塞时间
            .setMaxBlockMs(0L)
            // 执行日志发送任务的线程池大小
            .setIoThreadCount(8)
            // producer发送单批日志量上限
            .setBatchSizeThresholdInBytes(524288)
            // producer发送单批日志条数上限
            .setBatchCountThreshold(4096)
            // producer发送单批日志等待时间
            .setLingerMs(2000)
            // producer发送日志失败后重试次数
            .setRetries(5)
            // 首次重试的退避时间
            .setBaseRetryBackoffMs(500L)
            // 重试的最大退避时间
            .setMaxRetryBackoffMs(500L)
            // 默认false, true: 可以跨云上报日志, false: 仅能在华为云ecs主机上报日志
            // .setEnableLocalTest(false)
            .builder();
        // 获取发送日志的producer
        producer = appender.getProducer();
    }

    public static Producer getProducer() {
        return producer;
    }
}
appender支持配置临时AK、SK、STS-Token。如果使用临时AK、SK、STS-Token,需要定期(在过期前)创建新的ProjectConfig,然后调用Producer.putProjectConfig()方法进行更新。示例代码如下:
// 如果使用临时AK、SK、STS-Token,需要定期(在过期前)创建新的ProjectConfig,然后调用Producer.putProjectConfig()方法进行更新。
ProjectConfig projectConfig = new ProjectConfig("projectId", "regionName", "accessKeyId", "accessKeySecret", "securityToken");
producer.putProjectConfig(projectConfig);
表1 Producer参数配置表

参数名称

描述

类型

是否需要填写

默认值

projectId

华为云号的项目ID(project id)。

String

必填

-

accessKeyId

华为云号的AK。认证用的AK硬编码到代码中或者明文存储都有很大的安全风险,建议密文存放,使用时解密,确保安全。

String

必填

-

accessKeySecret

华为云号的SK。认证用的SK硬编码到代码中或者明文存储都有很大的安全风险,建议密文存放,使用时解密,确保安全。

String

必填

-

securityToken

用户可以通过华为云IAM服务来获取临时访问密钥(临时AK/SK)和securitytoken。如果要使用临时访问密钥,则临时AK、SK、STS-Token为一套访问密钥,必须配合使用。

String

选填

-

regionName

云日志服务的区域。

String

必填

-

totalSizeInBytes

单个producer实例能缓存的日志大小上限。

int

选填

104857600(即100MB)

maxBlockMs

如果producer可用空间不足,调用者在send方法上的最大阻塞时间,单位为毫秒。默认为60000毫秒,建议为0毫秒。

  • 当maxBlockMs值>=0时,则阻塞到设置的时间,如果到达阻塞时间,还是不能获取到内存,即报错且丢弃日志。
  • 当maxBlockMs值=-1时,则一直阻塞到发送成功,且不会丢弃日志。

long

选填

60000毫秒

ioThreadCount

执行日志发送任务的线程池大小。

int

选填

客户机器空闲CPU数量,但一定大于1

batchSizeThresholdInBytes

当一个ProducerBatch中缓存的日志大小大于等于batchSizeThresholdInBytes时,该ProducerBatch缓存的日志将被发送到LTS。

int

选填

524288(即0.5MB)

batchCountThreshold

当一个ProducerBatch中缓存的日志条数大于等于batchCountThreshold时,该ProducerBatch缓存的日志将被发送到LTS。

int

选填

4096

lingerMs

一个ProducerBatch从创建到可发送的逗留时间。

int

选填

2s

retries

如果某个ProducerBatch首次发送失败,能够对其重试的次数,建议为5次。如果retries小于等于0,该ProducerBatch首次发送失败后将直接进入失败队列。

int

选填

10

baseRetryBackoffMs

首次重试的退避时间。

long

选填

0.1s

maxRetryBackoffMs

重试的最大退避时间。

long

选填

50s

enableLocalTest

是否开启跨云上报日志。(此功能仅适用于开发调测,对于上报性能不做保障)

  • 选true时,开启跨云后用户能通过公网(仅支持华北-北京四、华东-上海一、华南-广州、西南-贵阳一)访问。
  • 选false时,关闭跨云后用户是通过华为云当前区域主机访问。

boolean

选填

false

proxyIp

使用自定义ip端口上报日志。

String

选填

-

步骤三:构建发送日志结构体

  • 标准日志和结构化日志使用的日志结构体:
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.alibaba.fastjson.JSONObject;
    import com.huaweicloud.lts.producer.model.log.LogContent;
    import com.huaweicloud.lts.producer.model.log.LogItem;
    import com.huaweicloud.lts.producer.model.log.StructLog;
    import com.huaweicloud.lts.producer.model.log.StructLogItem;
    
    public class LogUtil {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ProducerUtil.class);
    
        public static void doSomething() throws InterruptedException {
            LOGGER.info("Before doSomething");
            Thread.sleep(30000);
            LOGGER.info("After doSomething");
        }
    
        // 标准日志模型
        public static List<LogItem> getLogItems(int size) {
            List<LogItem> logItemList = new ArrayList<>();
            logItemList.add(getLogItem(size));
            return logItemList;
        }
    
        private static LogItem getLogItem(int count) {
            LogItem logItem = new LogItem();
    
            // 客户可以自行定义日志标签, 如果不需要请传入一个空Map
            Map<String, String> labels = new HashMap<>();
            labels.put("labels-k", "labels-v");
            logItem.setLabels(JSONObject.toJSONString(labels));
    
            List<LogContent> contents = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                LogContent logContent = new LogContent();
                // 日志产生时间, 当前时间ns(必须为ns纳秒)
                long logTimeNs = System.currentTimeMillis() * 1000000L + System.nanoTime() % 1000000L;
                logContent.setLogTimeNs(logTimeNs);
                // 日志内容
                logContent.setLog("java-sdk-log-" + i);
                contents.add(logContent);
            }
            logItem.setContents(contents);
    
            return logItem;
        }
    
        // 结构化日志模型
        public static StructLogItem getStructLog(int size) {
            // 构建日志结构体
            StructLogItem structLogItem = new StructLogItem();
            // 设置此批日志公共标签
            structLogItem.setLabels(getPostLabels());
            // 添加日志List
            structLogItem.setLogs(getLogs(size));
            // 设置此批日志公共路径
            structLogItem.setPath("test-path");
            // 设置此批日志公共来源
            structLogItem.setSource("test-source");
            return structLogItem;
        }
    
        private static JSONObject getPostLabels() {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("common-k", "common-v");
            return jsonObject;
        }
    
        private static List<StructLog> getLogs(int size) {
            List<StructLog> logs = new ArrayList<>();
            for (int i = 1; i <= size; i++) {
                logs.add(getLog(size));
            }
            return logs;
        }
    
        private static StructLog getLog(int size) {
            StructLog structLog = new StructLog();
            // 日志产生时间, 当前时间ms值(必须为ms毫秒)
            long time = System.currentTimeMillis();
            structLog.setTime(time);
    
            List<JSONObject> contents = new ArrayList<>();
            for (int i = 1; i <= size; i++) {
                // 创建一条日志
                JSONObject jsonLog = new JSONObject();
    
                jsonLog.put("log-k", "log-v");
                // LTS保留字段"content", 指原始日志。
                jsonLog.put("content", "java-struct-log-" + i);
    
                contents.add(jsonLog);
            }
    
            structLog.setContents(contents);
            return structLog;
        }
    
    }
  • 结构化日志(新版)的结构体如下,SDK版本1.1.3及以上才支持此功能。
    // 批量日志
    public static List<LtsStructLog> getNewBatchStructLog(int size) {
        List<LtsStructLog> batchStructLog = new ArrayList<>();
        for (int i = 1; i <= size; i++) {
            // 创建一条日志
            LtsStructLog ltsStructLog = new LtsStructLog();
    
            ltsStructLog.pushBack("content_key_1", "content_value_1");
            ltsStructLog.pushBack("content_key_2", "content_value_2");
            // LTS保留字段content, 指原始日志。
            ltsStructLog.pushBack("content", "sdk_new_struct_log");
            // 日志产生时间, 当前时间ms值(必须为ms毫秒)
            ltsStructLog.setLogTime(System.currentTimeMillis());
    
            batchStructLog.add(ltsStructLog);
        }
        return batchStructLog;
    }
    
    // 单条日志
    public static LtsStructLog getNewStructLog() {
        // 创建一条日志
        LtsStructLog ltsStructLog = new LtsStructLog();
    
        ltsStructLog.pushBack("content_key_1", "content_value_1");
        ltsStructLog.pushBack("content_key_2", "content_value_2");
        // LTS保留字段content, 指原始日志。
        ltsStructLog.pushBack("content", "sdk_new_struct_log");
        // 日志产生时间, 当前时间ms值(必须为ms毫秒)
        ltsStructLog.setLogTime(System.currentTimeMillis());
        return ltsStructLog;
    }

步骤四:发送日志

由于Producer是异步发送日志,会有本地缓存,所以单个Producer只能选择标准日志或者结构化日志的一种上报,不能同时上报两种日志格式。

  • 异步发送日志,无需返回值进行额外操作。
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.huaweicloud.lts.producer.Producer;
    import com.huaweicloud.lts.producer.exception.ProducerException;
    
    public class SamplePerformance {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SamplePerformance.class);
    
        public static void main(String[] args) throws InterruptedException {
            // 获取日志发送的producer
            Producer producer = ProducerUtil.getProducer();
    
            int sendThreadCount = Math.max(Runtime.getRuntime().availableProcessors(), 1);
            ExecutorService executorService = Executors.newFixedThreadPool(sendThreadCount);
            final CountDownLatch latch = new CountDownLatch(sendThreadCount);
    
            LOGGER.info("Test started.");
            long t1 = System.currentTimeMillis();
            int n = 100;
            for (int i = 1; i <= sendThreadCount; ++i) {
                executorService.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            for (int i = 0; i < n; ++i) {
               
                                // 上报标准日志
                                producer.send("log_group_id", "log_stream_id", LogUtil.getLogItems(1));
                                // 使用日志组名称,日志流名称发送日志。SDK版本最低必须为1.1.0支持此功能。
                                // producer.send("log_group_name", "log_stream_name", true, logItemList);
                                // 注意:方法中第3个参数,请填true,代表使用名称发送日志。如果是false,则是ID发送日志。
    
                                // 上报结构化日志
                                // producer.send("log_group_id", "log_stream_id", LogUtil.getStructLog(1));
                                // 使用日志组名称,日志流名称发送日志。SDK版本最低必须为1.1.0支持此功能。
                                // producer.send("log_group_name", "log_stream_name", true, structLog);
                                // 注意:方法中第3个参数,请填true,代表使用名称发送日志。如果是false,则是ID发送日志。
    
                                // 上报新版结构化日志
                                // 备注:上报参数中的"log_group" 默认表示日志组Id、"log_stream" 默认表示日志流Id。
                                // 如果需要日志组名称和日志流名称上报日志,只需要在初始化appender时,设置参数:setLogGroupStreamByName(true)
                                // 上报单条日志
                                // producer.sendStructLog("log_group", "log_stream", LogUtil.getNewStructLog());
                                // 上报一批日志
                                // producer.sendStructLog("log_group", "log_stream", LogUtil.getNewBatchStructLog(1));
                            }
                        } catch (Exception e) {
                            LOGGER.error("Failed to send log, e=", e);
                        } finally {
                            latch.countDown();
                        }
                    }
                });
            }
            latch.await();
            Thread.sleep(30000);
            long t2 = System.currentTimeMillis();
            LOGGER.info("Test end.");
            LOGGER.info("======Summary======");
            LOGGER.info("Total count " + sendThreadCount * n + ".");
            long timeCost = t2 - t1;
            LOGGER.info("Time cost " + timeCost + " millis");
            try {
                producer.close();
            } catch (ProducerException e) {
                LOGGER.error("Failed to close producer, e=", e);
            }
            executorService.shutdown();
        }
    
    }
  • 通过Future来执行额外操作。

    Send 方法会返回一个ListenableFuture,它除了可以像普通future那样通过调用get方法阻塞获得发送结果外,还允许您注册回调方法(回调方法会在完成 future 设置后被调用)。以下代码片段展示了ListenableFuture的使用方法,用户需要为该future注册一个FutureCallback并将其投递到应用提供的线程池EXECUTOR_SERVICE中执行。

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.huaweicloud.lts.producer.Producer;
    import com.huaweicloud.lts.producer.Result;
    import com.huaweicloud.lts.producer.exception.ProducerException;
    import com.huaweicloud.lts.producer.exception.ResultFailedException;
    import com.huaweicloud.lts.producer.model.log.LogItem;
    import com.huaweicloud.lts.producer.model.log.StructLogItem;
    
    public class SampleProducerWithFuture {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithFuture.class);
    
        private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(
            Math.max(Runtime.getRuntime().availableProcessors(), 1));
    
        public static void main(String[] args) throws InterruptedException {
            Producer producer = ProducerUtil.getProducer();
            int n = 100;
            int size = 20;
    
            final AtomicLong completed = new AtomicLong(0);
    
            for (int i = 0; i < n; ++i) {
                try {
                    // 上报标准日志
                    List<LogItem> logItems = LogUtil.getLogItems(size);
                    ListenableFuture<Result> f = producer.send("log_group_id", "log_stream_id", logItems);
                    Futures.addCallback(f, new SampleFutureCallback("log_group_id", "log_stream_id", logItems, completed),
                        EXECUTOR_SERVICE);
    
                    // 上报结构化日志
                    // StructLogItem structLog = LogUtil.getStructLog(size);
                    // ListenableFuture<Result> f = producer.send("log_group_id", "log_stream_id", structLog);
                    // Futures.addCallback(f, new SampleFutureCallback("log_group_id", "log_stream_id", structLog, completed),
                    //     EXECUTOR_SERVICE);
    
                    // 上报新版结构化日志
                    // List<LtsStructLog> newBatchStructLog = LogUtil.getNewBatchStructLog(1);
                    // ListenableFuture<Result> f = producer.sendStructLog("log_group", "log_stream", newBatchStructLog);
                    // Futures.addCallback(f, new SampleFutureCallback("log_group", "log_stream", completed, newBatchStructLog),
                    //     EXECUTOR_SERVICE);
    
                } catch (Exception e) {
                    LOGGER.error("Failed to send logs, e=", e);
                }
            }
    
            LogUtil.doSomething();
    
            try {
                producer.close();
            } catch (ProducerException e) {
                LOGGER.info("Failed to close producer, e=", e);
            }
    
            EXECUTOR_SERVICE.shutdown();
            while (!EXECUTOR_SERVICE.isTerminated()) {
                EXECUTOR_SERVICE.awaitTermination(100, TimeUnit.MILLISECONDS);
            }
            LOGGER.info("All log complete, completed={}", completed.get());
        }
    
        private static final class SampleFutureCallback implements FutureCallback<Result> {
    
            private static final Logger LOGGER = LoggerFactory.getLogger(SampleFutureCallback.class);
    
            private final String logGroupId;
    
            private final String logStreamId;
    
            private List<LogItem> logItems = null;
    
            private StructLogItem structLog = null;
    
            private List<LtsStructLog> newBatchStructLog = null;
    
            private final AtomicLong completed;
    
            SampleFutureCallback(String logGroupId, String logStreamId, List<LogItem> logItems, AtomicLong completed) {
                this.logGroupId = logGroupId;
                this.logStreamId = logStreamId;
                this.logItems = logItems;
                this.completed = completed;
            }
    
            SampleFutureCallback(String logGroupId, String logStreamId, StructLogItem structLog, AtomicLong completed) {
                this.logGroupId = logGroupId;
                this.logStreamId = logStreamId;
                this.structLog = structLog;
                this.completed = completed;
            }
    
            SampleFutureCallback(String logGroupId, String logStreamId, AtomicLong completed, List<LtsStructLog> newBatchStructLog) {
                this.logGroupId = logGroupId;
                this.logStreamId = logStreamId;
                this.newBatchStructLog = newBatchStructLog;
                this.completed = completed;
            }
    
            @Override
            public void onSuccess(Result result) {
                LOGGER.info("Send logs successfully.");
                completed.getAndIncrement();
            }
    
            @Override
            public void onFailure(Throwable t) {
                if (t instanceof ResultFailedException) {
                    Result result = ((ResultFailedException) t).getResult();
                    LOGGER.error("Failed to send logs, logGroupId={}, logStreamId={}, result={}", logGroupId, logStreamId,
                        result);
                } else {
                    LOGGER.error("Failed to send log, e=", t);
                }
                completed.getAndIncrement();
            }
        }
    }
  • 通过Callback来执行额外操作。

    Callback由producer内部线程负责执行,并且只有在执行完毕后数据“占用”的空间才会释放。为了不阻塞producer造成整体吞吐量的下降,要避免在callback里执行耗时的操作。不建议在callback中调用send方法进行重试。SDK自带重试功能。

    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.huaweicloud.lts.producer.Callback;
    import com.huaweicloud.lts.producer.Producer;
    import com.huaweicloud.lts.producer.Result;
    import com.huaweicloud.lts.producer.exception.ProducerException;
    import com.huaweicloud.lts.producer.model.log.LogItem;
    import com.huaweicloud.lts.producer.model.log.StructLogItem;
    
    public class SampleProducerWithCallback {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(SampleProducerWithCallback.class);
    
        private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(
            Math.max(Runtime.getRuntime().availableProcessors(), 1));
    
        public static void main(String[] args) throws InterruptedException {
            Producer producer = ProducerUtil.getProducer();
            int nTask = 100;
            int size = 20;
    
            final AtomicLong completed = new AtomicLong(0);
    
            final CountDownLatch latch = new CountDownLatch(nTask);
    
            for (int i = 0; i < nTask; ++i) {
                EXECUTOR_SERVICE.submit(() -> {
                    try {
                        // 上报标准日志
                        List<LogItem> logItems = LogUtil.getLogItems(size);
                        producer.send("log_group_id", "log_stream_id", logItems,
                            new SampleCallback("log_group_id", "log_stream_id", logItems, completed));
    
                        // 上报结构化日志
                        // StructLogItem structLog = LogUtil.getStructLog(size);
                        // producer.send("log_group_id", "log_stream_id", structLog,
                        //     new SampleCallback("log_group_id", "log_stream_id", structLog, completed));
    
                        // 上报新版结构化日志
                        // List<LtsStructLog> newBatchStructLog = LogUtil.getNewBatchStructLog(1);
                        // producer.sendStructLog("log_group", "log_stream", newBatchStructLog,
                        //     new SampleCallback("log_group", "log_stream", completed, newBatchStructLog));
                    } catch (Exception e) {
                        LOGGER.error("Failed to send log, e=", e);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await();
            EXECUTOR_SERVICE.shutdown();
    
            LogUtil.doSomething();
    
            try {
                producer.close();
            } catch (InterruptedException e) {
                LOGGER.warn("The current thread has been interrupted from close.");
            } catch (ProducerException e) {
                LOGGER.info("Failed to close producer, e=", e);
            }
    
            LOGGER.info("All log complete, completed={}", completed.get());
        }
    
        private static final class SampleCallback implements Callback {
    
            private static final Logger LOGGER = LoggerFactory.getLogger(SampleCallback.class);
    
            private final String logGroupId;
    
            private final String logStreamId;
    
            private List<LogItem> logItems = null;
    
            private StructLogItem structLog = null;
    
            private List<LtsStructLog> newBatchStructLog = null;
    
            private final AtomicLong completed;
    
            SampleCallback(String logGroupId, String logStreamId, List<LogItem> logItems, AtomicLong completed) {
                this.logGroupId = logGroupId;
                this.logStreamId = logStreamId;
                this.logItems = logItems;
                this.completed = completed;
            }
    
            SampleCallback(String logGroupId, String logStreamId, StructLogItem structLog, AtomicLong completed) {
                this.logGroupId = logGroupId;
                this.logStreamId = logStreamId;
                this.structLog = structLog;
                this.completed = completed;
            }
    
            SampleCallback(String logGroupId, String logStreamId, AtomicLong completed, List<LtsStructLog> newBatchStructLog) {
                this.logGroupId = logGroupId;
                this.logStreamId = logStreamId;
                this.completed = completed;
                this.newBatchStructLog = newBatchStructLog;
            }
    
            @Override
            public void onCompletion(Result result) {
                try {
                    if (result.isSuccessful()) {
                        LOGGER.info("Send log successfully.");
                    } else {
                        LOGGER.error("Failed to send logs, logGroupId={}, logStreamId={}, result={}", logGroupId, logStreamId,
                            result);
                    }
                } finally {
                    completed.getAndIncrement();
                }
            }
        }
    }

步骤五:关闭Producer

当您已经没有数据需要发送或者当前进程准备退出时,需要关闭Producer,目的是让Producer中缓存的数据全部被处理。目前,Producer提供安全关闭和有限关闭两种模式。

  1. 安全关闭(推荐):建议您使用安全关闭。安全关闭对应的方法是close(),SDK会等到Producer中缓存的数据全部被处理、线程全部停止、注册的callback全部执行、返回future全部被设置后才会关闭Producer。
  2. 有限关闭:如果您的callback在执行过程中有可能阻塞,但您又希望close方法能在短时间内返回,可以使用有限关闭。有限关闭对应的方法是close(long timeoutMs),如果超过指定的timeoutMs后Producer仍未完全关闭,它会抛出IllegalStateException异常,这意味着缓存的数据可能还没来得及处理就被丢弃,用户注册的Callback也可能不会被执行。

Producer性能基线

上报日志时,请参考如下参数的测试性能基线,若超出基线值,可能会导致日志上报异常。

ECS虚拟机配置参考如下

  • 实例规格:通用计算增强型c7.xlarge.2
  • CPU:4 vCPU
  • 内存:8 GB
  • 基准带宽:100 Mbit/s
  • OS:Huawei Cloud EulerOS release 2.0
  • JVM:OpenJDK 64-Bit Server VM (build 17.0.7+7, mixed mode)

测试程序说明(单个producer)

  • totalSizeInBytes: 104857600
  • maxBlockMs:0
  • batchSizeThresholdInBytes: 1048576
  • batchCountThreshold:40960
  • lingerMs:2000
  • ioThreadCount:具体用例中调整
  • JVM初始堆/最大堆大小:1GB
  • 发送日志总条数:100,000,000
  • 发送日志总大小:约50GB

按照参数基线值设置后,使用华为云ECS机器作为日志上报环境,通过华为云内网服务入口进行上报。

标准日志SDK上报性能基线测试结果参考:

  • 上报日志格式:测试上报一批日志,包含4条日志,总大小约为2.2KB。为了模拟数据的随机性,测试使用的日志数据为随机字符串,单条日志大小约为510字节。
    [
        {
            "contents" : [{
                    "log" : "随机字符串510字节",
                    "log_time_ns" : 1637527157333902200
                }, {
                    "log" : "随机字符串510字节",
                    "log_time_ns" : 1637527157333902200
                }
            ]
        }, {
            "contents" : [{
                    "log" : "随机字符串510字节",
                    "log_time_ns" : 1637527157333902200
                }, {
                    "log" : "随机字符串510字节",
                    "log_time_ns" : 1737527157333987400
                }
            ]
        }
    ]
  • 性能基线参考如下:
    表2 标准日志SDK上报性能基线

    IO 线程数量

    数据吞吐量

    数据吞吐速率

    CPU 使用率

    2

    7.8 MB/S

    1.5W 条/S

    9 %

    4

    15.4 MB/S

    3.1W 条/S

    19 %

    6

    19.7 MB/S

    3.9W 条/S

    27 %

结构化日志SDK上报性能基线测试结果参考:

  • 上报日志格式:测试上报一批日志,包含4条日志,总大小约为2.2KB。 单条日志包含10个键值对以及content、time字段。 为了模拟数据的随机性,测试使用的数据为随机字符串,单条日志大小约为550字节。
    {
        "logs" : [
            {
                "contents" : [
                    {
                        "content": "sdk-log-new-struct-1",
                        "content_key_1":  "XmGFubcemwrceBWbZYRBTgohfxfFih",
                        "..." : "...",
                        "content_key_10":  "amchrqwPdigHopmAkNLvJtNxgiPUzh"
                    },{
                        "content": "sdk-log-new-struct-2",
                        "content_key_1":  "zOPDFRCNYsVznSgtnFejWFbaxklkMQ",
                        "..." : "...",
                        "content_key_10":  "mLzpbYcumXsIgYtQIbzizoACLtUgwS"
                    }
                ],
                "time": 1645374890235
            },{
                "contents" : [
                    {
                        "content": "sdk-log-new-struct-3",
                        "content_key_1":  "SaGsfDrQskJaHlciNAUXFyxiqCAqXe",
                        "..." : "...",
                        "content_key_10":  "wMQNuoVWonxVSsRsocQoDkEjcjiPio"
                    },{
                        "content": "sdk-log-new-struct-4",
                        "content_key_1":  "bHDjNmAvdiLAvWdxoETANqCYxhVMMk",
                        "..." : "...",
                        "content_key_10":  "scsxtrXrPUFYVARzOvbCxSofYZBsFV"
                    }
                ],
                "time": 1645374890235
            }
        ]
    }
  • 性能基线参考如下:
    表3 结构化日志SDK上报性能基线

    IO 线程数量

    数据吞吐量

    数据吞吐速率

    CPU 使用率

    2

    18.7 MB/S

    3.5W 条/S

    13 %

    4

    34.6 MB/S

    6.4W 条/S

    25 %

    6

    42.7 MB/S

    7.8W 条/S

    32 %

新版本结构化日志SDK上报性能基线测试结果参考:

  • 上报日志格式:测试中使用的日志包含10个键值对以及content、logTime 字段。为了模拟数据的随机性,测试使用的数据为随机字符串。 单条日志大小约为540字节。
    content: sdk-log-new-struct-1
    content_key_1:  sshyaqKCfrAPCMpdlxroPuCedeuJ
    content_key_2:  RVFtqFBjBtTAHVvdHBYQsDsoogJc
    ...
    content_key_9:  zLKeuxDnzGtupeZrQKKIlkQemXvX  
    content_key_10:  fYxmtYxKNfBhRfqMbZEOfimlsAIo
    logTime: 1645390242169
  • 性能基线参考如下:
    表4 新版本结构化日志SDK上报性能基线

    IO 线程数量

    数据吞吐量

    数据吞吐速率

    CPU 使用率

    2

    23.7MB/S

    4.5W条/s

    11%

    4

    45.6MB/S

    8.7W条/s

    23%

    6

    54.7MB/S

    10.3W条/s

    30%

总结如下:

  • CPU时间主要花费在对象的序列化和压缩上,在吞吐量较高的情况下CPU使用率比较高。但在日常环境中,单机数据流量均值为100KB/S,因此造成的CPU消耗几乎可以忽略不计。
  • 增加IO线程数量可以显著提高吞吐量,尤其是当IO线程数量少于可用处理器个数时。
  • 调整totalSizeInBytes对吞吐量影响不够显著,增加totalSizeInBytes会造成更多的CPU消耗,建议使用默认值。
  • 当日志上报量超过单个producer时:
    1. 建议拆分日志流,使用多个producer上报日志,分摊流量,以保障SDK处于正常上报状态。
    2. 如果maxBlockMs为0时,SDK处于非阻塞状态,会触发保护机制自动降级,可能会对部分日志做丢弃处理。
    3. 如果maxBlockMs大于0时,SDK处于阻塞状态,阻塞时间为maxBlockMs,可能会造成producer.send()发送日志方法处于阻塞状态。

参数获取方式

  • 区域表
    表5 区域表

    区域名称

    区域

    华北-北京二

    cn-north-2

    华北-北京四

    cn-north-4

    华北-北京一

    cn-north-1

    华东-上海二

    cn-east-2

    华东-上海一

    cn-east-3

    华南-广州

    cn-south-1

    华南-深圳

    cn-south-2

    西南-贵阳一

    cn-southwest-2

    亚太-新加坡

    ap-southeast-3

  • 日志组ID:在云日志服务控制台,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
  • 日志流ID:单击日志组名称对应的,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。

相关文档