使用Go SDK管理消费组
消费组创建完成后,您可以使用集成云日志服务Go SDK通过消费组消费数据。

目前此功能在邀测中,暂不支持申请开通。
约束限制
消费历史日志时,只支持开通公测白名单时间点后的日志。
前提条件
- 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
- 云日志服务SDK仅支持在华为云ECS主机上使用。
- 当用户修改权限后,权限信息在一天后生效。
操作步骤
您可以通过“在Maven项目中加入依赖项”的方式安装云日志服务Go SDK。
- 获取云日志服务的Go SDK包:
go get github.com/huaweicloud/huaweicloud-lts-sdk-go
- 引用云日志服务的Go SDK包:
import github.com/huaweicloud/huaweicloud-lts-sdk-go
配置参数说明
LogConsumerConfig中配置参数说明:
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
---|---|---|---|---|
RegionName |
云日志服务的区域。 |
string |
必填 |
- |
ProjectId |
华为云账号的项目ID(project id)。 |
string |
必填 |
- |
LogGroupId |
LTS的日志组ID。 |
string |
必填 |
- |
LogStreamId |
LTS的日志流ID。 |
string |
必填 |
- |
AccessKeyId |
华为云账号的AK。 |
string |
必填 |
- |
AccessKeySecret |
华为云账号的SK。 |
string |
必填 |
- |
ConsumerGroupName |
LTS日志流对应的消费组名称。 |
string |
必填 |
- |
StartTimeNs |
消费开始时间,单位是毫秒。 |
time.Time |
必填 |
- |
EndTimeNs |
消费结束时间,单位是毫秒。 |
time.Time |
选填 |
- |
BatchSize |
每批次消费数量,可选范围[10, 1000]。 |
int |
选填 |
1000 |
consumerCount |
消费者的数量。 |
int |
选填 |
1 |
- startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
- SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:
- 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
- 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
- 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。
消费数据示例代码
package main import ( ) var allShardLogCount int64 func init() { allShardLogCount = 0 } "github.com/huaweicloud/huaweicloud-lts-sdk-go/consumer" "github.com/huaweicloud/huaweicloud-lts-sdk-go/producer" "github.com/sirupsen/logrus" "log/slog" "sync/atomic" "time" func ConsumeLog(regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, logLevel, logDest string, consumeCount, batchSize int, startTime, endTime int64) { // 消费开始时间,括号中填毫秒值 var StartTime time.Time if startTime != 0 { StartTime = time.UnixMilli(startTime) } // 消费结束时间 var EndTime time.Time if endTime != 0 { EndTime = time.UnixMilli(endTime) } var logConfig producer.LogConf if logDest == "file" { logConfig = producer.LogConf{ Dir: "/opt/clouds", Name: "lts-go-sdk.log", Level: logLevel, MaxSize: 100, } producer.InitLoggerFile(logConfig) } else { logConfig = producer.LogConf{ Dir: "", Name: "", Level: logLevel, MaxSize: 100, } producer.InitLoggerStd(logConfig) } slog.Info("region is: ", "region", regionName) slog.Info("projectId is: ", "region", projectId) slog.Info("logGroupId is: ", "logGroupId", logGroupId) slog.Info("ak is: ", "ak", ak) slog.Info("sk is: ", "sk", sk) slog.Info("consumerGroupName is: ", "consumerGroupName", consumerGroupName) slog.Info("consumerCount is: ", "consumerCount", consumeCount) slog.Info("batchSize is: ", "batchSize", batchSize) slog.Info("start time is:", "startTime", StartTime) slog.Info("end time:", "endTime", EndTime, "endTime is Zero", EndTime.IsZero()) workers := make([]*consumer.ClientConsumerWorker, 0) for i := 0; i < consumeCount; i++ { config := consumer.GetConsumerConfig() // 构建消费者配置, 参数有必填的:regionName, projectId, logGroupId, logStreamId, ak, sk, consumerGroupName, startTime config.ProjectId = projectId config.LogGroupId = logGroupId config.LogStreamId = logStreamId config.AccessKeyId = ak config.AccessKeySecret = sk config.BatchSize = batchSize //BatchSize默认值1000 config.StartTimeNs = StartTime config.EndTimeNs = EndTime config.ConsumerGroupName = consumerGroupName config.RegionName = regionName // 构建消费者的工作者 worker := consumer.GetClientConsumerWorker(new(DemoLogConsumerProcessorFactory), config) workers = append(workers, worker) } for _, work := range workers { // 启动消费者, ClientConsumerWorker启动后, 内置的消费任务会自动运行 work.Run() } time.Sleep(30 * time.Minute) for _, work := range workers { // 调用ClientConsumerWorker的shutdown方法, 安全的关闭消费者, 消费者中启动的内置线程也会自动停止 work.Shutdown() } // 调用ClientConsumerWorker的shutdown方法后, 由于消费者内置多个异步任务, 建议停止1分钟在关闭整个服务, 目的就是让消费者完成后台的异步任务, 安全的退出 // 如果消费者突然关闭, 没有调用shutdown方法; 或者调用shutdown方法之后, 没有等待一定的时间. 那么可能造成下次消费时, 会有一定的重复数据, 因为消费者后台的异步任务没有保存checkPoint点 time.Sleep(time.Minute) } type DemoLogConsumerProcessor struct { LogCount int } // Initialize这个方法给您回调返回的ShardId, 是告诉您当前这个shard-consumer在消费那个shard func (processor *DemoLogConsumerProcessor) Initialize(shardId string) { } // Process数据处理方法, logGroups为拉取到的日志 func (processor *DemoLogConsumerProcessor) Process(logGroups []consumer.LogData, checkPointTracker consumer.ILogConsumerCheckPointTracker) string { atomic.AddInt64(&allShardLogCount, int64(len(logGroups))) processor.LogCount = processor.LogCount + len(logGroups) slog.Info("this time process log", "consume log", len(logGroups), "total log num", processor.LogCount) slog.Info("after this consume", "consume log", len(logGroups), "all shard consume total log num", allShardLogCount) logrus.WithField("consume log", len(logGroups)).WithField("total log num", processor.LogCount).Info("this time process log") logrus.WithField("consume log", len(logGroups)).WithField("all shard consume total log num", allShardLogCount).Info("after this consume") //for _, logData := range logGroups { // // logData为您的一条日志,日志内容在Labels属性中。 // // Labels为一个JSON,存放您的这个条日志的内容,比如: "log_content": "日志内容" // fmt.Println(fmt.Sprintf("日志内容:%v", logData.Labels)) //} // 方法的返回值为一个checkPoint // 如果您在处理这批数据的时候, 遇到什么异常或者说想重新获取这一次的数据, 那么 return checkPointTracker.GetCurrentCursor(); return "" } // Shutdown 当调用ClientConsumerWorker的shutdown方法, 会调用此函数, 您可以在此处写一些关闭流程 func (processor *DemoLogConsumerProcessor) Shutdown(checkPointTracker consumer.ILogConsumerCheckPointTracker) error { // 关闭前, 立即保存checkPoint return checkPointTracker.SaveCheckPoint(true) } type DemoLogConsumerProcessorFactory struct { } func (processor *DemoLogConsumerProcessorFactory) GeneratorProcessor() consumer.ILogConsumerProcessor { demoProcessor := new(DemoLogConsumerProcessor) demoProcessor.LogCount = 0 return demoProcessor }
获取日志组和日志流ID
- 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
- 日志流ID:单击日志组名称对应的
按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。