更新时间:2024-08-23 GMT+08:00
分享

使用Java SDK管理消费组

消费组创建完成后,您可以使用集成云日志服务Java SDK通过消费组消费数据。

目前此功能在邀测中,暂不支持申请开通。

前提条件

  • 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
  • 确认云日志服务的区域,请用户根据所在区域,获取regionName。
  • 获取华为账号的AK/SK
  • 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
  • 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
  • 云日志服务SDK仅支持在华为云ECS主机上使用。
  • 当用户修改权限后,权限信息在一天后生效。

安装SDK

您可以通过以下两种方式安装日志服务Java SDK

方式一:Maven项目中加入依赖项(推荐方式)。

  1. maven构建时,settings.xml文件的mirrors节点需要增加华为外部开源仓。如下:
    <mirror>
        <id>huaweicloud</id>
        <mirrorOf>*</mirrorOf>
        <url>https://repo.huaweicloud.com/repository/maven/</url>
    </mirror>
  2. 您可以在华为外部开源仓中获取日志服务Java SDK依赖的最新版本。
  3. Maven工程中使用日志服务Java SDK,只需在pom.xml中加入相应依赖即可,Maven项目管理工具会自动下载相关JAR包。
    <dependency> 
         <groupId>io.github.huaweicloud</groupId> 
         <artifactId>lts-sdk-common</artifactId> 
         <version>此功能在1.0.2版本以上支持,建议在第2步中获取最新版本</version> 
     </dependency> 
     <dependency> 
         <groupId>io.github.huaweicloud</groupId> 
         <artifactId>lts-sdk-java</artifactId> 
         <version>此功能在1.0.2版本以上支持,建议在第2步中获取最新版本</version> 
     </dependency>

方式二:在项目中直接依赖Java SDK的jar包。

  1. 下载lts-sdk-commonlts-sdk-java包。建议使用1.0.2及以上更高版本,低于1.0.2版本导致功能无法使用。
  2. 请确保您的项目中有以下依赖,因为SDK的jar包中需要该依赖。
    <dependency> 
         <groupId>com.alibaba</groupId> 
         <artifactId>fastjson</artifactId> 
         <version>1.2.83</version> 
     </dependency> 
     <dependency> 
         <groupId>joda-time</groupId> 
         <artifactId>joda-time</artifactId> 
         <version>2.10.14</version> 
     </dependency> 
     <dependency> 
         <groupId>org.apache.commons</groupId> 
         <artifactId>commons-lang3</artifactId> 
         <version>3.11</version> 
     </dependency> 
     <dependency> 
         <groupId>org.apache.httpcomponents</groupId> 
         <artifactId>httpclient</artifactId> 
         <version>4.5.13</version> 
     </dependency> 
     <dependency> 
         <groupId>com.google.guava</groupId> 
         <artifactId>guava</artifactId> 
         <version>30.1.1-jre</version> 
     </dependency> 
     <dependency> 
         <groupId>commons-io</groupId> 
         <artifactId>commons-io</artifactId> 
         <version>2.11.0</version> 
     </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>2.0.7</version>
     </dependency>
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-simple</artifactId>
         <version>2.0.7</version>
     </dependency>
  3. 以0.6.75版本的IntelliJ IDEA为例:项目中导入JAR
    • IntelliJ IDEA中选择您的工程,选择File > Project Structure
    • Project Structure对话框,左侧单击Modules
    • Dependencies页签,选择+ > JARs or directories
    • Attach Files or Directories对话框中,选中已下载的JAR文件,单击OK

配置参数说明

LogConsumerConfig中配置参数说明:

表1 配置参数说明

参数名称

描述

类型

是否需要填写

默认值

regionName

云日志服务的区域

String

必填

-

projectId

华为云账号的项目ID(project id)

String

必填

-

logGroupId

LTS的日志组ID

String

必填

-

logStreamId

LTS的日志流ID

String

必填

-

accessKeyId

华为云账号的AK

String

必填

-

accessKeySecret

华为云账号的SK

String

必填

-

consumerGroupName

LTS日志流对应的消费组名称

String

必填

-

startTimeNs

消费开始时间,纳秒值

Long

必填

-

endTimeNs

消费结束时间,纳秒值

Long

选填

-

batchSize

每批次消费数量,可选范围[10, 1000]

Integer

选填

1000

  1. startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
  2. SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。

通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:

  • 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
  • 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
  • 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。

消费数据示例代码

  • 创建TestDemo.java
    import com.huaweicloud.lts.consumer.ClientConsumerWorker;
    import com.huaweicloud.lts.consumer.config.LogConsumerConfig;
    import com.huaweicloud.lts.producer.exception.LogException;
    public class TestDemo {
    /* 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险, 建议在配置文件或者环境变量中密文存放, 使用时解密, 确保安全;
    本示例以ak和sk保存在环境变量中为例, 运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK. */
    String ak = System.getenv("HUAWEICLOUD_SDK_AK");
    String sk = System.getenv("HUAWEICLOUD_SDK_SK");
    // 云日志服务的区域
    private static final String TEST_REGION_NAME = "TEST_REGION_NAME";
    // 华为云账号的项目ID(project id)
    private static final String TEST_PROJECT = "TEST_PROJECT";
    // LTS的日志组ID
    private static final String TEST_LOG_GROUP_ID = "TEST_LOG_GROUP_ID";
    // LTS的日志流ID
    private static final String TEST_LOG_STREAM_ID = "TEST_LOG_STREAM_ID";
    // 华为云账号的AK
    private static final String ACCESS_KEY_ID = "ACCESS_KEY_ID";
    // 华为云账号的SK
    private static final String ACCESS_KEY_SECRET = "ACCESS_KEY_SECRET";
    // LTS日志流对应的消费组名称, 请您从云日志服务LTS页面创建日志流的消费组
    private static final String CONSUMER_GROUP_NAME = "CONSUMER_GROUP_NAME";
    // 启动消费者数量,由客户根据自身资源状态决定启动多少消费者
    private static final Integer CONSUMER_COUNT = 5;
    // 消费开始时间
    private static final Long START_TIME = 1685444710000000000L;
    public static void main(String[] args) throws LogException, InterruptedException {
        // 多个消费者可以在多台机器上启动, 会均衡消费一个消费组
        // 如下案例, 启动5个消费者开始消费.
        Thread[] threads = new Thread[CONSUMER_COUNT];
        ClientConsumerWorker[] workers = new ClientConsumerWorker[CONSUMER_COUNT];
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            // 构建消费者配置, 参数有必填的:regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, startTime
            LogConsumerConfig config = new LogConsumerConfig(TEST_REGION_NAME, TEST_PROJECT, TEST_LOG_GROUP_ID,
                TEST_LOG_STREAM_ID, ACCESS_KEY_ID, ACCESS_KEY_SECRET, CONSUMER_GROUP_NAME, START_TIME);
            // 构建消费者的工作类
            ClientConsumerWorker worker = new ClientConsumerWorker(new DemoLogConsumerProcessorFactory(), config);
            threads[i] = new Thread(worker);
            workers[i] = worker;
        }
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            // 启动消费者, ClientConsumerWorker实现了Runnable接口, Thread启动后, 内置的消费任务会自动运行
            threads[i].start();
        }
        Thread.sleep(60 * 60 * 1000);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            // 调用ClientConsumerWorker的shutdown方法, 安全的关闭消费者, 消费者中启动的内置线程也会自动停止
            workers[i].shutdown();
        }
            // 调用ClientConsumerWorker的shutdown方法后, 由于消费者内置多个异步任务, 建议停止1分钟在关闭整个服务, 目的就是让消费者完成后台的异步任务, 安全的退出
           // 如果消费者突然关闭, 没有调用shutdown方法; 或者调用shutdown方法之后, 没有等待一定的时间. 那么可能造成下次消费时, 会有一定的重复数据, 因为消费者后台的异步任务没有保存checkPoint点
        Thread.sleep(60 * 1000);
        }
    }
  • 创建DemoLogConsumerProcessor.java

    首先需要创建DemoLogConsumerProcessor的工厂类,即DemoLogConsumerProcessorFactory.jav

    import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessor;
    import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessorFactory;
    public class DemoLogConsumerProcessorFactory implements ILogConsumerProcessorFactory {
    @Override
    public ILogConsumerProcessor generatorProcessor() {
        // 通过这个方法会为每个Shard生成一个处理数据的DemoLogConsumerProcessor.java
        // 您也可以将返回值变成一个单例, 这样处理的话单个消费者所消费的所有Shard数据都会走这个单例流程
        return new DemoLogConsumerProcessor();
        }
    }

    然后创建DemoLogConsumerProcessor.java

    import java.util.List;
    import com.huaweicloud.lts.consumer.interfaces.ILogConsumerCheckPointTracker;
    import com.huaweicloud.lts.consumer.interfaces.ILogConsumerProcessor;
    import com.huaweicloud.lts.producer.ProjectConfig;
    import com.huaweicloud.lts.producer.exception.LogConsumerCheckPointException;
    import com.huaweicloud.lts.producer.model.consumer.LogData;
    public class DemoLogConsumerProcessor implements ILogConsumerProcessor {
    @Override
    // 这个方法给您回调返回的ShardId, 是告诉您当前这个shard-consumer在消费那个shard
    public void initialize(String shardId) {
    }
    @Override
    // 数据处理方法, logGroups为拉取到的日志
    public String process(List<LogData> logGroups, ILogConsumerCheckPointTracker checkPointTracker) {
        for (LogData logData : logGroups) {
            // logData为您的一条日志,日志内容在Labels属性中。
            // Labels为一个JSON,存放您的这个条日志的内容,比如: "log_content": "日志内容"
            System.out.println("这条日志内容: " + logData.getLabels());
        }
    // 方法的返回值为一个checkPoint
    // 如果您在处理这批数据的时候, 遇到什么异常或者说想重新获取这一次的数据, 那么 return checkPointTracker.getCurrentCursor();
    return null;
    }
    @Override
    // 当调用ClientConsumerWorker的shutdown方法, 会调用此函数, 您可以在此处写一些关闭流程
    public void shutdown(ILogConsumerCheckPointTracker checkPointTracker) {
        try {
            // 关闭前, 立即保存checkPoint
            checkPointTracker.saveCheckPoint(true);
        } catch (LogConsumerCheckPointException e) {
            e.printStackTrace();
        }
    }
    }

按照不同的消费需求,请使用不同的构造方法

  1. 指定消费开始时间,不指定结束时间。

    例如StartTimeNs = 0L,即从头开始一直拉取到当前最新。如果还有新的日志产生就一直拉取。

    例如StartTimeNs = 1685444710000000000L,即从1685444710000000000纳秒所代表的时间开始拉取,一直拉取到当前最新。如果还有新的日志产生就一直拉取。

    // 不设置batchSize,即每次拉取1000条
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L);
    // 设置batchSize = 500,即每次拉取500条
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500);
    /* 如果需要按照日志组/日志流名称消费,此功能仅1.0.3版本以上才支持
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME", 1685444710000000000L, 500); */
  2. 指定消费开始时间,且指定结束时间。

    例如StartTimeNs = 0L,EndTimeNs=1685444710000000000L。即从头开始一直拉取到EndTimeNs所代表的时间。不会拉取这段时间范围以外的数据。

    例如StartTimeNs = 1685444710000000000L,EndTimeNs=1685445470192043318L。即从StartTimeNs所代表的时间开始一直拉取到EndTimeNs所代表的时间。不会拉取这段时间范围以外的数据。

    // 不设置batchSize,即每次拉取1000条
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L);
    // 设置batchSize = 500,即每次拉取500条
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_ID","TEST_LOG_STREAM_ID","ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500);
    /* 如果需要按照日志组/日志流名称消费,此功能仅1.0.3版本以上才支持
    LogConsumerConfig config = new LogConsumerConfig("TEST_REGION_NAME", "TEST_PROJECT","TEST_LOG_GROUP_NAME","TEST_LOG_STREAM_NAME", true ,"ACCESS_KEY_ID","ACCESS_KEY_SECRET","CONSUMER_GROUP_NAME",1685444710000000000L, 1685445470192043318L, 500); */

获取日志组和日志流ID

  • 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。

  • 日志流ID单击日志组名称对应的按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID

相关文档