使用Python SDK管理消费组
消费组创建完成后,您可以使用集成云日志服务Python SDK通过消费组消费数据。
目前此功能在邀测中,暂不支持申请开通。
约束限制
- Python SDK适用于python 3.10.1及以上版本。
- 消费历史日志时,只支持开通公测白名单时间点后的日志。
前提条件
- 使用云日志SDK前,您需要注册华为云账号,并开通云日志服务。
- 确认云日志服务的区域,请用户根据所在区域,获取regionName。
- 获取华为账号的AK/SK。
- 获取华为云账号的项目ID(project id),步骤参考:请参见“我的凭证 > API凭证”。
- 获取需要上报到LTS的日志组ID、日志流ID、消费组名称。
- 云日志服务SDK仅支持在华为云ECS主机上使用。
- 当用户修改权限后,权限信息在一天后生效。
操作步骤
- 获取LTS Python SDK包源码。
git clone https://gitee.com/lordstar-habile/huaweicloud-lts-python-sdk.git - 安装相关依赖。
pip3 install requests pip3 install loguru pip3 install six
配置参数说明
LogConsumerConfig中配置参数说明:
|
参数名称 |
描述 |
类型 |
是否需要填写 |
默认值 |
|---|---|---|---|---|
|
region |
云日志服务的区域。 |
string |
必填 |
- |
|
projectId |
华为云账号的项目ID(project id)。 |
string |
必填 |
- |
|
logGroup |
LTS的日志组ID。 |
string |
必填 |
- |
|
logStream |
LTS的日志流ID。 |
string |
必填 |
- |
|
ak |
华为云账号的AK。 |
string |
必填 |
- |
|
sk |
华为云账号的SK。 |
string |
必填 |
- |
|
consumer_group_name |
LTS日志流对应的消费组名称。 |
string |
必填 |
- |
|
start_time |
消费开始时间,单位是纳秒。 |
time.Time |
必填 |
- |
|
end_time |
消费结束时间,单位是纳秒。 |
time.Time |
选填 |
- |
|
consume_batch_size |
每批次消费数量。 |
int |
选填 |
1000 |
|
consumer_count |
消费者的数量。 |
int |
选填 |
1 |
- startTimeNs(消费开始时间)和endTimeNs(消费结束时间),这里的时间指的是服务端时间。
- SDK消费保证最终一致性,即您可以获取到这条日志流的全部内容,但由于时间为服务端时间,所以在获取日志流的过程中,可能导致获取到的日志数量跟LTS页面查询的日志数量不一致。
通过startTimeNs/endTimeNs可以在Shard中定位生命周期内的日志,假设LogStream日志流的生命周期为[begin_time,end_time),startTimeNs/endTimeNs=from_time,因此:
- 当from_time ≤ begin_time or from_time = "begin"时:返回时间点为begin_time对应的Cursor位置。
- 当from_time ≥ end_time or from_time = "end"时:返回当前时间点下一条将被写入的Cursor位置(当前该Cursor位置上无数据)。
- 当from_time > begin_time and from_time < end_time时:返回第一个服务端接收时间大于等于from_time的数据包对应的Cursor。
消费数据示例代码
import argparse
import sys
import time
from typing import List
from loguru import logger
from consumer.core.client_consumer_worker import get_client_consumer_worker
from consumer.interface.consumer_check_point_tracker import ILogConsumerCheckPointTracker
from consumer.interface.consumer_processor import ILogConsumerProcessor
from consumer.interface.consumer_processor_factory import ILogConsumerProcessorFactory
from consumer.model.consumer_config import ConsumerConfig
from consumer.model.log_data import LogData
class DemoLogConsumerProcessor(ILogConsumerProcessor):
def __init__(self):
self.log_count = 0
def initialize(self, shard_id):
pass
def process(self, log_group: List[LogData], tracker: ILogConsumerCheckPointTracker) -> str:
self.log_count += len(log_group)
logger.info("this time consume log {} total log num {}", len(log_group), self.log_count)
def shutdown(self, tracker: ILogConsumerCheckPointTracker):
return tracker.save_check_point(True)
class DemoLogConsumerProcessorFactory(ILogConsumerProcessorFactory):
def generate_processor(self):
return DemoLogConsumerProcessor()
def start_consume(endpoint, region_name, project_id, log_group_id, log_stream_id, access_key, access_secret,
consumer_group_name, start_time, end_time, log_level, is_test, consumer_count, consume_batch_size):
logger.remove()
logger.add(sys.stderr, level=log_level)
if is_test:
logger.add("/fanxin/log.log", level="DEBUG")
logger.info("start time {}, end time {}", start_time, end_time)
workers = []
for i in range(consumer_count):
config = ConsumerConfig(endpoint, region_name, project_id, log_group_id, log_stream_id, access_key,
access_secret, consumer_group_name, start_time, end_time, consume_batch_size)
work = get_client_consumer_worker(DemoLogConsumerProcessorFactory(), config)
if work is None:
logger.error("get work error, work is none")
else:
workers.append(work)
for work in workers:
work.start()
time.sleep(30000 * 60)
for work in workers:
work.shutdown()
time.sleep(60)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='lts python sdk consume log help tester information')
parser.add_argument('--endpoint', dest='endpoint', type=str, help='lts endpoint', default="https://endpoint")
parser.add_argument('--project_id', dest='project_id', type=str, help='huaweicloud project_id', default="projectId")
parser.add_argument('--region', dest='region', type=str, help='huaweicloud region', default="region")
parser.add_argument('--group_id', dest='group_id', type=str, help='send log group_id', default="group_id")
parser.add_argument('--stream_id', dest='stream_id', type=str, help='send log stream_id', default="stream_id")
parser.add_argument('--ak', dest='ak', type=str, help='huaweicloud ak', default="ak")
parser.add_argument('--sk', dest='sk', type=str, help='huaweicloud sk', default="sk")
parser.add_argument('--consumer_count', dest='consumer_count', type=int, help='consumer count', default=1)
parser.add_argument('--consume_batch_size', dest='consume_batch_size', type=int, help='each fetch max log nums',
default=1000)
parser.add_argument('--consumer_group_name', dest='consumer_group_name', type=str, help='consumer group name',
default="consumer_group_name")
parser.add_argument('--start_time', dest='start_time', type=int, help='consume log start time ms',
default=213123123123)
parser.add_argument('--end_time', dest='end_time', type=int, help='consume log end time ms', default=0)
parser.add_argument('--log_level', dest='log_level', type=str, help='consume log end time ms', default="INFO")
parser.add_argument('--is_test', dest='is_test', type=bool, help='test consumer will write log to node',
default=True)
args = parser.parse_args()
start_consume(args.endpoint, args.region, args.project_id, args.group_id, args.stream_id, args.ak,
args.sk, args.consumer_group_name, args.start_time, args.end_time, args.log_level, args.is_test,
args.consumer_count, args.consume_batch_size)
获取日志组和日志流ID
- 日志组ID:登录云日志服务页面,选择“日志管理”,鼠标悬浮在日志组名称上,可查看日志组名称和日志组ID。
- 日志流ID:单击日志组名称对应的
按钮,鼠标悬浮在日志流名称上,可查看日志流名称和日志流ID。