更新时间:2024-02-04 GMT+08:00
示例函数(Python)
评估由配置变更触发的示例函数
Config服务检测到自定义合规规则范围内的资源发生更改时,会调用函数的示例如下:
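import requests
import http.client
import time

# TLS verification setting for every call made by this function. The original
# sample passed ``verify=False`` (and silenced the resulting
# InsecureRequestWarning), which lets anyone on the network path read the
# X-Auth-Token. Leave this as True (requests' default); set it to the path of
# a CA bundle only if a private CA sits in front of the endpoint.
VERIFY_TLS = True

# Seconds to wait for a single HTTP request; requests has no default timeout
# and an unreachable endpoint would otherwise hang the function until the
# platform kills it.
REQUEST_TIMEOUT = 30


def get_policy_resource(domain_id, resource):
    """Build the ``policy_resource`` object that the policy-states API expects.

    Args:
        domain_id: Account (domain) id the evaluated resource belongs to.
        resource: The ``invoking_event`` dict delivered with the trigger event.

    Returns:
        dict with the domain, region and resource identification fields.
    """
    return {
        "domain_id": domain_id,
        "region_id": resource.get("region_id"),
        "resource_id": resource.get("id"),
        "resource_name": resource.get("name"),
        "resource_provider": resource.get("provider"),
        "resource_type": resource.get("type"),
    }


def evaluate_compliance(resource, parameter):
    """Compliance rule logic: return "Compliant" or "NonCompliant".

    In this sample an ``ecs.cloudservers`` resource is reported NonCompliant
    when its ``vpcId`` differs from the ``vpcId`` given in the rule
    parameters; every other resource type is reported Compliant.

    Args:
        resource: Resource dict taken from ``invoking_event``.
        parameter: Rule parameter dict. May be ``None`` when the rule was
            created without parameters.

    Returns:
        "Compliant" or "NonCompliant".
    """
    if resource.get("provider") != "ecs" or resource.get("type") != "cloudservers":
        return "Compliant"
    # ``parameter`` is None for a parameterless rule; treat that as
    # "no vpcId configured" instead of raising AttributeError.
    expected_vpc_id = (parameter or {}).get("vpcId")
    properties = resource.get("properties") or {}
    metadata = properties.get("metadata") or {}
    vpc_id = metadata.get("vpcId")
    return "Compliant" if vpc_id == expected_vpc_id else "NonCompliant"


def update_policy_state(token, domain_id, evaluation):
    """PUT one evaluation result to the Config policy-states API.

    Args:
        token: IAM token used for the ``X-Auth-Token`` header.
        domain_id: Account (domain) id the result belongs to.
        evaluation: The policy-state payload built by ``handler``.

    Returns:
        The ``requests.Response``; the caller inspects ``status_code``.
    """
    endpoint = "https://rms.myhuaweicloud.com"
    url = "{}/v1/resource-manager/domains/{}/policy-states".format(endpoint, domain_id)
    return requests.put(
        url=url,
        headers={"X-Auth-Token": token},
        json=evaluation,
        verify=VERIFY_TLS,
        timeout=REQUEST_TIMEOUT,
    )


def handler(event, context):
    """FunctionGraph entry point for a rule triggered by a configuration change.

    Evaluates the resource carried in ``event["invoking_event"]`` and reports
    the result to Config, retrying up to three times when the API answers
    HTTP 429.

    Args:
        event: Trigger event from Config (domain id, resource, rule metadata).
        context: FunctionGraph context; ``context.getToken()`` supplies the
            IAM token.
    """
    domain_id = event.get("domain_id")
    resource = event.get("invoking_event", {})
    parameters = event.get("rule_parameter")
    compliance_state = evaluate_compliance(resource, parameters)
    # Named ``evaluation`` rather than ``requests`` so the local does not
    # shadow the imported HTTP module.
    evaluation = {
        "policy_resource": get_policy_resource(domain_id, resource),
        "trigger_type": event.get("trigger_type"),
        "compliance_state": compliance_state,
        "policy_assignment_id": event.get("policy_assignment_id"),
        "policy_assignment_name": event.get("policy_assignment_name"),
        "function_urn": event.get("function_urn"),
        "evaluation_time": event.get("evaluation_time"),
        "evaluation_hash": event.get("evaluation_hash"),
    }
    token = context.getToken()
    for _attempt in range(3):
        response = update_policy_state(token, domain_id, evaluation)
        if response.status_code == http.client.TOO_MANY_REQUESTS:
            print("TOO_MANY_REQUESTS: retry again")
            time.sleep(1)
            continue
        if response.status_code == http.client.OK:
            print("Update policyState successfully.")
        else:
            print("Failed to update policyState.")
            # ``response.text`` rather than ``response.json()``: an error body
            # is not guaranteed to be JSON, and json() would raise here.
            print(response.text)
        break
    else:
        # Every attempt was throttled; say so instead of returning silently.
        print("Failed to update policyState: still throttled after 3 attempts.")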
评估由周期执行触发的示例函数
Config针对周期执行的自定义合规规则,会调用函数的示例如下:
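import requests
import http.client
import time

# TLS verification setting for every call made by this function. The original
# sample passed ``verify=False`` (and silenced the resulting
# InsecureRequestWarning), which lets anyone on the network path read the
# X-Auth-Token. Leave this as True (requests' default); set it to the path of
# a CA bundle only if a private CA sits in front of the endpoint.
VERIFY_TLS = True

# Seconds to wait for a single HTTP request; requests has no default timeout
# and an unreachable endpoint would otherwise hang the function until the
# platform kills it.
REQUEST_TIMEOUT = 30

# IAM endpoint the sample queries for the account login policy.
IAM_ENDPOINT = "https://iam.cn-north-4.myhuaweicloud.com"


def get_policy_resource(domain_id, resource):
    """Build the ``policy_resource`` object that the policy-states API expects.

    Args:
        domain_id: Account (domain) id the evaluated resource belongs to.
        resource: The ``invoking_event`` dict delivered with the trigger event.

    Returns:
        dict with the domain, region and resource identification fields.
    """
    return {
        "domain_id": domain_id,
        "region_id": resource.get("region_id"),
        "resource_id": resource.get("id"),
        "resource_name": resource.get("name"),
        "resource_provider": resource.get("provider"),
        "resource_type": resource.get("type"),
    }


def evaluate_compliance(token, domain_id, max_session_timeout=30, endpoint=IAM_ENDPOINT):
    """Compliance rule logic: return "Compliant" or "NonCompliant".

    The account is NonCompliant when its login session timeout exceeds
    ``max_session_timeout`` minutes (30 by default, as in the original
    sample). The value is read with the IAM ``ShowDomainLoginPolicy`` API.

    Args:
        token: IAM token used for the ``X-Auth-Token`` header.
        domain_id: Account (domain) id whose login policy is checked.
        max_session_timeout: Largest session timeout, in minutes, still
            considered compliant.
        endpoint: IAM endpoint to query.

    Returns:
        "Compliant" or "NonCompliant".
    """
    url = "{}/v3.0/OS-SECURITYPOLICY/domains/{}/login-policy".format(endpoint, domain_id)
    r = requests.get(
        url=url,
        headers={
            "X-Auth-Token": token,
            "User-Agent": "API Explorer",
            "Content-Type": "application/json;charset=UTF-8"
        },
        verify=VERIFY_TLS,
        timeout=REQUEST_TIMEOUT,
    )
    # Fail closed: a non-JSON body, an error response without
    # ``login_policy``, or a missing ``session_timeout`` all fall back to 60
    # minutes, which is reported NonCompliant rather than silently passing.
    try:
        body = r.json()
    except ValueError:
        body = {}
    login_policy = body.get("login_policy") or {}
    session_timeout = login_policy.get("session_timeout", 60)
    return "NonCompliant" if session_timeout > max_session_timeout else "Compliant"


def update_policy_state(token, domain_id, evaluation):
    """PUT one evaluation result to the Config policy-states API.

    Args:
        token: IAM token used for the ``X-Auth-Token`` header.
        domain_id: Account (domain) id the result belongs to.
        evaluation: The policy-state payload built by ``handler``.

    Returns:
        The ``requests.Response``; the caller inspects ``status_code``.
    """
    endpoint = "https://rms.myhuaweicloud.com"
    url = "{}/v1/resource-manager/domains/{}/policy-states".format(endpoint, domain_id)
    return requests.put(
        url=url,
        headers={"X-Auth-Token": token},
        json=evaluation,
        verify=VERIFY_TLS,
        timeout=REQUEST_TIMEOUT,
    )


def handler(event, context):
    """FunctionGraph entry point for a periodically executed custom rule.

    Only the ``Account`` resource is evaluated; any other resource returns
    without calling IAM. The result is reported to Config, retrying up to
    three times when the API answers HTTP 429.

    Args:
        event: Trigger event from Config (domain id, resource, rule metadata).
        context: FunctionGraph context; ``context.getToken()`` supplies the
            IAM token.
    """
    resource = event.get("invoking_event", {})
    if resource.get("name") != "Account":
        return
    domain_id = event.get("domain_id")
    token = context.getToken()
    compliance_state = evaluate_compliance(token, domain_id)
    # Named ``evaluation`` rather than ``requests`` so the local does not
    # shadow the imported HTTP module.
    evaluation = {
        "policy_resource": get_policy_resource(domain_id, resource),
        "trigger_type": event.get("trigger_type"),
        "compliance_state": compliance_state,
        "policy_assignment_id": event.get("policy_assignment_id"),
        "policy_assignment_name": event.get("policy_assignment_name"),
        "function_urn": event.get("function_urn"),
        "evaluation_time": event.get("evaluation_time"),
        "evaluation_hash": event.get("evaluation_hash"),
    }
    for _attempt in range(3):
        response = update_policy_state(token, domain_id, evaluation)
        if response.status_code == http.client.TOO_MANY_REQUESTS:
            print("TOO_MANY_REQUESTS: retry again")
            time.sleep(1)
            continue
        if response.status_code == http.client.OK:
            print("Update policyState successfully.")
        else:
            print("Failed to update policyState.")
            # ``response.text`` rather than ``response.json()``: an error body
            # is not guaranteed to be JSON, and json() would raise here.
            print(response.text)
        break
    else:
        # Every attempt was throttled; say so instead of returning silently.
        print("Failed to update policyState: still throttled after 3 attempts.")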