更新时间:2023-04-17 GMT+08:00
分享

deploy.py代码示例

# -*-coding:utf-8 -*-

import os
import sys
import json
import logging
import subprocess
from yaml import load
from base64 import b64decode
from Crypto.Cipher import AES

# need: pip install pyyaml
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

logging.basicConfig(level=logging.INFO,
                    filename='function.log',
                    filemode='a',
                    format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')


def decrypt(json_input, key):
    # We assume that the key was securely shared beforehand
    try:
        b64 = json.loads(json_input)
        json_k = ['nonce', 'header', 'ciphertext', 'tag']
        jv = {k: b64decode(b64[k]) for k in json_k}
        cipher = AES.new(key.encode(), AES.MODE_GCM, nonce=jv['nonce'])
        cipher.update(jv['header'])
        plaintext = cipher.decrypt_and_verify(jv['ciphertext'], jv['tag'])
        return plaintext.decode()
    except (ValueError, KeyError) as e:
        raise e


def generate_update_function_config_cmd(new_config, old_config, key):
    # 函数执行入口
    handler = new_config['handler']
    # 函数runtime配置(必填但不支持修改)
    runtime = new_config['runtime']
    # 函数内存规格配置
    memory_size = new_config['memorySize']
    # 函数执行超时配置
    timeout = new_config['timeout']
    # 函数所属project_id
    project_id = new_config['projectID']
    # 拼装更新函数配置命令
    update_cmd = f'hcloud FunctionGraph UpdateFunctionConfig' \
                 f' --cli-region="{region}"' \
                 f' --function_urn="{function_urn}"' \
                 f' --project_id="{project_id}"' \
                 f' --handler="{handler}"' \
                 f' --timeout={timeout}' \
                 f' --memory_size={memory_size}' \
                 f' --runtime="{runtime}"' \
                 f' --func_name="{function_name}"'

    # 用户环境变量配置
    # 更新用户环境变量为直接覆盖,如果有手动在函数界面配置环境变量没有更新到cam.yaml文件内
    # 则手动添加环境变量配置则丢失
    user_data = new_config.get('userData', None)
    if user_data is not None:
        user_date_json_str = json.dumps(user_data)
        user_date_json_str = json.dumps(user_date_json_str)
        update_cmd = update_cmd + f' --user_data={user_date_json_str}'

    encrypted_user_data = new_config.get('encryptedUserData', None)
    if encrypted_user_data is not None:
        encrypted_user_data = decrypt(encrypted_user_data, key)
        encrypted_user_date_json_str = json.dumps(encrypted_user_data)
        update_cmd = update_cmd + \
                     f' --encrypted_user_data={encrypted_user_date_json_str}'

    # 如果有vpc则保留
    vpc_config = old_config.get('func_vpc', None)
    if vpc_config is not None:
        update_cmd = update_cmd + \
                     f' --func_vpc.vpc_name={vpc_config["vpc_name"]}' \
                     f' --func_vpc.vpc_id={vpc_config["vpc_id"]}' \
                     f' --func_vpc.subnet_id={vpc_config["subnet_id"]}' \
                     f' --func_vpc.cidr={vpc_config["cidr"]}' \
                     f' --func_vpc.subnet_name={vpc_config["subnet_name"]}' \
                     f' --func_vpc.gateway={vpc_config["gateway"]}'

    # 如果有委托配置则保留 "xrole": "function-admin"和"app_xrole": "function-admin",
    xrole_config = old_config.get('xrole', None)
    if xrole_config is not None:
        update_cmd = update_cmd + f' --xrole="{xrole_config}"'

    app_xrole_config = old_config.get('app_xrole', None)
    if app_xrole_config is not None:
        update_cmd = update_cmd + f' --app_xrole="{app_xrole_config}"'

    # 配置初始化入口和初始化超时时间
    initializer_handler = new_config.get('initializerHandler', None)
    initializer_timeout = new_config.get('initializerTimeout', None)
    if initializer_handler is not None and initializer_timeout is not None:
        update_cmd = update_cmd + \
                     f' --initializer_handler="{initializer_handler}" ' \
                     f'--initializer_timeout={initializer_timeout}'

    # 并发配置
    strategy_config = new_config.get('strategyConfig', None)
    if strategy_config is not None:
        concurrency = strategy_config.get('concurrency', None)
        # 单实例并发数
        concurrent_num = strategy_config.get('concurrentNum', None)
        update_cmd = update_cmd + \
                     f' --strategy_config.concurrency="{concurrency}" ' \
                     f'--strategy_config.concurrent_num={concurrent_num}'

    # 如果有磁盘挂载则保留
    mount_config = old_config.get('mount_config', None)
    if mount_config is not None:
        mount_user = mount_config["mount_user"]
        update_cmd = update_cmd + \
                     f' --mount_config.mount_user.user_id={mount_user["user_id"]}' \
                     f' --mount_config.mount_user.user_group_id={mount_user["user_group_id"]}'
        func_mounts = mount_config["func_mounts"]
        i = 1
        for func_mount in func_mounts:
            update_cmd = update_cmd + \
                         f' --mount_config.func_mounts.{i}.mount_resource="{func_mount["mount_resource"]}"' \
                         f' --mount_config.func_mounts.{i}.mount_share_path="{func_mount["mount_share_path"]}"' \
                         f' --mount_config.func_mounts.{i}.mount_type="{func_mount["mount_type"]}"' \
                         f' --mount_config.func_mounts.{i}.local_mount_path="{func_mount["local_mount_path"]}"'
            i = i + 1

    return update_cmd


def exec_cmd(cmd):
    proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    outs, _ = proc.communicate()
    return outs.decode('UTF-8')


def check_result(stage, exec_result):
    if "USE_ERROR" in exec_result:
        error_info = f"failed to {stage}: {exec_result}"
        logging.error(error_info)
        raise Exception(error_info)

    if "FSS.0409" in exec_result:
        error_info = f"failed to {stage}: {exec_result}"
        logging.error(error_info)
        # 返回错误为函数代码没变不需要更新错误则返回
        return

    try:
        result_object = json.loads(exec_result)
    except Exception:
        error_info = f"failed to {stage}: {exec_result}"
        logging.error(error_info)
        raise Exception(error_info)

    if "error_code" in result_object:
        error_message = result_object["error_msg"]
        error_info = f"failed to {stage}: {error_message}"
        logging.error(error_info)
        raise Exception(error_info)


def generate_update_function_code_cmd():
    cmd = \
        f'hcloud FunctionGraph UpdateFunctionCode --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"' \
        f' --code_url="{code_url}" --func_code.link="" --func_code.file="" --code_type="obs" '

    depend_list = old_function_code.get("depend_list", None)
    if depend_list is not None and len(depend_list) > 0:
        i = 1
        for depend_id in depend_list:
            cmd = cmd + f'--depend_list.{i}="{depend_id}"'

    return cmd


if __name__ == '__main__':
    deploy_function_path = sys.argv[1]
    key = sys.argv[2]
    f = open(os.path.join(deploy_function_path, "cam.yaml"))
    data = load(f, Loader=Loader)
    function_config = data['components'][0]
    function_name = function_config['name']
    function_properties = function_config['properties']
    region = function_properties['region']
    code_url = function_properties['codeUri']
    project_id = function_properties['projectID']
    # 拼接获取函数urn
    function_urn = "urn:fss:" + region + ":" + project_id + \
                   ":function:default:" + function_name + ":latest"
    logging.info(f"start to deploy functionURN:{function_urn}")

    # 查询函数的配置信息
    query_function_config_cmd = \
        f'hcloud FunctionGraph ShowFunctionConfig --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    result = exec_cmd(query_function_config_cmd)
    # 主要是查看函数是否有配置VPC和委托,如果有更新函数配置时需要带上,避免更新导VPC或委托配置丢失
    old_function_config = json.loads(result)
    check_result("query function config", result)

    # 查询函数代码,主要是函数绑定依赖包信息保留
    query_function_code_cmd = \
        f'hcloud FunctionGraph ShowFunctionCode --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    result = exec_cmd(query_function_code_cmd)
    old_function_code = json.loads(result)
    logging.info("query function %s code result: %s", function_urn, result)
    check_result("query function code", result)

    # 更新函数代码
    query_function_code_cmd = generate_update_function_code_cmd()
    result = exec_cmd(query_function_code_cmd)
    logging.info("update function %s code result: %s", function_urn, result)
    check_result("update function code", result)

    # 更新函数配置
    update_function_config_cmd = generate_update_function_config_cmd(
        function_properties, old_function_config, key)
    result = exec_cmd(update_function_config_cmd)
    logging.info("update function %s config result: %s", function_urn, result)
    check_result("update function config", result)

    logging.info(f"succeed to deploy function {function_urn}")

相关文档