使用Java SDK管理消费组
消费组创建完成后,您可以使用集成云日志服务Java SDK通过消费组消费数据。

目前此功能在邀测中,暂不支持申请开通。
前提条件
- 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
- 云日志服务SDK仅支持在华为云ECS主机上使用。
- 当用户修改权限后,权限信息在一天后生效。
安装SDK
在Maven项目中加入依赖项。
- maven构建时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
<mirror> <id>huaweicloud</id> <mirrorOf>*</mirrorOf> <url>https://repo.huaweicloud.com/repository/maven/</url> </mirror>
- 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
- 在Maven工程中使用日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。
推荐使用最新版本version:1.1.3,第2步中查看SDK版本。
<dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-common</artifactId> <version>version</version> </dependency> <dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-java</artifactId> <version>version</version> </dependency>
配置参数说明
LogConsumerConfig中配置参数说明:
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
---|---|---|---|---|
regionName |
云日志服务的区域 |
String |
必填 |
- |
projectId |
华为云账号的项目ID(project id) |
String |
必填 |
- |
logGroupId |
LTS的日志组ID |
String |
必填 |
- |
logStreamId |
LTS的日志流ID |
String |
必填 |
- |
accessKeyId |
华为云账号的AK |
String |
必填 |
- |
accessKeySecret |
华为云账号的SK |
String |
必填 |
- |
consumerGroupName |
LTS日志流对应的消费组名称 |
String |
必填 |
- |
startTimeNs |
消费开始时间,纳秒值 |
Long |
必填 |
- |
endTimeNs |
消费结束时间,纳秒值 |
Long |
选填 |
- |
batchSize |
每批次消费数量,可选范围[10, 1000] |
Integer |
选填 |
1000 |
consumerPosition |
BEGIN_CURSOR:消费组从头开始消费日志,起始消费位点为logStream中的第一条日志。 END_CURSOR: 此消费位点记录logStream日志的最后一条日志之后。 (SDK版本最低必须为1.0.7支持此功能) |
ConsumerPosition枚举:BEGIN_CURSOR、END_CURSOR |
选填 |
- |
enableLocalTest |
是否开启跨云消费日志(SDK版本最低必须为1.1.0支持此功能)。此功能仅适用于开发调测,对于消费性能不做保障。
|
boolean |
选填 |
false |
historyQuery |
是否消费历史数据(SDK版本最低必须为1.1.0支持此功能)。
|
boolean |
选填 |
false |
proxyIp |
使用自定义ip端口上报日志。 |
String |
选填 |
- |
- startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
- SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:
- 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
- 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
- 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。
消费数据示例代码
- 创建消费者逻辑代码DemoLogConsumerProcessor.java
public class DemoLogConsumerProcessor implements ILogConsumerProcessor { @Override // 这个方法给您回调返回的ShardId, 是告诉您当前这个shard-consumer在消费那个shard public void initialize(String shardId) { } @Override // 数据处理方法, logGroups为拉取到的日志 public String process(List<LogData> logGroups, ILogConsumerCheckPointTracker checkPointTracker) { for (LogData logData : logGroups) { // logData为您的一条日志,日志内容在Labels属性中。 // Labels为一个JSON,存放您的这个条日志的内容,比如: "log_content": "日志内容" System.out.println("这条日志内容: " + logData.getLabels()); } // 方法的返回值为一个checkPoint // 如果您在处理这批数据的时候, 遇到什么异常或者需要重新获取这一次的数据, 那么 return checkPointTracker.getCurrentCursor(); return null; } @Override // 当调用ClientConsumerWorker的shutdown方法, 会调用此函数, 您可以在此处写一些关闭流程 public void shutdown(ILogConsumerCheckPointTracker checkPointTracker) { try { // 关闭前, 立即保存checkPoint checkPointTracker.saveCheckPoint(true); } catch (LogConsumerCheckPointException e) { e.printStackTrace(); } } }
- 创建消费者实体DemoLogConsumerProcessorFactory.java
import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessor; import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessorFactory; public class DemoLogConsumerProcessorFactory implements ILogConsumerProcessorFactory { @Override public ILogConsumerProcessor generatorProcessor() { // 通过这个方法会为每个Shard生成一个处理数据的DemoLogConsumerProcessor.java // 您也可以将返回值变成一个单例, 这样处理的话单个消费者所消费的所有Shard数据都会走这个单例流程 return new DemoLogConsumerProcessor(); } }
- 创建TestDemo.java文件。创建消费者并启动消费者线程,该消费者会从指定的logStream中消费数据。
import com.huaweicloud.lts.consumer.ClientConsumerWorker; import com.huaweicloud.lts.consumer.config.LogConsumerConfig; import com.huaweicloud.lts.producer.exception.LogException; public class TestDemo { // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全; // 本示例以ak和sk保存在环境变量中为例, 运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK. String ak = System.getenv("HUAWEICLOUD_SDK_AK"); String sk = System.getenv("HUAWEICLOUD_SDK_SK"); // 云日志服务的区域 private static final String TEST_REGION_NAME = "TEST_REGION_NAME"; // 华为云账号的项目ID(project id) private static final String TEST_PROJECT = "TEST_PROJECT"; // LTS的日志组ID private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID"; // LTS的日志流ID private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID"; // 华为云账号的AK private static final String ACCESS_KEY_ID = "ACCESS_KEY_ID"; // 华为云账号的SK private static final String ACCESS_KEY_SECRET = "ACCESS_KEY_SECRET"; // LTS日志流对应的消费组名称, 请您从云日志服务LTS页面创建日志流的消费组 private static final String CONSUMER_GROUP_NAME = "CONSUMER_GROUP_NAME"; // 启动消费者数量,由客户根据自身资源状态决定启动多少消费者 private static final Integer CONSUMER_COUNT = 5; // 消费开始时间 private static final Long START_TIME = 1685444710000000000L; public static void main(String[] args) throws LogException, InterruptedException { // 多个消费者可以在多台机器上启动, 会均衡消费一个消费组 // 如下案例, 启动5个消费者开始消费. Thread[] threads = new Thread[CONSUMER_COUNT]; ClientConsumerWorker[] workers = new ClientConsumerWorker[CONSUMER_COUNT]; for (int i = 0; i < CONSUMER_COUNT; i++) { // 构建消费者配置, 参数有必填的:regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, startTime // 当消费组名称不存在,SDK会自动为日志流创建消费组。 LogConsumerConfig config = new LogConsumerConfig(TEST_REGION_NAME, TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET, CONSUMER_GROUP_NAME, START_TIME); // 构建消费者的工作类 ClientConsumerWorker worker = new ClientConsumerWorker(new DemoLogConsumerProcessorFactory(), config); threads[i] = new Thread(worker); workers[i] = worker; } for (int i = 0; i < CONSUMER_COUNT; i++) { // 启动消费者, ClientConsumerWorker实现了Runnable接口, Thread启动后, 内置的消费任务会自动运行 threads[i].start(); } Thread.sleep(60 * 60 * 1000); for (int i = 0; i < CONSUMER_COUNT; i++) { // 调用ClientConsumerWorker的shutdown方法, 安全的关闭消费者, 消费者中启动的内置线程也会自动停止 workers[i].shutdown(); } // 调用ClientConsumerWorker的shutdown方法后, 由于消费者内置多个异步任务, 建议停止1分钟再关闭整个服务, 目的就是让消费者完成后台的异步任务, 安全的退出 // 如果消费者突然关闭, 没有调用shutdown方法; 或者调用shutdown方法之后, 没有等待一定的时间. 那么可能造成下次消费时, 会有一定的重复数据, 因为消费者后台的异步任务没有保存checkPoint点 Thread.sleep(60 * 1000); } }
按照不同的消费需求,创建不同的LogConsumerConfig
- 指定消费开始时间,不指定结束时间。
// 设置batchSize = 500,即每次拉取500条。 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500); // 如果需要按照日志组/日志流名称消费 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500);
- 指定消费开始时间,且指定结束时间。
// 设置batchSize = 500,即每次拉取500条 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500); // 如果需要按照日志组/日志流名称消费 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500);
- 指定ConsumerPosition(BEGIN_CURSOR、END_CURSOR)
// 指定 BEGIN_CURSOR 开始消费 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", "ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", ConsumerPosition.BEGIN_CURSOR); // 指定 END_CURSOR 开始消费 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", "ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", ConsumerPosition.END_CURSOR);
- 支持使用临时AK、SK、STS-Token消费数据(若使用临时STS-Token, 则AK、SK需要填写与STS-Token配套的临时AK、SK)。
- 创建STS-Token更新类DemoLogConsumerSTSToken.java (由于临时凭证会存在过期,为了不影响您的消费业务,需要实时更新临时凭证)
import com.huaweicloud.lts.consumer.interfaces.ILogConsumerSTSToken; import com.huaweicloud.lts.producer.STSTokenConfig; public class DemoLogConsumerSTSToken implements ILogConsumerSTSToken { @Override // SDK会定期从此方法中获取临时AK, 临时SK, 临时securityToken. 如果临时认证信息有变化, 在此方法中实现即可 public STSTokenConfig getSTSTokenConfig() { String accessKeyId = ""; String accessKeySecret = ""; String securityToken = ""; return new STSTokenConfig(accessKeyId, accessKeySecret, securityToken); } }
- 创建LogConsumerConfig
// 指定消费开始时间,不指定结束时间 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", 1685444710000000000L); // 指定消费开始时间,且指定结束时间 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", 1685444710000000000L, 1685445470192043318L); // 指定 BEGIN_CURSOR LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", ConsumerPosition.BEGIN_CURSOR); // 指定 END_CURSOR LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", ConsumerPosition.END_CURSOR);
- 创建STS-Token更新类DemoLogConsumerSTSToken.java (由于临时凭证会存在过期,为了不影响您的消费业务,需要实时更新临时凭证)
- 开启跨云消费日志。此功能仅适用于开发调测,对于消费性能不做保障
LogConsumerConfig config = new LogConsumerConfig(....); config.setEnableLocalTest(true);
- 开启消费历史数据。
LogConsumerConfig config = new LogConsumerConfig(....); config.setHistoryQuery(true);
-
LogConsumerConfig config = new LogConsumerConfig(....); config.setProxyIp("127.0.0.1:8102");
获取日志组和日志流ID
- 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
- 日志流ID:单击日志组名称对应的
按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。