import time import http.client from huaweicloudsdkcore.auth.credentials import GlobalCredentials from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion from huaweicloudsdkconfig.v1.config_client import ConfigClient from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest ''' 合规规则评估逻辑: 返回“Compliant” 或 “NonCompliant” 本示例中, 当资源类型为ecs.cloudservers, 且该ecs的vpcId字段不是合规规则参数所指定的vpcId时, 会返回不合规, 否则返回合规。 ''' def evaluate_compliance(resource, parameter): if resource.get("provider") != "ecs" or resource.get("type") != "cloudservers": return "Compliant" vpc_id = resource.get("properties", {}).get("metadata", {}).get("vpcId") return "Compliant" if vpc_id == parameter.get("vpcId").get("value") else "NonCompliant" def update_policy_state(context, domain_id, evaluation): auth = GlobalCredentials( ak=context.getSecurityAccessKey(), sk=context.getSecuritySecretKey(), domain_id=domain_id ).with_security_token(context.getSecurityToken()) client = ConfigClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \ .build() try: response = client.update_policy_state(evaluation) return 200 except ConnectionException as e: print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except RequestTimeoutException as e: print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except ServiceResponseException as e: print("There is service error, exception: ", e.status_code, e.error_msg) return e.status_code def handler(event, context): domain_id = event.get("domain_id") resource = event.get("invoking_event", {}) parameters = event.get("rule_parameter") compliance_state = evaluate_compliance(resource, parameters) request_body = UpdatePolicyStateRequest(PolicyStateRequestBody( policy_resource = PolicyResource( resource_id = resource.get("id"), resource_name = resource.get("name"), resource_provider = resource.get("provider"), resource_type = resource.get("type"), region_id = resource.get("region_id"), domain_id = domain_id ), trigger_type = event.get("trigger_type"), compliance_state = compliance_state, policy_assignment_id = event.get("policy_assignment_id"), policy_assignment_name = event.get("policy_assignment_name"), evaluation_time = event.get("evaluation_time"), evaluation_hash = event.get("evaluation_hash") )) for retry in range(5): status_code = update_policy_state(context, domain_id, request_body) if status_code == http.client.TOO_MANY_REQUESTS: print("TOO_MANY_REQUESTS: retry again") time.sleep(1) elif status_code == http.client.OK: print("Update policyState successfully.") break else: print("Failed to update policyState.") break
import time import http.client from huaweicloudsdkcore.auth.credentials import GlobalCredentials from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion from huaweicloudsdkconfig.v1.config_client import ConfigClient from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest from huaweicloudsdkiam.v3.region.iam_region import IamRegion from huaweicloudsdkiam.v3 import IamClient, ShowDomainLoginPolicyRequest """ 合规规则评估逻辑:返回“Compliant”或“NonCompliant”。 本示例中, 当账号设置的登录会话失效时间大于30分钟, 会返回不合规, 否则返回合规。 实现方式是调用IAM服务的接口ShowDomainLoginPolicy。 该场景下,可能需要适当增加函数的执行超时时间和内存限制。 """ def evaluate_compliance(context, domain_id): credentials = GlobalCredentials( ak=context.getSecurityAccessKey(), sk=context.getSecuritySecretKey(), domain_id=domain_id ).with_security_token(context.getSecurityToken()) client = IamClient.new_builder() \ .with_credentials(credentials) \ .with_region(IamRegion.value_of("cn-north-4")) \ .build() try: request = ShowDomainLoginPolicyRequest() request.domain_id = domain_id response = client.show_domain_login_policy(request) session_timeout = response.login_policy.session_timeout print("session_timeout", session_timeout) if not session_timeout: return "NonCompliant" return "NonCompliant" if session_timeout > 30 else "Compliant" except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) def update_policy_state(context, domain_id, evaluation): auth = GlobalCredentials(ak=context.getAccessKey(), sk=context.getSecretKey(), domain_id=domain_id) client = ConfigClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \ .build() try: response = client.update_policy_state(evaluation) return 200 except ConnectionException as e: print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except RequestTimeoutException as e: print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except ServiceResponseException as e: print("There is service error, exception: ", e.status_code, e.error_msg) return e.status_code def handler(event, context): domain_id = event.get("domain_id") resource = event.get("invoking_event", {}) if resource.get("name") != "Account": return compliance_state = evaluate_compliance(context, domain_id) request_body = UpdatePolicyStateRequest(PolicyStateRequestBody( policy_resource = PolicyResource( resource_id = resource.get("id"), resource_name = resource.get("name"), resource_provider = resource.get("provider"), resource_type = resource.get("type"), region_id = resource.get("region_id"), domain_id = domain_id ), trigger_type = event.get("trigger_type"), compliance_state = compliance_state, policy_assignment_id = event.get("policy_assignment_id"), policy_assignment_name = event.get("policy_assignment_name"), evaluation_time = event.get("evaluation_time"), evaluation_hash = event.get("evaluation_hash") )) for retry in range(5): status_code = update_policy_state(context, domain_id, request_body) if status_code == http.client.TOO_MANY_REQUESTS: print("TOO_MANY_REQUESTS: retry again") time.sleep(1) elif status_code == http.client.OK: print("Update policyState successfully.") break else: print("Failed to update policyState.") break
