Updated on 2024-12-10 GMT+08:00

Example Functions (Python)

Example Function of Evaluations Triggered by Configuration Changes

Config will invoke a function like the following example when it detects a configuration change for a target resource.

import time
import http.client
from huaweicloudsdkcore.auth.credentials import GlobalCredentials
from huaweicloudsdkcore.exceptions.exceptions import ConnectionException
from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException
from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException
from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion
from huaweicloudsdkconfig.v1.config_client import ConfigClient
from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody
from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest

''' 
The evaluation result of a rule will be either Compliant or NonCompliant.
In this example, if the vpcId of an ECS does not match the specified VPC ID, NonCompliant is returned. Otherwise, Compliant is returned.
'''
def evaluate_compliance(resource, parameter):
    if resource.get("provider") != "ecs" or resource.get("type") != "cloudservers":  
        return "Compliant"  
    vpc_id = resource.get("properties", {}).get("metadata", {}).get("vpcId")  
    return "Compliant" if vpc_id == parameter.get("vpcId").get("value") else "NonCompliant"  


def update_policy_state(context, domain_id, evaluation):
    auth = GlobalCredentials(
        ak=context.getSecurityAccessKey(),
        sk=context.getSecuritySecretKey(),
        domain_id=domain_id
    ).with_security_token(context.getSecurityToken())
    client = ConfigClient.new_builder() \
        .with_credentials(credentials=auth) \
        .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \
        .build()

    try:
        response = client.update_policy_state(evaluation)
        return 200
    except ConnectionException as e:
        print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg)
        return e.status_code
    except RequestTimeoutException as e:
        print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg)
        return e.status_code
    except ServiceResponseException as e:
        print("There is service error, exception: ", e.status_code, e.error_msg)
        return e.status_code


def handler(event, context):
    domain_id = event.get("domain_id")
    resource = event.get("invoking_event", {})
    parameters = event.get("rule_parameter")
    compliance_state = evaluate_compliance(resource, parameters)

    request_body = UpdatePolicyStateRequest(PolicyStateRequestBody(
        policy_resource = PolicyResource(
            resource_id = resource.get("id"),
            resource_name = resource.get("name"),
            resource_provider = resource.get("provider"),
            resource_type = resource.get("type"),
            region_id = resource.get("region_id"),
            domain_id = domain_id
        ),  
        trigger_type = event.get("trigger_type"),  
        compliance_state = compliance_state,  
        policy_assignment_id = event.get("policy_assignment_id"),  
        policy_assignment_name = event.get("policy_assignment_name"),
        evaluation_time = event.get("evaluation_time"),  
        evaluation_hash = event.get("evaluation_hash")  
    ))

    for retry in range(5):
        status_code = update_policy_state(context, domain_id, request_body)
        if status_code == http.client.TOO_MANY_REQUESTS:
            print("TOO_MANY_REQUESTS: retry again")
            time.sleep(1)
        elif status_code == http.client.OK:
            print("Update policyState successfully.")
            break
        else:
            print("Failed to update policyState.")
            break

Example Function of Evaluations Triggered by Periodic Execution

Config will invoke a function like the following example for a custom rule that is executed periodically.

import time
import http.client
from huaweicloudsdkcore.auth.credentials import GlobalCredentials
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkcore.exceptions.exceptions import ConnectionException
from huaweicloudsdkcore.exceptions.exceptions import RequestTimeoutException
from huaweicloudsdkcore.exceptions.exceptions import ServiceResponseException
from huaweicloudsdkconfig.v1.region.config_region import ConfigRegion
from huaweicloudsdkconfig.v1.config_client import ConfigClient
from huaweicloudsdkconfig.v1 import PolicyResource, PolicyStateRequestBody
from huaweicloudsdkconfig.v1 import UpdatePolicyStateRequest
from huaweicloudsdkiam.v3.region.iam_region import IamRegion
from huaweicloudsdkiam.v3 import IamClient, ShowDomainLoginPolicyRequest

"""
The evaluation result will be either compliant or noncompliant.
In this example, if the session timeout configured for the account is greater than 30 minutes, Compliant is returned. Otherwise, NonCompliant is returned.
The method is to call the API, ShowDomainLoginPolicy, of IAM.
In this case, you may need to set a timeout and memory limit for the function.
"""  
def evaluate_compliance(context, domain_id):  
    credentials = GlobalCredentials(
        ak=context.getSecurityAccessKey(),
        sk=context.getSecuritySecretKey(),
        domain_id=domain_id
    ).with_security_token(context.getSecurityToken())
    client = IamClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(IamRegion.value_of("cn-north-4")) \
        .build()

    try:
        request = ShowDomainLoginPolicyRequest()
        request.domain_id = domain_id
        response = client.show_domain_login_policy(request)
        session_timeout = response.login_policy.session_timeout
        print("session_timeout", session_timeout)
        if not session_timeout:
            return "NonCompliant"
        return "NonCompliant" if session_timeout > 30 else "Compliant"  
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


def update_policy_state(context, domain_id, evaluation):
    auth = GlobalCredentials(ak=context.getAccessKey(), sk=context.getSecretKey(), domain_id=domain_id)
    client = ConfigClient.new_builder() \
        .with_credentials(credentials=auth) \
        .with_region(region=ConfigRegion.value_of(region_id="cn-north-4")) \
        .build()
    try:
        response = client.update_policy_state(evaluation)
        return 200
    except ConnectionException as e:
        print("A connect timeout exception occurs while the Config performs some operations, exception: ", e.error_msg)
        return e.status_code
    except RequestTimeoutException as e:
        print("A request timeout exception occurs while the Config performs some operations, exception: ", e.error_msg)
        return e.status_code
    except ServiceResponseException as e:
        print("There is service error, exception: ", e.status_code, e.error_msg)
        return e.status_code

def handler(event, context):
    domain_id = event.get("domain_id")
    resource = event.get("invoking_event", {})  
    if resource.get("name") != "Account":  
        return
    compliance_state = evaluate_compliance(context, domain_id)

    request_body = UpdatePolicyStateRequest(PolicyStateRequestBody(
        policy_resource = PolicyResource(
            resource_id = resource.get("id"),
            resource_name = resource.get("name"),
            resource_provider = resource.get("provider"),
            resource_type = resource.get("type"),
            region_id = resource.get("region_id"),
            domain_id = domain_id
        ),  
        trigger_type = event.get("trigger_type"),  
        compliance_state = compliance_state,  
        policy_assignment_id = event.get("policy_assignment_id"),  
        policy_assignment_name = event.get("policy_assignment_name"),
        evaluation_time = event.get("evaluation_time"),  
        evaluation_hash = event.get("evaluation_hash")  
    ))

    for retry in range(5):
        status_code = update_policy_state(context, domain_id, request_body)
        if status_code == http.client.TOO_MANY_REQUESTS:
            print("TOO_MANY_REQUESTS: retry again")
            time.sleep(1)
        elif status_code == http.client.OK:
            print("Update policyState successfully.")
            break
        else:
            print("Failed to update policyState.")
            break

Dependency Package

If dependency packages are missing, you need to manually import them. For details, see Configuring Dependency Packages. In the preceding example, the dependency packages are huaweicloudsdkiam and huaweicloudsdkconfig.