更新时间:2025-07-09 GMT+08:00
分享

使用Java SDK管理消费组

消费组创建完成后,您可以使用集成云日志服务Java SDK通过消费组消费数据。

目前此功能在邀测中,暂不支持申请开通。

前提条件

  • 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
  • 确认云日志服务的区域,请用户根据所在区域,获取regionName。
  • 获取华为账号的AK/SK
  • 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
  • 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
  • 云日志服务SDK仅支持在华为云ECS主机上使用。
  • 当用户修改权限后,权限信息在一天后生效。

安装SDK

在Maven项目中加入依赖项。

  1. maven构建时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
    <mirror>
        <id>huaweicloud</id>
        <mirrorOf>*</mirrorOf>
        <url>https://repo.huaweicloud.com/repository/maven/</url>
    </mirror>
  2. 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
  3. 在Maven工程中使用日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。
    推荐使用最新版本version:1.1.3,第2步中查看SDK版本。
    <dependency> 
         <groupId>io.github.huaweicloud</groupId> 
         <artifactId>lts-sdk-common</artifactId> 
         <version>version</version> 
     </dependency> 
     <dependency> 
         <groupId>io.github.huaweicloud</groupId> 
         <artifactId>lts-sdk-java</artifactId> 
         <version>version</version> 
     </dependency>

配置参数说明

LogConsumerConfig中配置参数说明:

表1 配置参数说明

参数名称

描述

类型

是否需要填写

默认值

regionName

云日志服务的区域

String

必填

-

projectId

华为云账号的项目ID(project id)

String

必填

-

logGroupId

LTS的日志组ID

String

必填

-

logStreamId

LTS的日志流ID

String

必填

-

accessKeyId

华为云账号的AK

String

必填

-

accessKeySecret

华为云账号的SK

String

必填

-

consumerGroupName

LTS日志流对应的消费组名称

String

必填

-

startTimeNs

消费开始时间,纳秒值

Long

必填

-

endTimeNs

消费结束时间,纳秒值

Long

选填

-

batchSize

每批次消费数量,可选范围[10, 1000]

Integer

选填

1000

consumerPosition

BEGIN_CURSOR:消费组从头开始消费日志,起始消费位点为logStream中的第一条日志。

END_CURSOR: 此消费位点记录logStream日志的最后一条日志之后。

(SDK版本最低必须为1.0.7支持此功能)

ConsumerPosition枚举:BEGIN_CURSOR、END_CURSOR

选填

-

enableLocalTest

是否开启跨云消费日志(SDK版本最低必须为1.1.0支持此功能)。此功能仅适用于开发调测,对于消费性能不做保障。

  • 选true时,开启跨云后用户能通过公网(华东-上海一)访问。
  • 选false时,关闭跨云后用户是通过华为云当前区域主机访问。

boolean

选填

false

historyQuery

是否消费历史数据(SDK版本最低必须为1.1.0支持此功能)。

  • 选true时,可以消费开启日志流引擎之前的数据。
  • 选false时,只能消费当开启了日志流引擎之后的数据。

boolean

选填

false

proxyIp

使用自定义ip端口上报日志。

String

选填

-

  1. startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
  2. SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。

通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:

  • 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
  • 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
  • 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。

消费数据示例代码

  • 创建消费者逻辑代码DemoLogConsumerProcessor.java
    public class DemoLogConsumerProcessor implements ILogConsumerProcessor {
        @Override
        // 这个方法给您回调返回的ShardId, 是告诉您当前这个shard-consumer在消费那个shard
        public void initialize(String shardId) {
        }
    
        @Override
        // 数据处理方法, logGroups为拉取到的日志
        public String process(List<LogData> logGroups, ILogConsumerCheckPointTracker checkPointTracker) {
            for (LogData logData : logGroups) {
                // logData为您的一条日志,日志内容在Labels属性中。
                // Labels为一个JSON,存放您的这个条日志的内容,比如: "log_content": "日志内容"
                System.out.println("这条日志内容: " + logData.getLabels());
            }
            // 方法的返回值为一个checkPoint
            // 如果您在处理这批数据的时候, 遇到什么异常或者需要重新获取这一次的数据, 那么 return checkPointTracker.getCurrentCursor();
            return null;
        }
    
        @Override
        // 当调用ClientConsumerWorker的shutdown方法, 会调用此函数, 您可以在此处写一些关闭流程
        public void shutdown(ILogConsumerCheckPointTracker checkPointTracker) {
            try {
                // 关闭前, 立即保存checkPoint
                checkPointTracker.saveCheckPoint(true);
            } catch (LogConsumerCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
  • 创建消费者实体DemoLogConsumerProcessorFactory.java
    import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessor;
    import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessorFactory;
    
    public class DemoLogConsumerProcessorFactory implements ILogConsumerProcessorFactory {
        @Override
        public ILogConsumerProcessor generatorProcessor() {
            // 通过这个方法会为每个Shard生成一个处理数据的DemoLogConsumerProcessor.java
            // 您也可以将返回值变成一个单例, 这样处理的话单个消费者所消费的所有Shard数据都会走这个单例流程
            return new DemoLogConsumerProcessor();
        }
    }
  • 创建TestDemo.java文件。创建消费者并启动消费者线程,该消费者会从指定的logStream中消费数据。
    import com.huaweicloud.lts.consumer.ClientConsumerWorker;
    import com.huaweicloud.lts.consumer.config.LogConsumerConfig;
    import com.huaweicloud.lts.producer.exception.LogException;
    
    public class TestDemo {
        // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全;
        // 本示例以ak和sk保存在环境变量中为例, 运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK.
        String ak = System.getenv("HUAWEICLOUD_SDK_AK");
    
        String sk = System.getenv("HUAWEICLOUD_SDK_SK");
    
        // 云日志服务的区域
        private static final String TEST_REGION_NAME = "TEST_REGION_NAME";
    
        // 华为云账号的项目ID(project id)
        private static final String TEST_PROJECT = "TEST_PROJECT";
    
        // LTS的日志组ID
        private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID";
    
        // LTS的日志流ID
        private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID";
    
        // 华为云账号的AK
        private static final String ACCESS_KEY_ID = "ACCESS_KEY_ID";
    
        // 华为云账号的SK
        private static final String ACCESS_KEY_SECRET = "ACCESS_KEY_SECRET";
    
        // LTS日志流对应的消费组名称, 请您从云日志服务LTS页面创建日志流的消费组
        private static final String CONSUMER_GROUP_NAME = "CONSUMER_GROUP_NAME";
    
        // 启动消费者数量,由客户根据自身资源状态决定启动多少消费者
        private static final Integer CONSUMER_COUNT = 5;
    
        // 消费开始时间
        private static final Long START_TIME = 1685444710000000000L;
    
        public static void main(String[] args) throws LogException, InterruptedException {
            // 多个消费者可以在多台机器上启动, 会均衡消费一个消费组
            // 如下案例, 启动5个消费者开始消费.
            Thread[] threads = new Thread[CONSUMER_COUNT];
            ClientConsumerWorker[] workers = new ClientConsumerWorker[CONSUMER_COUNT];
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                // 构建消费者配置, 参数有必填的:regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, startTime
                // 当消费组名称不存在,SDK会自动为日志流创建消费组。
                LogConsumerConfig config = new LogConsumerConfig(TEST_REGION_NAME, TEST_PROJECT, TEST_LOG_GROUP_ID,
                    TEST_LOG_STREAM_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET, CONSUMER_GROUP_NAME, START_TIME);
                // 构建消费者的工作类
                ClientConsumerWorker worker = new ClientConsumerWorker(new DemoLogConsumerProcessorFactory(), config);
                threads[i] = new Thread(worker);
                workers[i] = worker;
            }
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                // 启动消费者, ClientConsumerWorker实现了Runnable接口, Thread启动后, 内置的消费任务会自动运行
                threads[i].start();
            }
            Thread.sleep(60 * 60 * 1000);
            for (int i = 0; i < CONSUMER_COUNT; i++) {
                // 调用ClientConsumerWorker的shutdown方法, 安全的关闭消费者, 消费者中启动的内置线程也会自动停止
                workers[i].shutdown();
            }
            // 调用ClientConsumerWorker的shutdown方法后, 由于消费者内置多个异步任务, 建议停止1分钟再关闭整个服务, 目的就是让消费者完成后台的异步任务, 安全的退出
            // 如果消费者突然关闭, 没有调用shutdown方法; 或者调用shutdown方法之后, 没有等待一定的时间. 那么可能造成下次消费时, 会有一定的重复数据, 因为消费者后台的异步任务没有保存checkPoint点
            Thread.sleep(60 * 1000);
        }
    }

按照不同的消费需求,创建不同的LogConsumerConfig

  1. 指定消费开始时间,不指定结束时间。
    // 设置batchSize = 500,即每次拉取500条。
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500);
    
    // 如果需要按照日志组/日志流名称消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500); 
  2. 指定消费开始时间,且指定结束时间。
    // 设置batchSize = 500,即每次拉取500条
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500);
    
    // 如果需要按照日志组/日志流名称消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500); 
  3. 指定ConsumerPosition(BEGIN_CURSOR、END_CURSOR)
    // 指定 BEGIN_CURSOR 开始消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", "ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", ConsumerPosition.BEGIN_CURSOR);
    
    // 指定 END_CURSOR 开始消费
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", "ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", ConsumerPosition.END_CURSOR);
  4. 支持使用临时AK、SK、STS-Token消费数据(若使用临时STS-Token, 则AK、SK需要填写与STS-Token配套的临时AK、SK)。
    1. 创建STS-Token更新类DemoLogConsumerSTSToken.java (由于临时凭证会存在过期,为了不影响您的消费业务,需要实时更新临时凭证)
      import com.huaweicloud.lts.consumer.interfaces.ILogConsumerSTSToken;
      import com.huaweicloud.lts.producer.STSTokenConfig;
      
      public class DemoLogConsumerSTSToken implements ILogConsumerSTSToken {
      
          @Override
          // SDK会定期从此方法中获取临时AK, 临时SK, 临时securityToken. 如果临时认证信息有变化, 在此方法中实现即可
          public STSTokenConfig getSTSTokenConfig() {
              String accessKeyId = "";
              String accessKeySecret = "";
              String securityToken = "";
              return new STSTokenConfig(accessKeyId, accessKeySecret, securityToken);
          }
      }
    2. 创建LogConsumerConfig
      // 指定消费开始时间,不指定结束时间
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", 1685444710000000000L);
      
      // 指定消费开始时间,且指定结束时间
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", 1685444710000000000L, 1685445470192043318L);
      
      // 指定 BEGIN_CURSOR
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", ConsumerPosition.BEGIN_CURSOR);
      
      // 指定 END_CURSOR
      LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID", new DemoLogConsumerSTSToken("ACCESS_KEY_ID","ACCESS_KEY_SECRET","STS-TOKEN"), "CONSUMER_GROUP_NAME", ConsumerPosition.END_CURSOR);
  5. 开启跨云消费日志。此功能仅适用于开发调测,对于消费性能不做保障
    LogConsumerConfig config = new LogConsumerConfig(....);
    config.setEnableLocalTest(true);
  6. 开启消费历史数据。
    LogConsumerConfig config = new LogConsumerConfig(....);
    config.setHistoryQuery(true);
  7. 使用自定义IP消费日志

    LogConsumerConfig config = new LogConsumerConfig(....);
    config.setProxyIp("127.0.0.1:8102");

获取日志组和日志流ID

  • 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
  • 日志流ID:单击日志组名称对应的按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。

相关文档