使用Python SDK管理消费组
消费组创建完成后,您可以使用集成云日志服务Python SDK通过消费组消费数据。

目前此功能在邀测中,暂不支持申请开通。
约束限制
- Python SDK适用于python 3.10.1及以上版本。
- 消费历史日志时,只支持开通公测白名单时间点后的日志。
前提条件
- 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
- 云日志服务SDK仅支持在华为云ECS主机上使用。
- 当用户修改权限后,权限信息在一天后生效。
操作步骤
- 获取LTS Python SDK包源码。
git clone https://gitee.com/lordstar-habile/huaweicloud-lts-python-sdk.git
- 安装相关依赖。
pip3 install requests pip3 install loguru pip3 install six
配置参数说明
LogConsumerConfig中配置参数说明:
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
---|---|---|---|---|
region |
云日志服务的区域。 |
string |
必填 |
- |
projectId |
华为云账号的项目ID(project id)。 |
string |
必填 |
- |
logGroup |
LTS的日志组ID。 |
string |
必填 |
- |
logStream |
LTS的日志流ID。 |
string |
必填 |
- |
ak |
华为云账号的AK。 |
string |
必填 |
- |
sk |
华为云账号的SK。 |
string |
必填 |
- |
consumer_group_name |
LTS日志流对应的消费组名称。 |
string |
必填 |
- |
start_time |
消费开始时间,单位是纳秒。 |
time.Time |
必填 |
- |
end_time |
消费结束时间,单位是纳秒。 |
time.Time |
选填 |
- |
consume_batch_size |
每批次消费数量。 |
int |
选填 |
1000 |
consumer_count |
消费者的数量。 |
int |
选填 |
1 |
- startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
- SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:
- 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
- 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
- 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。
消费数据示例代码
import argparse import sys import time from typing import List from loguru import logger from consumer.core.client_consumer_worker import get_client_consumer_worker from consumer.interface.consumer_check_point_tracker import ILogConsumerCheckPointTracker from consumer.interface.consumer_processor import ILogConsumerProcessor from consumer.interface.consumer_processor_factory import ILogConsumerProcessorFactory from consumer.model.consumer_config import ConsumerConfig from consumer.model.log_data import LogData class DemoLogConsumerProcessor(ILogConsumerProcessor): def __init__(self): self.log_count = 0 def initialize(self, shard_id): pass def process(self, log_group: List[LogData], tracker: ILogConsumerCheckPointTracker) -> str: self.log_count += len(log_group) logger.info("this time consume log {} total log num {}", len(log_group), self.log_count) def shutdown(self, tracker: ILogConsumerCheckPointTracker): return tracker.save_check_point(True) class DemoLogConsumerProcessorFactory(ILogConsumerProcessorFactory): def generate_processor(self): return DemoLogConsumerProcessor() def start_consume(endpoint, region_name, project_id, log_group_id, log_stream_id, access_key, access_secret, consumer_group_name, start_time, end_time, log_level, is_test, consumer_count, consume_batch_size): logger.remove() logger.add(sys.stderr, level=log_level) if is_test: logger.add("/fanxin/log.log", level="DEBUG") logger.info("start time {}, end time {}", start_time, end_time) workers = [] for i in range(consumer_count): config = ConsumerConfig(endpoint, region_name, project_id, log_group_id, log_stream_id, access_key, access_secret, consumer_group_name, start_time, end_time, consume_batch_size) work = get_client_consumer_worker(DemoLogConsumerProcessorFactory(), config) if work is None: logger.error("get work error, work is none") else: workers.append(work) for work in workers: work.start() time.sleep(30000 * 60) for work in workers: work.shutdown() time.sleep(60) if __name__ == '__main__': parser = argparse.ArgumentParser(description='lts python sdk consume log help tester information') parser.add_argument('--endpoint', dest='endpoint', type=str, help='lts endpoint', default="https://endpoint") parser.add_argument('--project_id', dest='project_id', type=str, help='huaweicloud project_id', default="projectId") parser.add_argument('--region', dest='region', type=str, help='huaweicloud region', default="region") parser.add_argument('--group_id', dest='group_id', type=str, help='send log group_id', default="group_id") parser.add_argument('--stream_id', dest='stream_id', type=str, help='send log stream_id', default="stream_id") parser.add_argument('--ak', dest='ak', type=str, help='huaweicloud ak', default="ak") parser.add_argument('--sk', dest='sk', type=str, help='huaweicloud sk', default="sk") parser.add_argument('--consumer_count', dest='consumer_count', type=int, help='consumer count', default=1) parser.add_argument('--consume_batch_size', dest='consume_batch_size', type=int, help='each fetch max log nums', default=1000) parser.add_argument('--consumer_group_name', dest='consumer_group_name', type=str, help='consumer group name', default="consumer_group_name") parser.add_argument('--start_time', dest='start_time', type=int, help='consume log start time ms', default=213123123123) parser.add_argument('--end_time', dest='end_time', type=int, help='consume log end time ms', default=0) parser.add_argument('--log_level', dest='log_level', type=str, help='consume log end time ms', default="INFO") parser.add_argument('--is_test', dest='is_test', type=bool, help='test consumer will write log to node', default=True) args = parser.parse_args() start_consume(args.endpoint, args.region, args.project_id, args.group_id, args.stream_id, args.ak, args.sk, args.consumer_group_name, args.start_time, args.end_time, args.log_level, args.is_test, args.consumer_count, args.consume_batch_size)
获取日志组和日志流ID
- 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
- 日志流ID:单击日志组名称对应的
按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。