Updated on 2025-12-10 GMT+08:00

Sample Code of deploy.py

Sample Code

The following is sample code for deploy.py, which is used for automated deployment.

This example is used for automating the deployment and update of Huawei Cloud FunctionGraph functions, covering both configuration and code updates. The script parses configuration files, runs update commands via the CLI, decrypts encrypted data, and records logs for traceability. For details, see the code comments.

# -*-coding:utf-8 -*-

import os
import sys
import json
import logging
import subprocess
from yaml import load
from base64 import b64decode
from Crypto.Cipher import AES

# need: pip install pyyaml pycryptodome  (pycryptodome provides Crypto.Cipher)
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

logging.basicConfig(level=logging.INFO,
                    filename='function.log',
                    filemode='a',
                    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')


def decrypt(json_input, key):
    """Decrypt an AES-GCM payload that is serialized as a JSON document.

    ``json_input`` is a JSON object whose ``nonce``, ``header``,
    ``ciphertext`` and ``tag`` members are Base64-encoded. ``key`` is a
    string; its encoded bytes are used as the AES key. The key is assumed
    to have been securely shared beforehand.

    Returns the decrypted plaintext decoded to ``str``.

    ``ValueError`` and ``KeyError`` raised while parsing, decoding or
    verifying the payload are passed on to the caller unchanged.
    """
    try:
        payload = json.loads(json_input)
        fields = {}
        for name in ('nonce', 'header', 'ciphertext', 'tag'):
            fields[name] = b64decode(payload[name])
        aes = AES.new(key.encode(), AES.MODE_GCM, nonce=fields['nonce'])
        # The header is fed to the cipher before decrypting, as in the
        # encrypting side's header/ciphertext/tag layout.
        aes.update(fields['header'])
        decrypted = aes.decrypt_and_verify(fields['ciphertext'], fields['tag'])
        return decrypted.decode()
    except (ValueError, KeyError):
        raise


def generate_update_function_config_cmd(new_config, old_config, key):
    """Build the ``hcloud FunctionGraph UpdateFunctionConfig`` command line.

    Values taken from ``new_config`` (the function properties parsed from
    cam.yaml):
      * required: ``handler``, ``runtime``, ``memorySize``, ``timeout``,
        ``projectID``
      * optional: ``userData``, ``encryptedUserData``,
        ``initializerHandler`` + ``initializerTimeout`` (only emitted when
        both are present), ``strategyConfig``

    Values carried over from ``old_config`` (the configuration currently
    reported for the function) so that the update does not drop them:
    ``func_vpc``, ``xrole``, ``app_xrole`` and ``mount_config``.

    ``key`` is passed to ``decrypt`` to decrypt ``encryptedUserData``.

    The module-level names ``region``, ``function_urn`` and
    ``function_name`` must be set before this function is called.

    Returns the complete command as a single string. Values are
    interpolated into a shell command line, so the configuration must come
    from a trusted source.

    Raises ``KeyError`` when a required key is missing, plus whatever
    ``decrypt`` raises for a malformed ``encryptedUserData`` payload.
    """
    # Function handler
    handler = new_config['handler']
    # Runtime (required and not modifiable)
    runtime = new_config['runtime']
    # Memory
    memory_size = new_config['memorySize']
    # Timeout
    timeout = new_config['timeout']
    # Project ID
    project_id = new_config['projectID']

    # Every option carries its own leading space; the pieces are joined
    # without a separator at the end.
    options = [
        'hcloud FunctionGraph UpdateFunctionConfig',
        f' --cli-region="{region}"',
        f' --function_urn="{function_urn}"',
        f' --project_id="{project_id}"',
        f' --handler="{handler}"',
        f' --timeout={timeout}',
        f' --memory_size={memory_size}',
        f' --runtime="{runtime}"',
        f' --func_name="{function_name}"',
    ]

    # Environment variables
    # Environment variables are directly overwritten. Manually configured
    # variables that are not updated to the cam.yaml file will be lost.
    user_data = new_config.get('userData')
    if user_data is not None:
        # Dumped twice on purpose: the first dump produces the JSON text,
        # the second wraps that text in double quotes and escapes the inner
        # quotes so it is passed as one command-line argument.
        user_data_arg = json.dumps(json.dumps(user_data))
        options.append(f' --user_data={user_data_arg}')

    encrypted_user_data = new_config.get('encryptedUserData')
    if encrypted_user_data is not None:
        # decrypt() already returns text, so a single dump is enough to
        # quote it.
        decrypted_user_data = decrypt(encrypted_user_data, key)
        encrypted_user_data_arg = json.dumps(decrypted_user_data)
        options.append(f' --encrypted_user_data={encrypted_user_data_arg}')

    # Keep this part if a VPC is used.
    vpc_config = old_config.get('func_vpc')
    if vpc_config is not None:
        options.extend([
            f' --func_vpc.vpc_name={vpc_config["vpc_name"]}',
            f' --func_vpc.vpc_id={vpc_config["vpc_id"]}',
            f' --func_vpc.subnet_id={vpc_config["subnet_id"]}',
            f' --func_vpc.cidr={vpc_config["cidr"]}',
            f' --func_vpc.subnet_name={vpc_config["subnet_name"]}',
            f' --func_vpc.gateway={vpc_config["gateway"]}',
        ])

    # Keep "xrole": "function-admin" and "app_xrole": "function-admin" if an
    # agency is specified.
    xrole_config = old_config.get('xrole')
    if xrole_config is not None:
        options.append(f' --xrole="{xrole_config}"')

    app_xrole_config = old_config.get('app_xrole')
    if app_xrole_config is not None:
        options.append(f' --app_xrole="{app_xrole_config}"')

    # Configure the initializer and initialization timeout.
    initializer_handler = new_config.get('initializerHandler')
    initializer_timeout = new_config.get('initializerTimeout')
    if initializer_handler is not None and initializer_timeout is not None:
        options.append(f' --initializer_handler="{initializer_handler}"')
        options.append(f' --initializer_timeout={initializer_timeout}')

    # Concurrency settings. Each option is emitted only when it is actually
    # configured, so a missing value is never sent as the text "None".
    strategy_config = new_config.get('strategyConfig')
    if strategy_config is not None:
        concurrency = strategy_config.get('concurrency')
        if concurrency is not None:
            options.append(f' --strategy_config.concurrency="{concurrency}"')
        # Maximum number of concurrent requests per instance
        concurrent_num = strategy_config.get('concurrentNum')
        if concurrent_num is not None:
            options.append(
                f' --strategy_config.concurrent_num={concurrent_num}')

    # Keep this part if a file system is mounted to the function.
    mount_config = old_config.get('mount_config')
    if mount_config is not None:
        mount_user = mount_config["mount_user"]
        options.append(
            f' --mount_config.mount_user.user_id={mount_user["user_id"]}')
        options.append(
            f' --mount_config.mount_user.user_group_id={mount_user["user_group_id"]}')
        # Mount entries are numbered from 1 on the command line.
        for index, func_mount in enumerate(mount_config["func_mounts"], start=1):
            prefix = f' --mount_config.func_mounts.{index}'
            options.extend([
                f'{prefix}.mount_resource="{func_mount["mount_resource"]}"',
                f'{prefix}.mount_share_path="{func_mount["mount_share_path"]}"',
                f'{prefix}.mount_type="{func_mount["mount_type"]}"',
                f'{prefix}.local_mount_path="{func_mount["local_mount_path"]}"',
            ])

    return ''.join(options)


def exec_cmd(cmd):
    """Run ``cmd`` through the shell and return everything it printed.

    Standard error is merged into standard output, and the captured bytes
    are decoded as UTF-8. The exit status of the command is not inspected.
    """
    completed = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    return completed.stdout.decode('UTF-8')


def check_result(stage, exec_result):
    """Validate the text output of a CLI call and raise if it failed.

    ``stage`` is a short description of the step (used in messages) and
    ``exec_result`` is the raw output text of the command.

    Outcomes:
      * output contains ``USE_ERROR``: logged and raised.
      * output contains ``FSS.0409`` (per the sample, the code returned
        when the function code has no changes to update): logged, then the
        function returns normally so the deployment can continue.
      * output is not valid JSON: logged and raised.
      * output is a JSON object containing ``error_code``: logged and
        raised with its ``error_msg`` (or the raw output when the response
        carries no ``error_msg``).
      * anything else is treated as success and ``None`` is returned.

    Raises ``Exception`` with a "failed to <stage>: ..." message.
    """
    raw_failure = f"failed to {stage}: {exec_result}"

    if "USE_ERROR" in exec_result:
        logging.error(raw_failure)
        raise Exception(raw_failure)

    if "FSS.0409" in exec_result:
        # Not fatal: nothing to update. Record it and carry on.
        logging.error(raw_failure)
        return

    try:
        result_object = json.loads(exec_result)
    except ValueError as err:  # json.JSONDecodeError is a ValueError
        logging.error(raw_failure)
        raise Exception(raw_failure) from err

    # Only a JSON object can carry an error code; other JSON values (list,
    # string, number) are not error responses.
    if isinstance(result_object, dict) and "error_code" in result_object:
        # Fall back to the raw output so a response without "error_msg"
        # still produces a descriptive failure instead of a KeyError.
        error_message = result_object.get("error_msg", exec_result)
        error_info = f"failed to {stage}: {error_message}"
        logging.error(error_info)
        raise Exception(error_info)


def generate_update_function_code_cmd():
    """Build the ``hcloud FunctionGraph UpdateFunctionCode`` command line.

    Reads the module-level names ``region``, ``function_urn``,
    ``project_id``, ``code_url`` and ``old_function_code``, which must be
    set before the call. The code is referenced by URL with
    ``--code_type="obs"``.

    Dependencies listed under ``depend_list`` in ``old_function_code`` are
    re-attached as ``--depend_list.1``, ``--depend_list.2``, ... so that
    the update keeps them bound to the function.

    Returns the complete command as a single string.
    """
    cmd = \
        f'hcloud FunctionGraph UpdateFunctionCode --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"' \
        f' --code_url="{code_url}" --func_code.link="" --func_code.file="" --code_type="obs" '

    # A missing or null depend_list means there is nothing to re-attach.
    depend_list = old_function_code.get("depend_list") or []
    # Each dependency gets its own 1-based index, and the options are
    # separated by a space so they stay distinct arguments.
    depend_options = [
        f'--depend_list.{index}="{depend_id}"'
        for index, depend_id in enumerate(depend_list, start=1)
    ]
    return cmd + ' '.join(depend_options)


if __name__ == '__main__':
    # Usage: python deploy.py <deploy_function_path> <key>
    #   deploy_function_path: directory that contains cam.yaml
    #   key: key used to decrypt encryptedUserData
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <deploy_function_path> <key>")
    deploy_function_path = sys.argv[1]
    key = sys.argv[2]

    # NOTE(review): Loader is PyYAML's full loader, which can construct
    # arbitrary Python objects. Only run this against a trusted cam.yaml;
    # consider SafeLoader if the file can come from an untrusted source.
    with open(os.path.join(deploy_function_path, "cam.yaml")) as f:
        data = load(f, Loader=Loader)

    # The names below are module-level on purpose: the command builders
    # defined above read region, function_urn, function_name, project_id,
    # code_url and old_function_code as globals.
    function_config = data['components'][0]
    function_name = function_config['name']
    function_properties = function_config['properties']
    region = function_properties['region']
    code_url = function_properties['codeUri']
    project_id = function_properties['projectID']
    # Obtain the function URN.
    function_urn = "urn:fss:" + region + ":" + project_id + \
                   ":function:default:" + function_name + ":latest"
    logging.info(f"start to deploy functionURN:{function_urn}")

    # Query the function configuration.
    query_function_config_cmd = \
        f'hcloud FunctionGraph ShowFunctionConfig --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    result = exec_cmd(query_function_config_cmd)
    # Validate first so that a CLI failure is logged and reported with its
    # message instead of surfacing as a JSON parsing error.
    check_result("query function config", result)
    # Check whether a VPC and an agency have been configured for the
    # function. If yes, they must be included during function updates.
    old_function_config = json.loads(result)

    # Query the function code. Keep this part if a dependency is bound to
    # the function.
    query_function_code_cmd = \
        f'hcloud FunctionGraph ShowFunctionCode --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    result = exec_cmd(query_function_code_cmd)
    logging.info("query function %s code result: %s", function_urn, result)
    check_result("query function code", result)
    old_function_code = json.loads(result)

    # Update the function code.
    update_function_code_cmd = generate_update_function_code_cmd()
    result = exec_cmd(update_function_code_cmd)
    logging.info("update function %s code result: %s", function_urn, result)
    check_result("update function code", result)

    # Update the function configuration.
    update_function_config_cmd = generate_update_function_config_cmd(
        function_properties, old_function_config, key)
    result = exec_cmd(update_function_config_cmd)
    logging.info("update function %s config result: %s", function_urn, result)
    check_result("update function config", result)

    logging.info(f"succeed to deploy function {function_urn}")