# 代码示例如下所示,请根据实际情况进行修改:
# -*- coding:utf-8 -*-
import json
import time # 新增模块:time,用于记录时间戳
import requests
from datetime import datetime, timezone, timedelta
from urllib.parse import urlencode
def get_workspace_headers(region_name, token):
    """Build the HTTP headers sent with every Workspace API request.

    Args:
        region_name: Region identifier; it is spliced into the ``Host``
            header as ``workspace.<region_name>.myhuaweicloud.com``.
        token: Value sent in the ``X-Auth-Token`` header.

    Returns:
        dict: The ``Host``, ``X-Auth-Token`` and ``Content-Type`` headers.
    """
    # str.join (like ``+``) rejects non-string parts instead of silently
    # stringifying them.
    host = ".".join(("workspace", region_name, "myhuaweicloud.com"))
    return {
        "Host": host,
        "X-Auth-Token": token,
        "Content-Type": "application/json",
    }
def fetch_users_in_batches(region_name, ip, token, project_id, user_name_list, batch_size=100):
    """Look up users in slices of ``batch_size`` names and merge the results.

    Args:
        region_name: Region identifier forwarded to ``list_users``.
        ip: Host forwarded to ``list_users``.
        token: Auth token forwarded to ``list_users``.
        project_id: Project id forwarded to ``list_users``.
        user_name_list: Sequence of user names to look up.
        batch_size: Maximum number of names passed to one ``list_users`` call.

    Returns:
        dict: ``{"count": <sum of every response's "total_count">,
        "users": <concatenation of every response's "users">}``.
        An empty ``user_name_list`` yields ``{"count": 0, "users": []}``
        without calling ``list_users`` at all.
    """
    # Phase 1: issue one request per slice and keep the raw responses.
    pages = [
        list_users(region_name, ip, token, project_id, user_name_list[start:start + batch_size])
        for start in range(0, len(user_name_list), batch_size)
    ]

    # Phase 2: fold the responses into a single result.
    combined = {"count": 0, "users": []}
    for page in pages:
        combined["count"] += page["total_count"]
        combined["users"].extend(page["users"])
    return combined
def list_users(region_name, ip, token, project_id, user_name_list):
    """Query the users endpoint, optionally filtered by user name.

    Sends ``GET https://<ip>/v2/<project_id>/users``. When ``user_name_list``
    is non-empty, the names are comma-joined into a ``user_names`` query
    parameter; otherwise the bare URL is requested.

    Args:
        region_name: Region identifier used to build the request headers.
        ip: Host the request is sent to.
        token: Auth token placed in the request headers.
        project_id: Project id that forms part of the URL path.
        user_name_list: Names to filter by; may be empty.

    Returns:
        The response body decoded from JSON.
    """
    endpoint = "https://" + ip + "/v2/" + project_id + "/users"
    request_headers = get_workspace_headers(region_name, token)

    query = {}
    if user_name_list:
        query['user_names'] = ",".join(user_name_list)
    encoded_query = urlencode(query)
    target_url = endpoint + "?" + encoded_query if encoded_query else endpoint

    # TLS certificate verification is switched off for this request.
    reply = requests.request("GET", target_url, headers=request_headers, verify=False)
    return json.loads(reply.text)
def list_screen_records(region_name, ip, token, project_id, start_time, end_time):
    """Fetch every screen record in a time window, following offset paging.

    Repeatedly sends ``GET https://<ip>/v2/<project_id>/screen-records`` with
    ``limit``, ``offset``, ``start_time`` and ``end_time`` query parameters
    until all reported records have been collected.

    Args:
        region_name: Region identifier used to build the request headers.
        ip: Host the requests are sent to.
        token: Auth token placed in the request headers.
        project_id: Project id that forms part of the URL path.
        start_time: Value sent as the ``start_time`` query parameter.
        end_time: Value sent as the ``end_time`` query parameter.

    Returns:
        dict: ``{"total_count": ..., "screen_records": [...]}``.
        ``total_count`` is taken from the first response that yields a
        non-zero value (falling back to that page's length when the response
        has no ``total_count``). ``screen_records`` may hold fewer entries
        than ``total_count`` if the server stops returning records early.
    """
    base_url = "https://" + ip + "/v2/" + project_id + "/screen-records"
    headers = get_workspace_headers(region_name, token)
    limit = 1000
    offset = 0
    params = {
        'limit': limit,
        'offset': offset,
        'start_time': start_time,
        'end_time': end_time,
    }
    all_screen_records = []
    total_count = 0
    while True:
        params['offset'] = offset
        full_url = f"{base_url}?{urlencode(params)}"
        response = requests.request("GET", full_url, headers=headers, verify=False)
        print("full_url", full_url)
        response_data = json.loads(response.text)
        print("response_data", response_data)
        # ``or []`` also covers an explicit null in the response body.
        screen_records = response_data.get("screen_records") or []
        if total_count == 0:
            total_count = response_data.get("total_count", len(screen_records))
        all_screen_records.extend(screen_records)
        if len(all_screen_records) >= total_count:
            break
        if not screen_records:
            # An empty page can never bring us closer to total_count; without
            # this guard the loop would keep requesting pages forever.
            print("list_screen_records: empty page at offset", offset,
                  "- stopping with", len(all_screen_records), "of", total_count)
            break
        # Advance by what was actually received so that a page shorter than
        # ``limit`` cannot cause records to be skipped.
        offset += len(screen_records)
    return {
        "total_count": total_count,
        "screen_records": all_screen_records
    }
def list_desktop_operations(region_name, ip, token, project_id, record_id):
    """Fetch every OS operation of one screen record, following offset paging.

    Repeatedly sends
    ``GET https://<ip>/v2/<project_id>/screen-records/<record_id>/os-operations``
    with ``limit`` and ``offset`` query parameters until all reported
    operations have been collected.

    Args:
        region_name: Region identifier used to build the request headers.
        ip: Host the requests are sent to.
        token: Auth token placed in the request headers.
        project_id: Project id that forms part of the URL path.
        record_id: Screen record id that forms part of the URL path.

    Returns:
        dict: ``{"screen_record_id": record_id, "total_count": ...,
        "screen_records": [...]}``. Note that the collected *operations* are
        returned under the ``"screen_records"`` key; callers read that key.
        ``total_count`` is taken from the first response that yields a
        non-zero value (falling back to that page's length when the response
        has no ``total_count``). The list may hold fewer entries than
        ``total_count`` if the server stops returning operations early.
    """
    base_url = "https://" + ip + "/v2/" + project_id + "/screen-records/" + record_id + "/os-operations"
    headers = get_workspace_headers(region_name, token)
    limit = 1000
    offset = 0
    params = {'limit': limit, 'offset': offset}
    all_operations = []
    total_count = 0
    while True:
        params['offset'] = offset
        full_url = f"{base_url}?{urlencode(params)}"
        response = requests.request("GET", full_url, headers=headers, verify=False)
        response_data = json.loads(response.text)
        # ``or []`` also covers an explicit null in the response body.
        operations = response_data.get("operations") or []
        if total_count == 0:
            total_count = response_data.get("total_count", len(operations))
        all_operations.extend(operations)
        if len(all_operations) >= total_count:
            break
        if not operations:
            # An empty page can never bring us closer to total_count; without
            # this guard the loop would keep requesting pages forever.
            print("list_desktop_operations: empty page at offset", offset,
                  "- stopping with", len(all_operations), "of", total_count)
            break
        # Advance by what was actually received so that a page shorter than
        # ``limit`` cannot cause operations to be skipped.
        offset += len(operations)
    return {
        "screen_record_id": record_id,
        "total_count": total_count,
        "screen_records": all_operations
    }
def report_lts(region_name, token, project_id, log_group_id, log_stream_id, report_items):
    """Post the report items to an LTS log stream in a single request.

    Each item is serialised to a JSON string; the strings are sent together as
    the ``contents`` of one payload stamped with the current time in
    nanoseconds and labelled ``tag=workspace``. The response text is printed
    but not otherwise inspected.

    Args:
        region_name: Region identifier used to build the LTS host name.
        token: Value sent in the ``X-Auth-Token`` header.
        project_id: Project id that forms part of the URL path.
        log_group_id: Log group id that forms part of the URL path.
        log_stream_id: Log stream id that forms part of the URL path.
        report_items: Iterable of JSON-serialisable items to upload.

    Returns:
        dict: Always ``{"body": ""}``.
    """
    # str.join (like ``+``) rejects non-string parts instead of silently
    # stringifying them.
    url = "".join((
        "https://lts-access.", region_name, ".myhuaweicloud.com:8102/v2/", project_id,
        "/lts/groups/", log_group_id, "/streams/", log_stream_id, "/tenant/contents",
    ))
    request_headers = {
        "X-Auth-Token": token,
        "Content-Type": "application/json",
    }
    print("url:", url)

    serialized_items = [json.dumps(item) for item in report_items]
    body = {
        "log_time_ns": time.time_ns(),
        "contents": serialized_items,
        "labels": {"tag": "workspace"},
    }

    # TLS certificate verification is switched off for this request.
    reply = requests.post(url=url, headers=request_headers, json=body, verify=False)
    print("report_lts_response 内容预览:", reply.text)
    return {"body": ""}
def handler(event, context):
    """Collect the previous hour's desktop operations and upload them to LTS.

    Steps:
      1. Read the token and settings from ``context``.
      2. List the screen records whose time falls in the previous full UTC
         hour.
      3. Look up the group names of every user appearing in those records.
      4. Fetch the OS operations of each record, ``batch_size`` records at a
         time, sleeping ``wait_time`` seconds between batches.
      5. Flatten each operation into a report item and send them all with
         ``report_lts``.

    Args:
        event: Invocation event; not used.
        context: Runtime context. Must provide ``getToken()`` and
            ``getUserData(key)`` for the keys ``region``, ``ip``,
            ``project_id``, ``log_group_id``, ``log_stream_id``,
            ``batch_size`` and ``wait_time`` (the last two must be parseable
            by ``int``).

    Returns:
        dict: A response with ``statusCode`` 200 whose ``body`` is the number
        of report items sent.
    """
    print("START START START")
    token = context.getToken()
    region = context.getUserData('region')
    ip = context.getUserData('ip')
    project_id = context.getUserData('project_id')
    log_group_id = context.getUserData('log_group_id')
    log_stream_id = context.getUserData('log_stream_id')
    batch_size = int(context.getUserData('batch_size'))
    wait_time = int(context.getUserData('wait_time'))

    # Query window: the previous full hour in UTC. ``datetime.utcnow()`` is
    # deprecated; an aware UTC datetime formats to the same string.
    now = datetime.now(timezone.utc)
    start_of_previous_hour = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    end_of_previous_hour = start_of_previous_hour + timedelta(hours=1) - timedelta(microseconds=1)
    start_time = start_of_previous_hour.strftime("%Y-%m-%dT%H:%M:%SZ")
    end_time = end_of_previous_hour.strftime("%Y-%m-%dT%H:%M:%SZ")

    screen_records_list = list_screen_records(region, ip, token, project_id, start_time, end_time)
    screen_records = screen_records_list["screen_records"]
    screen_record_ids = [screen_record["id"] for screen_record in screen_records]
    screen_records_dict = {screen_record["id"]: screen_record for screen_record in screen_records}
    unique_user_name_set = {screen_record["username"] for screen_record in screen_records}

    user_detail_list = fetch_users_in_batches(region, ip, token, project_id, list(unique_user_name_set))
    user_group_names_dict = {
        user_detail["user_name"]: sorted(user_detail.get("group_names", []))
        for user_detail in user_detail_list["users"]
    }

    all_desktop_operations = []
    for i in range(0, len(screen_record_ids), batch_size):
        batch = screen_record_ids[i:i + batch_size]
        print(f"Processing batch {i // batch_size + 1}, processing {len(batch)} records...")
        for record_id in batch:
            operations = list_desktop_operations(region, ip, token, project_id, record_id)
            for operation in operations.get("screen_records", []):
                # If an operation does not carry its own screen_record_id,
                # fall back to the id it was fetched under so the
                # desktop_name lookup below can still succeed.
                operation.setdefault("screen_record_id", record_id)
                all_desktop_operations.append(operation)
        # Unless this was the last batch, pause for wait_time seconds before
        # the next one to stay under the API rate limit.
        if (i + batch_size) < len(screen_record_ids):
            print(f"Reached rate limit. Waiting for {wait_time} seconds...")
            time.sleep(wait_time)
        else:
            print("All batches processed.")

    report_data = []
    print("len(all_desktop_operations)", len(all_desktop_operations))
    for desktop_operation in all_desktop_operations:
        username = desktop_operation.get("username", "")
        screen_record_id = desktop_operation.get("screen_record_id", "")
        # An unknown record id yields an empty dict, so the chained .get is
        # safe and desktop_name simply becomes "".
        screen_record = screen_records_dict.get(screen_record_id, {})
        report_data.append({
            "username": username,
            # Look up by the operation's user name (not a literal key).
            "user_group_names": user_group_names_dict.get(username, []),
            "desktop_name": screen_record.get("desktop_name", ""),
            "desktop_id": desktop_operation.get("desktop_id", ""),
            "screen_record_id": screen_record_id,
            "event_type": desktop_operation.get("event_type", ""),
            "event_level": desktop_operation.get("event_level", ""),
            "event_id": desktop_operation.get("event_id", ""),
            "event_data": desktop_operation.get("event_data", ""),
            "operation_time": desktop_operation.get("operation_time", ""),
            "relative_start_time": desktop_operation.get("relative_start_time", "")
        })

    report_lts(region, token, project_id, log_group_id, log_stream_id, report_data)
    return {
        "statusCode": 200,
        "isBase64Encoded": False,
        "body": len(report_data),
        "headers": {"Content-Type": "application/json"}
    }