使用Java SDK管理消费组
消费组创建完成后,您可以使用集成云日志服务Java SDK通过消费组消费数据。
目前此功能在邀测中,暂不支持申请开通。
前提条件
- 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
- 云日志服务SDK仅支持在华为云ECS主机上使用。
- 当用户修改权限后,权限信息在一天后生效。
安装SDK
您可以通过以下两种方式安装日志服务Java SDK。
方式一:在Maven项目中加入依赖项(推荐方式)。
- maven构建时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
<mirror> <id>huaweicloud</id> <mirrorOf>*</mirrorOf> <url>https://repo.huaweicloud.com/repository/maven/</url> </mirror>
- 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
- 在Maven工程中使用日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。
<dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-common</artifactId> <version>此功能在1.0.2版本以上支持,建议在第2步中获取最新版本</version> </dependency> <dependency> <groupId>io.github.huaweicloud</groupId> <artifactId>lts-sdk-java</artifactId> <version>此功能在1.0.2版本以上支持,建议在第2步中获取最新版本</version> </dependency>
方式二:在项目中直接依赖Java SDK的jar包。
- 下载lts-sdk-common和lts-sdk-java包。建议使用1.0.2及以上更高版本,低于1.0.2版本导致功能无法使用。
- 请确保您的项目中有以下依赖,因为SDK的jar包中需要该依赖。
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.83</version> </dependency> <dependency> <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> <version>2.10.14</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.11</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.13</version> </dependency> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>30.1.1-jre</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.11.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>2.0.7</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>2.0.7</version> </dependency>
- 以0.6.75版本的IntelliJ IDEA为例:项目中导入JAR包
- 在IntelliJ IDEA中选择您的工程,选择File > Project Structure。
- 在Project Structure对话框,左侧单击Modules。
- 在Dependencies页签,选择+ > JARs or directories。
- 在Attach Files or Directories对话框中,选中已下载的JAR文件,单击OK。
配置参数说明
LogConsumerConfig中配置参数说明:
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
---|---|---|---|---|
regionName |
云日志服务的区域 |
String |
必填 |
- |
projectId |
华为云账号的项目ID(project id) |
String |
必填 |
- |
logGroupId |
LTS的日志组ID |
String |
必填 |
- |
logStreamId |
LTS的日志流ID |
String |
必填 |
- |
accessKeyId |
华为云账号的AK |
String |
必填 |
- |
accessKeySecret |
华为云账号的SK |
String |
必填 |
- |
consumerGroupName |
LTS日志流对应的消费组名称 |
String |
必填 |
- |
startTimeNs |
消费开始时间,纳秒值 |
Long |
必填 |
- |
endTimeNs |
消费结束时间,纳秒值 |
Long |
选填 |
- |
batchSize |
每批次消费数量,可选范围[10, 1000] |
Integer |
选填 |
1000 |
- startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
- SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:
- 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
- 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
- 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。
消费数据示例代码
- 创建TestDemo.java
import com.huaweicloud.lts.consumer.ClientConsumerWorker; import com.huaweicloud.lts.consumer.config.LogConsumerConfig; import com.huaweicloud.lts.producer.exception.LogException; public class TestDemo { /* 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全; 本示例以ak和sk保存在环境变量中为例, 运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK. */ String ak = System.getenv("HUAWEICLOUD_SDK_AK"); String sk = System.getenv("HUAWEICLOUD_SDK_SK"); // 云日志服务的区域 private static final String TEST_REGION_NAME = "TEST_REGION_NAME"; // 华为云账号的项目ID(project id) private static final String TEST_PROJECT = "TEST_PROJECT"; // LTS的日志组ID private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID"; // LTS的日志流ID private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID"; // 华为云账号的AK private static final String ACCESS_KEY_ID = "ACCESS_KEY_ID"; // 华为云账号的SK private static final String ACCESS_KEY_SECRET = "ACCESS_KEY_SECRET"; // LTS日志流对应的消费组名称, 请您从云日志服务LTS页面创建日志流的消费组 private static final String CONSUMER_GROUP_NAME = "CONSUMER_GROUP_NAME"; // 启动消费者数量,由客户根据自身资源状态决定启动多少消费者 private static final Integer CONSUMER_COUNT = 5; // 消费开始时间 private static final Long START_TIME = 1685444710000000000L; public static void main(String[] args) throws LogException, InterruptedException { // 多个消费者可以在多台机器上启动, 会均衡消费一个消费组 // 如下案例, 启动5个消费者开始消费. Thread[] threads = new Thread[CONSUMER_COUNT]; ClientConsumerWorker[] workers = new ClientConsumerWorker[CONSUMER_COUNT]; for (int i = 0; i < CONSUMER_COUNT; i++) { // 构建消费者配置, 参数有必填的:regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, startTime LogConsumerConfig config = new LogConsumerConfig(TEST_REGION_NAME, TEST_PROJECT, TEST_LOG_GROUP_ID, TEST_LOG_STREAM_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET, CONSUMER_GROUP_NAME, START_TIME); // 构建消费者的工作类 ClientConsumerWorker worker = new ClientConsumerWorker(new DemoLogConsumerProcessorFactory(), config); threads[i] = new Thread(worker); workers[i] = worker; } for (int i = 0; i < CONSUMER_COUNT; i++) { // 启动消费者, ClientConsumerWorker实现了Runnable接口, Thread启动后, 内置的消费任务会自动运行 threads[i].start(); } Thread.sleep(60 * 60 * 1000); for (int i = 0; i < CONSUMER_COUNT; i++) { // 调用ClientConsumerWorker的shutdown方法, 安全的关闭消费者, 消费者中启动的内置线程也会自动停止 workers[i].shutdown(); } // 调用ClientConsumerWorker的shutdown方法后, 由于消费者内置多个异步任务, 建议停止1分钟在关闭整个服务, 目的就是让消费者完成后台的异步任务, 安全的退出 // 如果消费者突然关闭, 没有调用shutdown方法; 或者调用shutdown方法之后, 没有等待一定的时间. 那么可能造成下次消费时, 会有一定的重复数据, 因为消费者后台的异步任务没有保存checkPoint点 Thread.sleep(60 * 1000); } }
- 创建DemoLogConsumerProcessor.java
首先需要创建DemoLogConsumerProcessor的工厂类,即DemoLogConsumerProcessorFactory.jav
import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessor; import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessorFactory; public class DemoLogConsumerProcessorFactory implements ILogConsumerProcessorFactory { @Override public ILogConsumerProcessor generatorProcessor() { // 通过这个方法会为每个Shard生成一个处理数据的DemoLogConsumerProcessor.java // 您也可以将返回值变成一个单例, 这样处理的话单个消费者所消费的所有Shard数据都会走这个单例流程 return new DemoLogConsumerProcessor(); } }
然后创建DemoLogConsumerProcessor.java
import java.util.List; import com.huaweicloud.lts.consumer.interfaces.ILogConsumerCheckPointTracker; import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessor; import com.huaweicloud.lts.producer.ProjectConfig; import com.huaweicloud.lts.producer.exception.LogConsumerCheckPointException; import com.huaweicloud.lts.producer.model.consumer.LogData; public class DemoLogConsumerProcessor implements ILogConsumerProcessor { @Override // 这个方法给您回调返回的ShardId, 是告诉您当前这个shard-consumer在消费那个shard public void initialize(String shardId) { } @Override // 数据处理方法, logGroups为拉取到的日志 public String process(List<LogData> logGroups, ILogConsumerCheckPointTracker checkPointTracker) { for (LogData logData : logGroups) { // logData为您的一条日志,日志内容在Labels属性中。 // Labels为一个JSON,存放您的这个条日志的内容,比如: "log_content": "日志内容" System.out.println("这条日志内容: " + logData.getLabels()); } // 方法的返回值为一个checkPoint // 如果您在处理这批数据的时候, 遇到什么异常或者说想重新获取这一次的数据, 那么 return checkPointTracker.getCurrentCursor(); return null; } @Override // 当调用ClientConsumerWorker的shutdown方法, 会调用此函数, 您可以在此处写一些关闭流程 public void shutdown(ILogConsumerCheckPointTracker checkPointTracker) { try { // 关闭前, 立即保存checkPoint checkPointTracker.saveCheckPoint(true); } catch (LogConsumerCheckPointException e) { e.printStackTrace(); } } }
按照不同的消费需求,请使用不同的构造方法
- 指定消费开始时间,不指定结束时间。
例如StartTimeNs = 0L,即从头开始一直拉取到当前最新。如果还有新的日志产生就一直拉取。
例如StartTimeNs = 1685444710000000000L,即从1685444710000000000纳秒所代表的时间开始拉取,一直拉取到当前最新。如果还有新的日志产生就一直拉取。
// 不设置batchSize,即每次拉取1000条 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L); // 设置batchSize = 500,即每次拉取500条 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500); /* 如果需要按照日志组/日志流名称消费,此功能仅1.0.3版本以上才支持 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500); */
- 指定消费开始时间,且指定结束时间。
例如StartTimeNs = 0L,EndTimeNs=1685444710000000000L。即从头开始一直拉取到EndTimeNs所代表的时间。不会拉取这段时间范围以外的数据。
例如StartTimeNs = 1685444710000000000L,EndTimeNs=1685445470192043318L。即从StartTimeNs所代表的时间开始一直拉取到EndTimeNs所代表的时间。不会拉取这段时间范围以外的数据。
// 不设置batchSize,即每次拉取1000条 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L); // 设置batchSize = 500,即每次拉取500条 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500); /* 如果需要按照日志组/日志流名称消费,此功能仅1.0.3版本以上才支持 LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500); */