更新时间:2025-07-23 GMT+08:00
分享

使用Python SDK管理消费组

消费组创建完成后,您可以使用集成云日志服务Python SDK通过消费组消费数据。

目前此功能在邀测中,暂不支持申请开通。

约束限制

  • Python SDK适用于python 3.10.1及以上版本。
  • 消费历史日志时,只支持开通公测白名单时间点后的日志。

前提条件

  • 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
  • 确认云日志服务的区域,请用户根据所在区域,获取regionName。
  • 获取华为账号的AK/SK
  • 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
  • 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
  • 云日志服务SDK仅支持在华为云ECS主机上使用。
  • 当用户修改权限后,权限信息在一天后生效。

操作步骤

  1. 获取LTS Python SDK包源码。

    git clone https://gitee.com/lordstar-habile/huaweicloud-lts-python-sdk.git

  2. 安装相关依赖。

    pip3 install requests
    pip3 install loguru
    pip3 install six

配置参数说明

LogConsumerConfig中配置参数说明:

表1 配置参数说明

参数名称

描述

类型

是否需要填写

默认值

region

云日志服务的区域。

string

必填

-

projectId

华为云账号的项目ID(project id)。

string

必填

-

logGroup

LTS的日志组ID。

string

必填

-

logStream

LTS的日志流ID。

string

必填

-

ak

华为云账号的AK。

string

必填

-

sk

华为云账号的SK。

string

必填

-

consumer_group_name

LTS日志流对应的消费组名称。

string

必填

-

start_time

消费开始时间,单位是纳秒。

time.Time

必填

-

end_time

消费结束时间,单位是纳秒。

time.Time

选填

-

consume_batch_size

每批次消费数量。

int

选填

1000

consumer_count

消费者的数量。

int

选填

1

  1. startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
  2. SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。

通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:

  • 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
  • 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
  • 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。

消费数据示例代码

import argparse
import sys
import time
from typing import List
from loguru import logger
from consumer.core.client_consumer_worker import get_client_consumer_worker
from consumer.interface.consumer_check_point_tracker import ILogConsumerCheckPointTracker
from consumer.interface.consumer_processor import ILogConsumerProcessor
from consumer.interface.consumer_processor_factory import ILogConsumerProcessorFactory
from consumer.model.consumer_config import ConsumerConfig
from consumer.model.log_data import LogData
class DemoLogConsumerProcessor(ILogConsumerProcessor):
    def __init__(self):
        self.log_count = 0
    def initialize(self, shard_id):
        pass
    def process(self, log_group: List[LogData], tracker: ILogConsumerCheckPointTracker) -> str:
        self.log_count += len(log_group)
        logger.info("this time consume log {} total log num {}", len(log_group), self.log_count)
    def shutdown(self, tracker: ILogConsumerCheckPointTracker):
        return tracker.save_check_point(True)
class DemoLogConsumerProcessorFactory(ILogConsumerProcessorFactory):
    def generate_processor(self):
        return DemoLogConsumerProcessor()
def start_consume(endpoint, region_name, project_id, log_group_id, log_stream_id, access_key, access_secret,
                  consumer_group_name, start_time, end_time, log_level, is_test, consumer_count, consume_batch_size):
    logger.remove()
    logger.add(sys.stderr, level=log_level)
    if is_test:
        logger.add("/fanxin/log.log", level="DEBUG")
    logger.info("start time {}, end time {}", start_time, end_time)
    workers = []
    for i in range(consumer_count):
        config = ConsumerConfig(endpoint, region_name, project_id, log_group_id, log_stream_id, access_key,
                                access_secret, consumer_group_name, start_time, end_time, consume_batch_size)
        work = get_client_consumer_worker(DemoLogConsumerProcessorFactory(), config)
        if work is None:
            logger.error("get work error, work is none")
        else:
            workers.append(work)
    for work in workers:
        work.start()
    time.sleep(30000 * 60)
    for work in workers:
        work.shutdown()
    time.sleep(60)
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='lts python sdk consume log help tester information')
    parser.add_argument('--endpoint', dest='endpoint', type=str, help='lts endpoint', default="https://endpoint")
    parser.add_argument('--project_id', dest='project_id', type=str, help='huaweicloud project_id', default="projectId")
    parser.add_argument('--region', dest='region', type=str, help='huaweicloud region', default="region")
    parser.add_argument('--group_id', dest='group_id', type=str, help='send log group_id', default="group_id")
    parser.add_argument('--stream_id', dest='stream_id', type=str, help='send log stream_id', default="stream_id")
    parser.add_argument('--ak', dest='ak', type=str, help='huaweicloud ak', default="ak")
    parser.add_argument('--sk', dest='sk', type=str, help='huaweicloud sk', default="sk")
    parser.add_argument('--consumer_count', dest='consumer_count', type=int, help='consumer count', default=1)
    parser.add_argument('--consume_batch_size', dest='consume_batch_size', type=int, help='each fetch max log nums',
                        default=1000)
    parser.add_argument('--consumer_group_name', dest='consumer_group_name', type=str, help='consumer group name',
                        default="consumer_group_name")
    parser.add_argument('--start_time', dest='start_time', type=int, help='consume log start time ms',
                        default=213123123123)
    parser.add_argument('--end_time', dest='end_time', type=int, help='consume log end time ms', default=0)
    parser.add_argument('--log_level', dest='log_level', type=str, help='consume log end time ms', default="INFO")
    parser.add_argument('--is_test', dest='is_test', type=bool, help='test consumer will write log to node',
                        default=True)
    args = parser.parse_args()
    start_consume(args.endpoint, args.region, args.project_id, args.group_id, args.stream_id, args.ak,
                  args.sk, args.consumer_group_name, args.start_time, args.end_time, args.log_level, args.is_test,
                  args.consumer_count, args.consume_batch_size)

获取日志组和日志流ID

  • 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
  • 日志流ID:单击日志组名称对应的按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。

相关文档