deploy.py代码示例
代码示例
以下为自动化部署deploy.py文件的代码示例。
该示例用于自动化部署和更新华为云FunctionGraph函数,涵盖配置和代码的更新。通过解析配置文件,调用命令行工具执行更新命令,并处理加密数据的解密,同时记录日志以确保操作的可追溯性。可参考代码注释使用。
# -*-coding:utf-8 -*-
import os
import sys
import json
import logging
import subprocess
from yaml import load
from base64 import b64decode
from Crypto.Cipher import AES
# Requires third-party packages: pip install pyyaml pycryptodome
try:
from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
from yaml import Loader, Dumper
# Configure the root logger: append records of level INFO and above to
# 'function.log', each prefixed with timestamp, source path, line number
# and level name.
logging.basicConfig(level=logging.INFO,
filename='function.log',
filemode='a',
format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')
def decrypt(json_input, key):
    """Decrypt an AES-GCM encrypted payload and return it as text.

    Args:
        json_input: JSON text of an object whose ``nonce``, ``header``,
            ``ciphertext`` and ``tag`` members are base64-encoded.
        key: AES key as text; it is converted to bytes with ``str.encode()``.
            The key is assumed to have been shared securely beforehand.

    Returns:
        The decrypted plaintext, decoded to ``str``.

    Raises:
        KeyError: a required member is missing from ``json_input``.
        ValueError: ``json_input`` is not valid JSON or a member is not
            valid base64. Presumably also raised by the cipher when the
            authentication tag does not verify -- confirm against the
            Crypto library documentation.
    """
    # Errors propagate unchanged; the previous ``except ...: raise e`` wrapper
    # added nothing but an extra traceback frame.
    payload = json.loads(json_input)
    fields = {
        name: b64decode(payload[name])
        for name in ('nonce', 'header', 'ciphertext', 'tag')
    }
    cipher = AES.new(key.encode(), AES.MODE_GCM, nonce=fields['nonce'])
    # The header is fed to the cipher before decryption so that it is covered
    # by the tag verification below.
    cipher.update(fields['header'])
    plaintext = cipher.decrypt_and_verify(fields['ciphertext'], fields['tag'])
    return plaintext.decode()
def generate_update_function_config_cmd(new_config, old_config, key):
    """Build the ``hcloud FunctionGraph UpdateFunctionConfig`` command line.

    Settings from ``new_config`` are applied, while VPC, agency (xrole) and
    disk-mount settings are copied from ``old_config`` so that the update
    does not drop them.

    Reads the module-level names ``region``, ``function_urn`` and
    ``function_name``; they must be set before this function is called.

    Args:
        new_config: Mapping with the required keys ``handler``, ``runtime``,
            ``memorySize``, ``timeout`` and ``projectID``, and the optional
            keys ``userData``, ``encryptedUserData``, ``initializerHandler``,
            ``initializerTimeout`` and ``strategyConfig``.
        old_config: Mapping describing the function's current configuration;
            the optional keys ``func_vpc``, ``xrole``, ``app_xrole`` and
            ``mount_config`` are carried over when present.
        key: Key passed to ``decrypt`` for ``encryptedUserData``.

    Returns:
        The complete command as a single string.

    Raises:
        KeyError: a required key is missing from ``new_config``, or from a
            ``func_vpc`` / ``mount_config`` entry of ``old_config``.
    """
    # Function entry point.
    handler = new_config['handler']
    # Runtime (mandatory in the command, but cannot be modified).
    runtime = new_config['runtime']
    # Memory size.
    memory_size = new_config['memorySize']
    # Execution timeout.
    timeout = new_config['timeout']
    # Project the function belongs to.
    project_id = new_config['projectID']

    # Every element is one command-line token group; they are joined with a
    # single space at the end.
    args = [
        'hcloud FunctionGraph UpdateFunctionConfig',
        f'--cli-region="{region}"',
        f'--function_urn="{function_urn}"',
        f'--project_id="{project_id}"',
        f'--handler="{handler}"',
        f'--timeout={timeout}',
        f'--memory_size={memory_size}',
        f'--runtime="{runtime}"',
        f'--func_name="{function_name}"',
    ]

    # User environment variables. The update overwrites the existing ones, so
    # variables configured manually in the console but absent from cam.yaml
    # are lost.
    user_data = new_config.get('userData')
    if user_data is not None:
        # Encoded twice on purpose: the first pass produces the JSON text, the
        # second wraps that text in double quotes and escapes the inner ones.
        user_data_arg = json.dumps(json.dumps(user_data))
        args.append(f'--user_data={user_data_arg}')

    encrypted_user_data = new_config.get('encryptedUserData')
    if encrypted_user_data is not None:
        # decrypt() already returns text, so a single encoding pass quotes it.
        encrypted_user_data_arg = json.dumps(decrypt(encrypted_user_data, key))
        args.append(f'--encrypted_user_data={encrypted_user_data_arg}')

    # Keep the existing VPC configuration, if any.
    vpc_config = old_config.get('func_vpc')
    if vpc_config is not None:
        args.extend(
            f'--func_vpc.{field}={vpc_config[field]}'
            for field in ('vpc_name', 'vpc_id', 'subnet_id',
                          'cidr', 'subnet_name', 'gateway')
        )

    # Keep the existing agency configuration, e.g. "xrole": "function-admin"
    # and "app_xrole": "function-admin".
    xrole_config = old_config.get('xrole')
    if xrole_config is not None:
        args.append(f'--xrole="{xrole_config}"')
    app_xrole_config = old_config.get('app_xrole')
    if app_xrole_config is not None:
        args.append(f'--app_xrole="{app_xrole_config}"')

    # Initializer entry point and timeout; emitted only when both are given.
    initializer_handler = new_config.get('initializerHandler')
    initializer_timeout = new_config.get('initializerTimeout')
    if initializer_handler is not None and initializer_timeout is not None:
        args.append(f'--initializer_handler="{initializer_handler}"')
        args.append(f'--initializer_timeout={initializer_timeout}')

    # Concurrency configuration. Each option is emitted only when it is set,
    # so an unset value never reaches the command line as the text "None".
    strategy_config = new_config.get('strategyConfig')
    if strategy_config is not None:
        concurrency = strategy_config.get('concurrency')
        # Concurrent requests per instance.
        concurrent_num = strategy_config.get('concurrentNum')
        if concurrency is not None:
            args.append(f'--strategy_config.concurrency="{concurrency}"')
        if concurrent_num is not None:
            args.append(f'--strategy_config.concurrent_num={concurrent_num}')

    # Keep the existing disk mounts, if any.
    mount_config = old_config.get('mount_config')
    if mount_config is not None:
        mount_user = mount_config['mount_user']
        args.append(
            f'--mount_config.mount_user.user_id={mount_user["user_id"]}')
        args.append(
            '--mount_config.mount_user.user_group_id='
            f'{mount_user["user_group_id"]}')
        # Mount entries are numbered starting from 1.
        for index, func_mount in enumerate(mount_config['func_mounts'], start=1):
            args.extend(
                f'--mount_config.func_mounts.{index}.{field}="{func_mount[field]}"'
                for field in ('mount_resource', 'mount_share_path',
                              'mount_type', 'local_mount_path')
            )

    return ' '.join(args)
def exec_cmd(cmd):
    """Run a command line through the shell and return its combined output.

    Args:
        cmd: Command line as a single string. It is interpreted by the shell,
            so every value interpolated into it must come from a trusted
            source.

    Returns:
        Standard output and standard error, merged into one stream and
        decoded as UTF-8. Bytes that are not valid UTF-8 are replaced with
        U+FFFD instead of raising ``UnicodeDecodeError``.
    """
    completed = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, check=False)
    return completed.stdout.decode('UTF-8', errors='replace')
def check_result(stage, exec_result):
    """Inspect the output of a CLI command and raise if it reports a failure.

    Args:
        stage: Short description of the step, used in the error message.
        exec_result: Raw text captured from the command.

    Returns:
        None when the output is accepted. Output containing ``FSS.0409`` is
        logged and accepted without further checks.

    Raises:
        Exception: the output contains ``USE_ERROR``, is not valid JSON, or
            is a JSON object with an ``error_code`` member. The message is
            ``failed to <stage>: <detail>`` and is also written to the log.
    """
    def fail(detail, cause=None):
        # Log and raise with one consistent message format.
        error_info = f"failed to {stage}: {detail}"
        logging.error(error_info)
        raise Exception(error_info) from cause

    if "USE_ERROR" in exec_result:
        fail(exec_result)

    if "FSS.0409" in exec_result:
        # The function code has not changed, so there is nothing to update;
        # record it and carry on.
        logging.error(f"failed to {stage}: {exec_result}")
        return

    try:
        result_object = json.loads(exec_result)
    except ValueError as err:
        # json.JSONDecodeError is a ValueError; keep it as the cause.
        fail(exec_result, err)

    # Only a JSON object can carry an error report; other JSON values (list,
    # number, ...) are accepted as-is.
    if isinstance(result_object, dict) and "error_code" in result_object:
        # Fall back to the raw output when the report has no message.
        fail(result_object.get("error_msg", exec_result))
def generate_update_function_code_cmd():
    """Build the ``hcloud FunctionGraph UpdateFunctionCode`` command line.

    Reads the module-level names ``region``, ``function_urn``, ``project_id``,
    ``code_url`` and ``old_function_code``; they must be set before this
    function is called.

    The dependency packages listed under ``depend_list`` in
    ``old_function_code`` are passed along again so the code update keeps
    them bound to the function.

    Returns:
        The complete command as a single string.
    """
    cmd = (
        f'hcloud FunctionGraph UpdateFunctionCode --cli-region="{region}"'
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
        f' --code_url="{code_url}" --func_code.link="" --func_code.file="" --code_type="obs" '
    )
    depend_list = old_function_code.get("depend_list") or []
    # Each dependency gets its own 1-based index and the options are separated
    # by spaces. (Previously the index was never advanced and the options were
    # glued together, producing one malformed argument.)
    depend_args = [
        f'--depend_list.{index}="{depend_id}"'
        for index, depend_id in enumerate(depend_list, start=1)
    ]
    return cmd + ' '.join(depend_args)
if __name__ == '__main__':
    # Usage: python deploy.py <function_dir> <key>
    #   <function_dir>  directory that contains cam.yaml
    #   <key>           key used to decrypt 'encryptedUserData'
    if len(sys.argv) < 3:
        sys.exit(f"usage: {sys.argv[0]} <function_dir> <key>")
    deploy_function_path = sys.argv[1]
    key = sys.argv[2]

    # NOTE(review): yaml's full Loader is used here, not a safe loader, so
    # cam.yaml must come from a trusted source.
    # The file is closed as soon as it has been parsed.
    with open(os.path.join(deploy_function_path, "cam.yaml")) as f:
        data = load(f, Loader=Loader)

    # The names below are module-level on purpose: the command builders
    # defined above read region, function_urn, function_name, project_id,
    # code_url and old_function_code as globals.
    function_config = data['components'][0]
    function_name = function_config['name']
    function_properties = function_config['properties']
    region = function_properties['region']
    code_url = function_properties['codeUri']
    project_id = function_properties['projectID']
    # Assemble the function URN.
    function_urn = "urn:fss:" + region + ":" + project_id + \
        ":function:default:" + function_name + ":latest"
    logging.info(f"start to deploy functionURN:{function_urn}")

    # Query the current function configuration. It shows whether a VPC or an
    # agency is configured; those must be sent again with the update,
    # otherwise they would be lost.
    query_function_config_cmd = \
        f'hcloud FunctionGraph ShowFunctionConfig --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    result = exec_cmd(query_function_config_cmd)
    # Validate before parsing so that a CLI failure is logged and reported as
    # a deploy error instead of surfacing as a bare JSON decoding error.
    check_result("query function config", result)
    old_function_config = json.loads(result)

    # Query the current function code, mainly to keep the dependency packages
    # bound to the function.
    query_function_code_cmd = \
        f'hcloud FunctionGraph ShowFunctionCode --cli-region="{region}"' \
        f' --function_urn="{function_urn}" --project_id="{project_id}"'
    result = exec_cmd(query_function_code_cmd)
    logging.info("query function %s code result: %s", function_urn, result)
    check_result("query function code", result)
    old_function_code = json.loads(result)

    # Update the function code.
    update_function_code_cmd = generate_update_function_code_cmd()
    result = exec_cmd(update_function_code_cmd)
    logging.info("update function %s code result: %s", function_urn, result)
    check_result("update function code", result)

    # Update the function configuration.
    update_function_config_cmd = generate_update_function_config_cmd(
        function_properties, old_function_config, key)
    result = exec_cmd(update_function_config_cmd)
    logging.info("update function %s config result: %s", function_urn, result)
    check_result("update function config", result)
    logging.info(f"succeed to deploy function {function_urn}")