Example Functions (Python)
Example Function of Evaluations Triggered by Configuration Changes
Config will invoke a function like the following example when it detects a configuration change for a target resource.
import time import http.client from huaweicloudsdkcore.auth.credentials import GlobalCredentials from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion from huaweicloudsdkconfig.v1.config_client import ConfigClient from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest ''' The evaluation result of a rule will be either Compliant or NonCompliant. In this example, if the vpcId of an ECS does not match the specified VPC ID, NonCompliant is returned. Otherwise, Compliant is returned. ''' def evaluate_compliance(resource, parameter): if resource.get("provider") != "ecs" or resource.get("type") != "cloudservers": return "Compliant" vpc_id = resource.get("properties", {}).get("metadata", {}).get("vpcId") return "Compliant" if vpc_id == parameter.get("vpcId").get("value") else "NonCompliant" def update_policy_state(context, domain_id, evaluation): auth = GlobalCredentials( ak=context.getSecurityAccessKey(), sk=context.getSecuritySecretKey(), domain_id=domain_id ).with_security_token(context.getSecurityToken()) client = ConfigClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \ .build() try: response = client.update_policy_state(evaluation) return 200 except ConnectionException as e: print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except RequestTimeoutException as e: print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except ServiceResponseException as e: print("There is service error, exception: ", e.status_code, e.error_msg) return e.status_code def handler(event, context): domain_id = event.get("domain_id") resource = event.get("invoking_event", {}) parameters = event.get("rule_parameter") compliance_state = evaluate_compliance(resource, parameters) request_body = UpdatePolicyStateRequest(PolicyStateRequestBody( policy_resource = PolicyResource( resource_id = resource.get("id"), resource_name = resource.get("name"), resource_provider = resource.get("provider"), resource_type = resource.get("type"), region_id = resource.get("region_id"), domain_id = domain_id ), trigger_type = event.get("trigger_type"), compliance_state = compliance_state, policy_assignment_id = event.get("policy_assignment_id"), policy_assignment_name = event.get("policy_assignment_name"), evaluation_time = event.get("evaluation_time"), evaluation_hash = event.get("evaluation_hash") )) for retry in range(5): status_code = update_policy_state(context, domain_id, request_body) if status_code == http.client.TOO_MANY_REQUESTS: print("TOO_MANY_REQUESTS: retry again") time.sleep(1) elif status_code == http.client.OK: print("Update policyState successfully.") break else: print("Failed to update policyState.") break
Example Function of Evaluations Triggered by Periodic Execution
Config will invoke a function like the following example for a custom rule that is executed periodically.
import time import http.client from huaweicloudsdkcore.auth.credentials import GlobalCredentials from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkcore.exceptions.exceptions import ConnectionException from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion from huaweicloudsdkconfig.v1.config_client import ConfigClient from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest from huaweicloudsdkiam.v3.region.iam_region import IamRegion from huaweicloudsdkiam.v3 import IamClient, ShowDomainLoginPolicyRequest """ The evaluation result will be either compliant or noncompliant. In this example, if the session timeout configured for the account is greater than 30 minutes, Compliant is returned. Otherwise, NonCompliant is returned. The method is to call the API, ShowDomainLoginPolicy, of IAM. In this case, you may need to set a timeout and memory limit for the function. """ def evaluate_compliance(context, domain_id): credentials = GlobalCredentials( ak=context.getSecurityAccessKey(), sk=context.getSecuritySecretKey(), domain_id=domain_id ).with_security_token(context.getSecurityToken()) client = IamClient.new_builder() \ .with_credentials(credentials) \ .with_region(IamRegion.value_of("cn-north-4")) \ .build() try: request = ShowDomainLoginPolicyRequest() request.domain_id = domain_id response = client.show_domain_login_policy(request) session_timeout = response.login_policy.session_timeout print("session_timeout", session_timeout) if not session_timeout: return "NonCompliant" return "NonCompliant" if session_timeout > 30 else "Compliant" except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg) def update_policy_state(context, domain_id, evaluation): auth = GlobalCredentials(ak=context.getAccessKey(), sk=context.getSecretKey(), domain_id=domain_id) client = ConfigClient.new_builder() \ .with_credentials(credentials=auth) \ .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \ .build() try: response = client.update_policy_state(evaluation) return 200 except ConnectionException as e: print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except RequestTimeoutException as e: print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg) return e.status_code except ServiceResponseException as e: print("There is service error, exception: ", e.status_code, e.error_msg) return e.status_code def handler(event, context): domain_id = event.get("domain_id") resource = event.get("invoking_event", {}) if resource.get("name") != "Account": return compliance_state = evaluate_compliance(context, domain_id) request_body = UpdatePolicyStateRequest(PolicyStateRequestBody( policy_resource = PolicyResource( resource_id = resource.get("id"), resource_name = resource.get("name"), resource_provider = resource.get("provider"), resource_type = resource.get("type"), region_id = resource.get("region_id"), domain_id = domain_id ), trigger_type = event.get("trigger_type"), compliance_state = compliance_state, policy_assignment_id = event.get("policy_assignment_id"), policy_assignment_name = event.get("policy_assignment_name"), evaluation_time = event.get("evaluation_time"), evaluation_hash = event.get("evaluation_hash") )) for retry in range(5): status_code = update_policy_state(context, domain_id, request_body) if status_code == http.client.TOO_MANY_REQUESTS: print("TOO_MANY_REQUESTS: retry again") time.sleep(1) elif status_code == http.client.OK: print("Update policyState successfully.") break else: print("Failed to update policyState.") break
Dependency Package
If dependency packages are missing, you need to manually import them. For details, see Configuring Dependency Packages. In the preceding example, the dependency packages are huaweicloudsdkiam and huaweicloudsdkconfig.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot