# Updated on 2025-03-13 GMT+08:00
#
# Sample Code of deploy.py

# -*-coding:utf-8 -*-

import os
import sys
import json
import logging
import subprocess
from yaml import load
from base64 import b64decode
from Crypto.Cipher import AES

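# Third-party requirements: pip install pyyaml pycryptodome
# (pycryptodome provides the Crypto.Cipher package imported above)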
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

logging.basicConfig(level=logging.INFO,
                    filename='function.log',
                    filemode='a',
                    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')


def decrypt(json_input, key):
    """Decrypt an AES-GCM payload and return the plaintext as text.

    The key is assumed to have been shared securely beforehand.

    Args:
        json_input: JSON text with the members ``nonce``, ``header``,
            ``ciphertext`` and ``tag``, each Base64-encoded.
        key: AES key as a string; it is encoded to bytes before use.

    Returns:
        The decrypted plaintext, decoded to ``str``.

    Raises:
        ValueError: Propagated unchanged, e.g. when ``json_input`` is not
            valid JSON (presumably also when the cipher rejects the tag --
            confirm against the PyCryptodome documentation).
        KeyError: When one of the four expected members is missing.
    """
    payload = json.loads(json_input)
    fields = {}
    for name in ('nonce', 'header', 'ciphertext', 'tag'):
        fields[name] = b64decode(payload[name])

    cipher = AES.new(key.encode(), AES.MODE_GCM, nonce=fields['nonce'])
    # The header has to be fed to the cipher before decrypt_and_verify().
    cipher.update(fields['header'])
    decrypted = cipher.decrypt_and_verify(fields['ciphertext'], fields['tag'])
    return decrypted.decode()


def generate_update_function_config_cmd(new_config, old_config, key):
    """Build the ``hcloud FunctionGraph UpdateFunctionConfig`` command line.

    Settings read from ``new_config`` (the function properties in cam.yaml)
    replace the current ones. VPC, agency and file-system mount settings are
    copied from ``old_config`` (the configuration queried from the service)
    so that the update does not drop them.

    Besides its parameters, this function reads the module-level names
    ``region``, ``function_urn`` and ``function_name`` that the script entry
    point sets before calling it.

    Args:
        new_config: Mapping with the required keys ``handler``, ``runtime``,
            ``memorySize``, ``timeout`` and ``projectID``; optional keys are
            ``userData``, ``encryptedUserData``, ``initializerHandler``,
            ``initializerTimeout`` and ``strategyConfig``.
        old_config: Mapping with the optional keys ``func_vpc``, ``xrole``,
            ``app_xrole`` and ``mount_config``.
        key: AES key passed to ``decrypt`` for ``encryptedUserData``.

    Returns:
        The complete command as a single shell string.

    Raises:
        KeyError: When a required key is missing from ``new_config`` or from
            a VPC / mount section that is present in ``old_config``.
    """
    # NOTE(review): values are interpolated into a shell command line as-is;
    # they are expected to come from the project's own cam.yaml and from the
    # service response, not from untrusted input.
    handler = new_config['handler']
    # Runtime (required and not modifiable)
    runtime = new_config['runtime']
    memory_size = new_config['memorySize']
    timeout = new_config['timeout']
    project_id = new_config['projectID']

    parts = [
        'hcloud FunctionGraph UpdateFunctionConfig',
        f' --cli-region="{region}"',
        f' --function_urn="{function_urn}"',
        f' --project_id="{project_id}"',
        f' --handler="{handler}"',
        f' --timeout={timeout}',
        f' --memory_size={memory_size}',
        f' --runtime="{runtime}"',
        f' --func_name="{function_name}"',
    ]

    # Environment variables are directly overwritten. Manually configured
    # variables that are not present in the cam.yaml file will be lost.
    user_data = new_config.get('userData')
    if user_data is not None:
        # Dump twice: once to JSON text, then again to turn that text into a
        # double-quoted, escaped literal that forms a single argument.
        user_data_arg = json.dumps(json.dumps(user_data))
        parts.append(f' --user_data={user_data_arg}')

    encrypted_user_data = new_config.get('encryptedUserData')
    if encrypted_user_data is not None:
        # decrypt() already returns a string, so one dump is enough here.
        decrypted_arg = json.dumps(decrypt(encrypted_user_data, key))
        parts.append(f' --encrypted_user_data={decrypted_arg}')

    # Keep this part if a VPC is used.
    vpc_config = old_config.get('func_vpc')
    if vpc_config is not None:
        parts.extend([
            f' --func_vpc.vpc_name={vpc_config["vpc_name"]}',
            f' --func_vpc.vpc_id={vpc_config["vpc_id"]}',
            f' --func_vpc.subnet_id={vpc_config["subnet_id"]}',
            f' --func_vpc.cidr={vpc_config["cidr"]}',
            f' --func_vpc.subnet_name={vpc_config["subnet_name"]}',
            f' --func_vpc.gateway={vpc_config["gateway"]}',
        ])

    # Keep "xrole" and "app_xrole" if an agency is specified.
    xrole_config = old_config.get('xrole')
    if xrole_config is not None:
        parts.append(f' --xrole="{xrole_config}"')

    app_xrole_config = old_config.get('app_xrole')
    if app_xrole_config is not None:
        parts.append(f' --app_xrole="{app_xrole_config}"')

    # The initializer is only configured when both values are given.
    initializer_handler = new_config.get('initializerHandler')
    initializer_timeout = new_config.get('initializerTimeout')
    if initializer_handler is not None and initializer_timeout is not None:
        parts.append(f' --initializer_handler="{initializer_handler}"')
        parts.append(f' --initializer_timeout={initializer_timeout}')

    # Concurrency settings. Each option is emitted only when it has a value,
    # so a missing member is never sent as the literal text "None".
    strategy_config = new_config.get('strategyConfig')
    if strategy_config is not None:
        concurrency = strategy_config.get('concurrency')
        if concurrency is not None:
            parts.append(f' --strategy_config.concurrency="{concurrency}"')
        # Maximum number of concurrent requests per instance
        concurrent_num = strategy_config.get('concurrentNum')
        if concurrent_num is not None:
            parts.append(
                f' --strategy_config.concurrent_num={concurrent_num}')

    # Keep this part if a file system is mounted to the function.
    mount_config = old_config.get('mount_config')
    if mount_config is not None:
        mount_user = mount_config["mount_user"]
        parts.append(
            f' --mount_config.mount_user.user_id={mount_user["user_id"]}')
        parts.append(
            f' --mount_config.mount_user.user_group_id='
            f'{mount_user["user_group_id"]}')
        # Mount entries are numbered from 1 on the command line.
        for i, func_mount in enumerate(mount_config["func_mounts"], start=1):
            prefix = f' --mount_config.func_mounts.{i}'
            parts.extend([
                f'{prefix}.mount_resource="{func_mount["mount_resource"]}"',
                f'{prefix}.mount_share_path="{func_mount["mount_share_path"]}"',
                f'{prefix}.mount_type="{func_mount["mount_type"]}"',
                f'{prefix}.local_mount_path="{func_mount["local_mount_path"]}"',
            ])

    return ''.join(parts)


def exec_cmd(cmd):
    """Run ``cmd`` through the shell and return everything it printed.

    Standard error is merged into standard output, so the returned text
    holds both streams. The exit status is not inspected.

    Args:
        cmd: Command line as a single string; it is interpreted by the
            shell, so it must only be built from trusted values.

    Returns:
        The captured output decoded as UTF-8.
    """
    completed = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT)
    return completed.stdout.decode('UTF-8')


def _deploy_failure(stage, detail):
    """Log the failure of ``stage`` and return the exception to raise."""
    error_info = f"failed to {stage}: {detail}"
    logging.error(error_info)
    return Exception(error_info)


def check_result(stage, exec_result):
    """Validate the raw output of an hcloud command.

    Args:
        stage: Short description of the step, used in log and error text.
        exec_result: Text printed by the command.

    Returns:
        None when the output is acceptable.

    Raises:
        Exception: When the output contains ``USE_ERROR``, is not valid
            JSON, or is a JSON object carrying an ``error_code`` member.
            The message is ``failed to <stage>: <detail>``.
    """
    if "USE_ERROR" in exec_result:
        raise _deploy_failure(stage, exec_result)

    if "FSS.0409" in exec_result:
        # According to the original sample, this code is returned when the
        # function code has no changes to update. It is logged but tolerated,
        # so the deployment continues.
        logging.error(f"failed to {stage}: {exec_result}")
        return

    try:
        result_object = json.loads(exec_result)
    except ValueError as err:
        # Keep the parse error as the cause to ease troubleshooting.
        raise _deploy_failure(stage, exec_result) from err

    # Only a JSON object can carry an error code; other JSON values (lists,
    # numbers, ...) are treated as carrying no error.
    if isinstance(result_object, dict) and "error_code" in result_object:
        # Fall back to the raw output when the response has no error_msg.
        error_message = result_object.get("error_msg", exec_result)
        raise _deploy_failure(stage, error_message)


def generate_update_function_code_cmd():
    """Build the ``hcloud FunctionGraph UpdateFunctionCode`` command line.

    Reads the module-level names ``region``, ``function_urn``,
    ``project_id``, ``code_url`` and ``old_function_code`` that the script
    entry point sets before calling this function. Dependencies listed in
    ``old_function_code["depend_list"]`` are passed on again so that the
    update keeps them bound to the function.

    Returns:
        The complete command as a single shell string.
    """
    cmd = (
        f'hcloud FunctionGraph UpdateFunctionCode --cli-region="{region}"'
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
        f' --code_url="{code_url}" --func_code.link="" --func_code.file="" --code_type="obs" '
    )

    depend_list = old_function_code.get("depend_list")
    if depend_list:
        # Each dependency needs its own 1-based index and must be separated
        # from the next option by a space.
        cmd += ' '.join(
            f'--depend_list.{index}="{depend_id}"'
            for index, depend_id in enumerate(depend_list, start=1)
        )

    return cmd


if __name__ == '__main__':
    # Usage: deploy.py <directory containing cam.yaml> <AES key>
    # The names assigned here (region, function_urn, function_name,
    # project_id, code_url, old_function_code) are module-level and are read
    # by the command-building functions above.
    if len(sys.argv) < 3:
        sys.exit("usage: deploy.py <deploy_function_path> <key>")
    deploy_function_path = sys.argv[1]
    key = sys.argv[2]

    # NOTE(review): Loader is the full YAML loader, which can construct
    # arbitrary Python objects. cam.yaml must come from a trusted source;
    # consider a safe loader if that is not guaranteed.
    with open(os.path.join(deploy_function_path, "cam.yaml")) as cam_file:
        data = load(cam_file, Loader=Loader)

    function_config = data['components'][0]
    function_name = function_config['name']
    function_properties = function_config['properties']
    region = function_properties['region']
    code_url = function_properties['codeUri']
    project_id = function_properties['projectID']
    # Obtain the function URN.
    function_urn = (f"urn:fss:{region}:{project_id}"
                    f":function:default:{function_name}:latest")
    logging.info(f"start to deploy functionURN:{function_urn}")

    # Query the function configuration. It shows whether a VPC and an agency
    # have been configured for the function; if yes, they must be included
    # during function updates.
    query_function_config_cmd = (
        f'hcloud FunctionGraph ShowFunctionConfig --cli-region="{region}"'
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    )
    result = exec_cmd(query_function_config_cmd)
    # Validate before parsing so that a CLI failure is logged and reported
    # with its stage instead of surfacing as a bare JSON decoding error.
    check_result("query function config", result)
    old_function_config = json.loads(result)

    # Query the function code. Keep this part if a dependency is bound to
    # the function.
    query_function_code_cmd = (
        f'hcloud FunctionGraph ShowFunctionCode --cli-region="{region}"'
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    )
    result = exec_cmd(query_function_code_cmd)
    logging.info("query function %s code result: %s", function_urn, result)
    check_result("query function code", result)
    old_function_code = json.loads(result)

    # Update the function code.
    update_function_code_cmd = generate_update_function_code_cmd()
    result = exec_cmd(update_function_code_cmd)
    logging.info("update function %s code result: %s", function_urn, result)
    check_result("update function code", result)

    # Update the function configuration.
    update_function_config_cmd = generate_update_function_config_cmd(
        function_properties, old_function_config, key)
    result = exec_cmd(update_function_config_cmd)
    logging.info("update function %s config result: %s", function_urn, result)
    check_result("update function config", result)

    logging.info(f"succeed to deploy function {function_urn}")