使用SDK-API消费日志(邀测)
云日志服务LTS提供SDK-API消费日志接口,用户可以通过调用SDK提供的基础API来消费日志。
目前此功能在邀测中,暂不支持申请开通。
前提条件
- 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 >API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID。
- 云日志服务SDK仅支持在华为云ECS主机上使用。
- 当用户修改权限后,权限信息在一天后生效。
安装SDK
您可以通过Maven依赖方式安装日志服务Java SDK,在Maven项目中加入依赖项。
- maven构建时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
<mirror> <id>huaweicloud</id> <mirrorOf>*</mirrorOf> <url>https://repo.huaweicloud.com/repository/maven/</url> </mirror>
- 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
- 在Maven工程中使用日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。
<dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-common</artifactId> <version>此功能在1.0.3版本以上支持,建议在第2步中获取最新版本</version> </dependency> <dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-java</artifactId> <version>此功能在1.0.3版本以上支持,建议在第2步中获取最新版本</version> </dependency>
配置参数说明
LogConsumerConfig中配置参数说明:
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
---|---|---|---|---|
regionName |
云日志服务的区域 |
String |
必填 |
- |
projectId |
华为云账号的项目ID(project id) |
String |
必填 |
- |
logGroupId |
LTS的日志组ID |
String |
必填 |
- |
logStreamId |
LTS的日志流ID |
String |
必填 |
- |
accessKeyId |
华为云账号的AK |
String |
必填 |
- |
accessKeySecret |
华为云账号的SK |
String |
必填 |
- |
startTimeNs |
消费开始时间,纳秒值 |
Long |
必填 |
- |
endTimeNs |
消费结束时间,纳秒值 |
Long |
选填 |
- |
batchSize |
每批次消费数量,可选范围[10, 1000] |
Integer |
选填 |
- |
- startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
- API消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
API消费示例代码
- 消费某段时间日志
import java.util.List; import org.apache.commons.lang3.StringUtils; import com.huaweicloud.lts.producer.exception.LogException; import com.huaweicloud.lts.producer.http.Client; import com.huaweicloud.lts.producer.model.consumer.LogData; import com.huaweicloud.lts.producer.model.consumer.response.BatchGetLogResponse; import com.huaweicloud.lts.producer.model.consumer.response.GetCursorByTimeResponse; import com.huaweicloud.lts.producer.model.consumer.response.GetShardIdListResponse; // 此示例为消费某段时间日志 public class ApiConsumerRangeTimeDemo { // 云日志服务的区域 (regionName) private static final String TEST_REGION_NAME = "TEST_REGION_NAME"; // 华为云账号的项目ID (projectId) private static final String TEST_PROJECT = "TEST_PROJECT"; // 华为云账号的AK (accessKeyId), 本示例从环境变量中获取AK private static final String ACCESS_KEY_ID = System.getenv("ACCESS_KEY_ID"); // 华为云账号的SK (accessKeySecret), 本示例从环境变量中获取SK private static final String ACCESS_KEY_SECRET = System.getenv("ACCESS_KEY_SECRET"); // LTS的日志组ID (logGroupId) private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID"; // LTS的日志流ID (logStreamId) private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID"; // 消费开始时间(纳秒) (startTimeNs) private static final String START_TIME_NS = "1718680750082539300"; // 消费结束时间(纳秒) (endTimeNs) private static final String END_TIME_NS = "1718680750082539301"; // 每批次消费日志数量 (batchSize) private static final String BATCH_SIZE = "1718680750082539300"; public static void main(String[] args) { // 创建日志服务Client Client client = new Client(TEST_REGION_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET); try { // 查询日志流ShardId列表 GetShardIdListResponse shardIdList = client.getShardIdList(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID); System.out.printf("%s has %s shards\n", TEST_LOG_STREAM_ID, shardIdList.getShardList().size()); // 分别从每个Shard中获取日志 for (String shardId : shardIdList.getShardList()) { // 通过开始时间来换取游标 GetCursorByTimeResponse startCursor = client.getCursorByTime(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, START_TIME_NS); // 通过结束时间来换取游标 GetCursorByTimeResponse endCursor = client.getCursorByTime(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, END_TIME_NS); String currentCursor = null; String nextCursor = startCursor.getCursor(); String finalCursor = endCursor.getCursor(); while (!StringUtils.equals(currentCursor, nextCursor)) { currentCursor = nextCursor; // 拉取到日志 BatchGetLogResponse batchGetLogResponse = client.batchGetLog(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, BATCH_SIZE, currentCursor, finalCursor); List<LogData> logs = batchGetLogResponse.getLogs(); System.out.printf("Get %d logs from logStreamId:%s ,Shard:%s\n", logs.size(), TEST_LOG_STREAM_ID, shardId); // 移动此Shard游标 nextCursor = batchGetLogResponse.getNext(); } } } catch (LogException e) { System.out.println("error code :" + e.getErrorCode()); System.out.println("error message :" + e.getErrorMessage()); throw new RuntimeException(e); } } }
- 从某个时间点开始消费日志
import java.util.HashMap; import java.util.List; import java.util.Map; import com.huaweicloud.lts.producer.exception.LogException; import com.huaweicloud.lts.producer.http.Client; import com.huaweicloud.lts.producer.model.consumer.LogData; import com.huaweicloud.lts.producer.model.consumer.response.BatchGetLogResponse; import com.huaweicloud.lts.producer.model.consumer.response.GetCursorByTimeResponse; import com.huaweicloud.lts.producer.model.consumer.response.GetShardIdListResponse; // 此示例为从某个时间点开始消费日志 public class ApiConsumerStartTimeDemo { // 云日志服务的区域 (regionName) private static final String TEST_REGION_NAME = "TEST_REGION_NAME"; // 华为云账号的项目ID (projectId) private static final String TEST_PROJECT = "TEST_PROJECT"; // 华为云账号的AK (accessKeyId), 本示例从环境变量中获取AK private static final String ACCESS_KEY_ID = System.getenv("ACCESS_KEY_ID"); // 华为云账号的SK (accessKeySecret), 本示例从环境变量中获取SK private static final String ACCESS_KEY_SECRET = System.getenv("ACCESS_KEY_SECRET"); // LTS的日志组ID (logGroupId) private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID"; // LTS的日志流ID (logStreamId) private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID"; // 消费开始时间(纳秒) (startTimeNs) private static final String START_TIME_NS = "1718680750082539300"; // 每批次消费日志数量 (batchSize) private static final String BATCH_SIZE = "1718680750082539300"; public static void main(String[] args) { // 创建日志服务Client Client client = new Client(TEST_REGION_NAME, ACCESS_KEY_ID, ACCESS_KEY_SECRET); try { // 查询日志流ShardId列表 GetShardIdListResponse shardIdList = client.getShardIdList(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID); System.out.printf("%s has %s shards\n", TEST_LOG_STREAM_ID, shardIdList.getShardList().size()); Map<String, String> cursorMap = new HashMap<>(); for (String shardId : shardIdList.getShardList()) { // 通过时间来换取游标 GetCursorByTimeResponse cursor = client.getCursorByTime(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, START_TIME_NS); cursorMap.put(shardId, cursor.getCursor()); } while (true) { // 分别从每个shard中获取日志 for (String shardId : shardIdList.getShardList()) { String cursorC = cursorMap.get(shardId); BatchGetLogResponse batchGetLogResponse = client.batchGetLog(TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, shardId, BATCH_SIZE, cursorC); // 拉取到日志 List<LogData> logs = batchGetLogResponse.getLogs(); System.out.printf("Get %d logs from logStreamId:%s ,Shard:%s ,cursorCurrent:%s\n", logs.size(), TEST_LOG_STREAM_ID, shardId, cursorC); // 移动此shard游标 cursorMap.put(shardId, batchGetLogResponse.getNext()); } } } catch (LogException e) { System.out.println("error code :" + e.getErrorCode()); System.out.println("error message :" + e.getErrorMessage()); throw new RuntimeException(e); } } }