更新时间:2024-02-04 GMT+08:00
分享

示例函数(Python)

评估由配置变更触发的示例函数

Config服务检测到自定义合规规则范围内的资源发生更改时,会调用函数,函数示例如下:

import requests  
import http.client  
import time  

# Suppress urllib3 warnings. Presumably aimed at the InsecureRequestWarning that
# would otherwise be emitted, since the requests in this sample use verify=False.
requests.packages.urllib3.disable_warnings()  


def get_policy_resource(domain_id, resource):
    """Build the ``policy_resource`` payload that identifies the evaluated resource.

    Args:
        domain_id: Value stored under the ``domain_id`` key.
        resource: Mapping read with ``.get``; keys that are absent yield ``None``.

    Returns:
        A dict with the keys ``domain_id``, ``region_id``, ``resource_id``,
        ``resource_name``, ``resource_provider`` and ``resource_type``.
    """
    # (payload key, key looked up in ``resource``)
    field_sources = (
        ("region_id", "region_id"),
        ("resource_id", "id"),
        ("resource_name", "name"),
        ("resource_provider", "provider"),
        ("resource_type", "type"),
    )
    policy_resource = {"domain_id": domain_id}
    for payload_key, source_key in field_sources:
        policy_resource[payload_key] = resource.get(source_key)
    return policy_resource


def evaluate_compliance(resource, parameter):
    """Evaluate one resource against the rule.

    In this sample only resources whose provider is ``ecs`` and whose type is
    ``cloudservers`` are checked: such a resource is non-compliant when its
    ``properties.metadata.vpcId`` differs from the ``vpcId`` given in the rule
    parameters. Every other resource is reported as compliant.

    Args:
        resource: Mapping describing the resource (read with ``.get``).
        parameter: Mapping of rule parameters; ``None`` is treated as empty.

    Returns:
        ``"Compliant"`` or ``"NonCompliant"``.
    """
    if resource.get("provider") != "ecs" or resource.get("type") != "cloudservers":
        return "Compliant"
    # ``or {}`` also covers keys that are present but null, which a plain
    # ``.get(key, {})`` default would let through as None and crash on.
    properties = resource.get("properties") or {}
    metadata = properties.get("metadata") or {}
    expected_vpc_id = (parameter or {}).get("vpcId")
    return "Compliant" if metadata.get("vpcId") == expected_vpc_id else "NonCompliant"


def update_policy_state(token, domain_id, evaluation, timeout=10):
    """Send an evaluation result with an HTTP PUT to the policy-states URL.

    Args:
        token: Value sent in the ``X-Auth-Token`` header.
        domain_id: Interpolated into the request path.
        evaluation: JSON-serializable object sent as the request body.
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout``. Without it a stalled connection blocks indefinitely.

    Returns:
        The ``requests.Response`` returned by ``requests.put``.

    Raises:
        requests.exceptions.RequestException: On connection errors or when the
            timeout expires (``requests.exceptions.Timeout``).
    """
    endpoint = "https://rms.myhuaweicloud.com"
    url = "{}/v1/resource-manager/domains/{}/policy-states".format(endpoint, domain_id)
    return requests.put(
        url=url,
        headers={
            "X-Auth-Token": token
        },
        json=evaluation,
        # NOTE(review): TLS certificate verification is disabled here, so the
        # token is sent without authenticating the server. Kept as in the
        # original sample; confirm whether verification can be enabled.
        verify=False,
        timeout=timeout,
    )


def handler(event, context):
    """Evaluate the resource carried by ``event`` and report the result.

    The compliance state is computed by ``evaluate_compliance`` and submitted
    with ``update_policy_state``. A 429 (Too Many Requests) response is retried
    after a one-second pause, for at most three attempts in total.

    Args:
        event: Mapping read with ``.get``; the resource is taken from
            ``invoking_event`` and the rule parameters from ``rule_parameter``.
        context: Object whose ``getToken()`` supplies the auth token.
    """
    resource = event.get("invoking_event", {})
    # evaluate_compliance reads the parameters with ``.get``, so never pass None.
    parameters = event.get("rule_parameter") or {}
    compliance_state = evaluate_compliance(resource, parameters)

    domain_id = event.get("domain_id")
    # Deliberately not named ``requests``: that would shadow the imported
    # module inside this function.
    evaluation = {
        "policy_resource": get_policy_resource(domain_id, resource),
        "trigger_type": event.get("trigger_type"),
        "compliance_state": compliance_state,
        "policy_assignment_id": event.get("policy_assignment_id"),
        "policy_assignment_name": event.get("policy_assignment_name"),
        "function_urn": event.get("function_urn"),
        "evaluation_time": event.get("evaluation_time"),
        "evaluation_hash": event.get("evaluation_hash"),
    }

    max_attempts = 3
    for _ in range(max_attempts):
        response = update_policy_state(context.getToken(), domain_id, evaluation)
        if response.status_code != http.client.TOO_MANY_REQUESTS:
            break
        print("TOO_MANY_REQUESTS: retry again")
        time.sleep(1)
    else:
        # Every attempt was throttled; say so instead of exiting silently.
        print("Failed to update policyState: still throttled after {} attempts.".format(max_attempts))
        return

    if response.status_code == http.client.OK:
        print("Update policyState successfully.")
        return

    print("Failed to update policyState.")
    try:
        print(response.json())
    except ValueError:
        # The error body is not JSON; print the raw text instead of crashing.
        print(response.text)

评估由周期执行触发的示例函数

针对周期执行的自定义合规规则,Config服务会调用函数,函数示例如下:

import requests  
import http.client  
import time  

# Suppress urllib3 warnings. Presumably aimed at the InsecureRequestWarning that
# would otherwise be emitted, since the requests in this sample use verify=False.
requests.packages.urllib3.disable_warnings()  


def get_policy_resource(domain_id, resource):
    """Assemble the ``policy_resource`` section of an evaluation payload.

    Args:
        domain_id: Value stored under the ``domain_id`` key.
        resource: Mapping read with ``.get``; keys that are absent yield ``None``.

    Returns:
        A dict with the keys ``domain_id``, ``region_id``, ``resource_id``,
        ``resource_name``, ``resource_provider`` and ``resource_type``.
    """
    # Payload key -> key looked up in ``resource``.
    source_key_by_field = {
        "region_id": "region_id",
        "resource_id": "id",
        "resource_name": "name",
        "resource_provider": "provider",
        "resource_type": "type",
    }
    payload = {"domain_id": domain_id}
    payload.update(
        (field, resource.get(source_key))
        for field, source_key in source_key_by_field.items()
    )
    return payload


def evaluate_compliance(token, domain_id, timeout=10):
    """Evaluate the account's login-session timeout setting.

    In this sample the account is non-compliant when its configured login
    session timeout is greater than 30 minutes, and compliant otherwise. The
    value is fetched from the IAM login-policy API (ShowDomainLoginPolicy).
    When the response carries no ``login_policy.session_timeout`` the value
    defaults to 60, so the result is ``"NonCompliant"``.

    Args:
        token: Value sent in the ``X-Auth-Token`` header.
        domain_id: Interpolated into the request path.
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout``. Without it a stalled connection blocks indefinitely.

    Returns:
        ``"Compliant"`` or ``"NonCompliant"``.

    Raises:
        requests.exceptions.RequestException: On connection errors or when the
            timeout expires.
    """
    endpoint = "https://iam.cn-north-4.myhuaweicloud.com"
    url = "{}/v3.0/OS-SECURITYPOLICY/domains/{}/login-policy".format(endpoint, domain_id)
    r = requests.get(
        url=url,
        headers={
            "X-Auth-Token": token,
            "User-Agent": "API Explorer",
            "Content-Type": "application/json;charset=UTF-8"
        },
        # NOTE(review): TLS certificate verification is disabled here. Kept as
        # in the original sample; confirm whether verification can be enabled.
        verify=False,
        timeout=timeout,
    )
    session_timeout = r.json().get("login_policy", {}).get("session_timeout", 60)
    return "NonCompliant" if session_timeout > 30 else "Compliant"


def update_policy_state(token, domain_id, evaluation, timeout=10):
    """Send an evaluation result with an HTTP PUT to the policy-states URL.

    Args:
        token: Value sent in the ``X-Auth-Token`` header.
        domain_id: Interpolated into the request path.
        evaluation: JSON-serializable object sent as the request body.
        timeout: Seconds to wait for the server, passed to ``requests`` as
            ``timeout``. Without it a stalled connection blocks indefinitely.

    Returns:
        The ``requests.Response`` returned by ``requests.put``.

    Raises:
        requests.exceptions.RequestException: On connection errors or when the
            timeout expires (``requests.exceptions.Timeout``).
    """
    endpoint = "https://rms.myhuaweicloud.com"
    url = "{}/v1/resource-manager/domains/{}/policy-states".format(endpoint, domain_id)
    return requests.put(
        url=url,
        headers={
            "X-Auth-Token": token
        },
        json=evaluation,
        # NOTE(review): TLS certificate verification is disabled here, so the
        # token is sent without authenticating the server. Kept as in the
        # original sample; confirm whether verification can be enabled.
        verify=False,
        timeout=timeout,
    )


def handler(event, context):
    """Evaluate the account-level rule and report the result.

    Events whose ``invoking_event`` name is not ``"Account"`` are ignored. For
    the others the compliance state is computed by ``evaluate_compliance`` and
    submitted with ``update_policy_state``. A 429 (Too Many Requests) response
    is retried after a one-second pause, for at most three attempts in total.

    Args:
        event: Mapping read with ``.get``; the resource is taken from
            ``invoking_event``.
        context: Object whose ``getToken()`` supplies the auth token.
    """
    resource = event.get("invoking_event", {})
    if resource.get("name") != "Account":
        return

    domain_id = event.get("domain_id")
    compliance_state = evaluate_compliance(context.getToken(), domain_id)

    # Deliberately not named ``requests``: that would shadow the imported
    # module inside this function.
    evaluation = {
        "policy_resource": get_policy_resource(domain_id, resource),
        "trigger_type": event.get("trigger_type"),
        "compliance_state": compliance_state,
        "policy_assignment_id": event.get("policy_assignment_id"),
        "policy_assignment_name": event.get("policy_assignment_name"),
        "function_urn": event.get("function_urn"),
        "evaluation_time": event.get("evaluation_time"),
        "evaluation_hash": event.get("evaluation_hash"),
    }

    max_attempts = 3
    for _ in range(max_attempts):
        response = update_policy_state(context.getToken(), domain_id, evaluation)
        if response.status_code != http.client.TOO_MANY_REQUESTS:
            break
        print("TOO_MANY_REQUESTS: retry again")
        time.sleep(1)
    else:
        # Every attempt was throttled; say so instead of exiting silently.
        print("Failed to update policyState: still throttled after {} attempts.".format(max_attempts))
        return

    if response.status_code == http.client.OK:
        print("Update policyState successfully.")
        return

    print("Failed to update policyState.")
    try:
        print(response.json())
    except ValueError:
        # The error body is not JSON; print the raw text instead of crashing.
        print(response.text)
分享:

    相关文档

    相关产品