文档首页/ 云日志服务 LTS/ 用户指南/ 日志消费与加工/ 使用SDK-API消费日志(邀测)
更新时间:2024-08-23 GMT+08:00
分享

使用SDK-API消费日志(邀测)

云日志服务LTS提供SDK-API消费日志接口,用户可以通过调用SDK提供的基础API来消费日志。

目前此功能在邀测中,暂不支持申请开通。

前提条件

  • 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
  • 确认云日志服务的区域,请用户根据所在区域,获取regionName。
  • 获取华为账号的AK/SK
  • 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
  • 获取需要上报到LTS的日志组ID、日志流ID。
  • 云日志服务SDK仅支持在华为云ECS主机上使用。
  • 当用户修改权限后,权限信息在一天后生效。

安装SDK

您可以通过Maven依赖方式安装日志服务Java SDK,在Maven项目中加入依赖项。

  1. maven构建时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
    <mirror>
        <id>huaweicloud</id>
        <mirrorOf>*</mirrorOf>
        <url>https://repo.huaweicloud.com/repository/maven/</url>
    </mirror>
  2. 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
  3. 在Maven工程中使用日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。
    <dependency>
        <groupId>io.github.huaweicloud</groupId>
        <artifactId>lts-sdk-common</artifactId>
        <version>此功能在1.0.3版本以上支持,建议在第2步中获取最新版本</version>
    </dependency>
    
    <dependency>
        <groupId>io.github.huaweicloud</groupId>
        <artifactId>lts-sdk-java</artifactId>
        <version>此功能在1.0.3版本以上支持,建议在第2步中获取最新版本</version>
    </dependency>

配置参数说明

LogConsumerConfig中配置参数说明:

表1 配置参数说明

参数名称

描述

类型

是否需要填写

默认值

regionName

云日志服务的区域

String

必填

-

projectId

华为云账号的项目ID(project id)

String

必填

-

logGroupId

LTS的日志组ID

String

必填

-

logStreamId

LTS的日志流ID

String

必填

-

accessKeyId

华为云账号的AK

String

必填

-

accessKeySecret

华为云账号的SK

String

必填

-

startTimeNs

消费开始时间,纳秒值

Long

必填

-

endTimeNs

消费结束时间,纳秒值

Long

选填

-

batchSize

每批次消费数量,可选范围[10, 1000]

Integer

选填

-

  • startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
  • API消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。

API消费示例代码

  • 消费某段时间日志
    import java.util.List;
    import org.apache.commons.lang3.StringUtils;
    
    import com.huaweicloud.lts.producer.exception.LogException;
    import com.huaweicloud.lts.producer.http.Client;
    import com.huaweicloud.lts.producer.model.consumer.LogData;
    import com.huaweicloud.lts.producer.model.consumer.response.BatchGetLogResponse;
    import com.huaweicloud.lts.producer.model.consumer.response.GetCursorByTimeResponse;
    import com.huaweicloud.lts.producer.model.consumer.response.GetShardIdListResponse;
    
    // 此示例为消费某段时间日志
    public class ApiConsumerRangeTimeDemo {
    
        // 云日志服务的区域 (regionName)
        private static final String TEST_REGION_NAME = "TEST_REGION_NAME";
    
        // 华为云账号的项目ID (projectId)
        private static final String TEST_PROJECT = "TEST_PROJECT";
    
        // 华为云账号的AK (accessKeyId), 本示例从环境变量中获取AK
        private static final String ACCESS_KEY_ID = System.getenv("ACCESS_KEY_ID");
    
        // 华为云账号的SK (accessKeySecret), 本示例从环境变量中获取SK
        private static final String ACCESS_KEY_SECRET = System.getenv("ACCESS_KEY_SECRET");
    
        // LTS的日志组ID (logGroupId)
        private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID";
    
        // LTS的日志流ID (logStreamId)
        private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID";
    
        // 消费开始时间(纳秒) (startTimeNs)
        private static final String START_TIME_NS = "1718680750082539300";
    
        // 消费结束时间(纳秒) (endTimeNs)
        private static final String END_TIME_NS = "1718680750082539301";
    
        // 每批次消费日志数量 (batchSize)
        private static final String BATCH_SIZE = "1718680750082539300";
    
        public static void main(String[] args) {
            // 创建日志服务Client
            Client client = new Client(TEST_REGION_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
            try {
                // 查询日志流ShardId列表
                GetShardIdListResponse shardIdList = client.getShardIdList(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID);
                System.out.printf("%s has %s shards\n", TEST_LOG_STREAM_ID, shardIdList.getShardList().size());
    
                // 分别从每个Shard中获取日志
                for (String shardId : shardIdList.getShardList()) {
                    // 通过开始时间来换取游标
                    GetCursorByTimeResponse startCursor = client.getCursorByTime(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, START_TIME_NS);
                    // 通过结束时间来换取游标
                    GetCursorByTimeResponse endCursor = client.getCursorByTime(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, END_TIME_NS);
    
                    String currentCursor = null;
                    String nextCursor = startCursor.getCursor();
                    String finalCursor = endCursor.getCursor();
                    while (!StringUtils.equals(currentCursor, nextCursor)) {
                        currentCursor = nextCursor;
                        // 拉取到日志
                        BatchGetLogResponse batchGetLogResponse = client.batchGetLog(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, BATCH_SIZE, currentCursor, finalCursor);
                        List<LogData> logs = batchGetLogResponse.getLogs();
                        System.out.printf("Get %d logs from logStreamId:%s ,Shard:%s\n", logs.size(), TEST_LOG_STREAM_ID, shardId);
                        // 移动此Shard游标
                        nextCursor = batchGetLogResponse.getNext();
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.getErrorCode());
                System.out.println("error message :" + e.getErrorMessage());
                throw new RuntimeException(e);
            }
        }
    }
  • 从某个时间点开始消费日志
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import com.huaweicloud.lts.producer.exception.LogException;
    import com.huaweicloud.lts.producer.http.Client;
    import com.huaweicloud.lts.producer.model.consumer.LogData;
    import com.huaweicloud.lts.producer.model.consumer.response.BatchGetLogResponse;
    import com.huaweicloud.lts.producer.model.consumer.response.GetCursorByTimeResponse;
    import com.huaweicloud.lts.producer.model.consumer.response.GetShardIdListResponse;
    
    // 此示例为从某个时间点开始消费日志
    public class ApiConsumerStartTimeDemo {
    
        // 云日志服务的区域 (regionName)
        private static final String TEST_REGION_NAME = "TEST_REGION_NAME";
    
        // 华为云账号的项目ID (projectId)
        private static final String TEST_PROJECT = "TEST_PROJECT";
    
        // 华为云账号的AK (accessKeyId), 本示例从环境变量中获取AK
        private static final String ACCESS_KEY_ID = System.getenv("ACCESS_KEY_ID");
    
        // 华为云账号的SK (accessKeySecret), 本示例从环境变量中获取SK
        private static final String ACCESS_KEY_SECRET = System.getenv("ACCESS_KEY_SECRET");
    
        // LTS的日志组ID (logGroupId)
        private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID";
    
        // LTS的日志流ID (logStreamId)
        private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID";
    
        // 消费开始时间(纳秒) (startTimeNs)
        private static final String START_TIME_NS = "1718680750082539300";
    
        // 每批次消费日志数量 (batchSize)
        private static final String BATCH_SIZE = "1718680750082539300";
    
        public static void main(String[] args) {
            // 创建日志服务Client
            Client client = new Client(TEST_REGION_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
            try {
                // 查询日志流ShardId列表
                GetShardIdListResponse shardIdList = client.getShardIdList(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID);
                System.out.printf("%s has %s shards\n", TEST_LOG_STREAM_ID, shardIdList.getShardList().size());
    
                Map<String, String> cursorMap = new HashMap<>();
                for (String shardId : shardIdList.getShardList()) {
                    // 通过时间来换取游标
                    GetCursorByTimeResponse cursor = client.getCursorByTime(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, START_TIME_NS);
                    cursorMap.put(shardId, cursor.getCursor());
                }
                while (true) {
                    // 分别从每个shard中获取日志
                    for (String shardId : shardIdList.getShardList()) {
                        String cursorC = cursorMap.get(shardId);
                        BatchGetLogResponse batchGetLogResponse = client.batchGetLog(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, BATCH_SIZE, cursorC);
                        // 拉取到日志
                        List<LogData> logs = batchGetLogResponse.getLogs();
                        System.out.printf("Get %d logs from logStreamId:%s ,Shard:%s ,cursorCurrent:%s\n", logs.size(), TEST_LOG_STREAM_ID, shardId, cursorC);
                        // 移动此shard游标
                        cursorMap.put(shardId, batchGetLogResponse.getNext());
                    }
                }
            } catch (LogException e) {
                System.out.println("error code :" + e.getErrorCode());
                System.out.println("error message :" + e.getErrorMessage());
                throw new RuntimeException(e);
            }
        }
    }

参数获取方式

  • 区域表

    区域名称

    RegionName

    华北-北京二

    cn-north-2

    华北-北京四

    cn-north-4

    华北-北京一

    cn-north-1

    华东-上海二

    cn-east-2

    华东-上海一

    cn-east-3

    华南-广州

    cn-south-1

    华南-深圳

    cn-south-2

    西南-贵阳一

    cn-southwest-2

    亚太-新加坡

    ap-southeast-3

  • 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。

  • 日志流ID单击日志组名称对应的按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID

相关文档