函数工作流 FunctionGraph
函数工作流 FunctionGraph
- 最新动态
- 功能总览
- 产品介绍
- 计费说明
- 快速入门
- 用户指南
-
最佳实践
- FunctionGraph最佳实践汇总
-
数据处理类实践
- 使用FunctionGraph函数对OBS中的图片进行压缩
- 使用FunctionGraph函数为OBS中的图片打水印
- 使用FunctionGraph函数对DIS数据进行格式转换并存储到CloudTable
- 使用FunctionGraph函数实现通过API方式上传文件
- 使用FunctionGraph函数对IoTDA中的设备坐标数据进行转换
- 使用FunctionGraph函数对OBS中的文件进行加解密
- 使用FunctionGraph函数识别LTS中的异常业务日志并存储到OBS
- 使用FunctionGraph函数对LTS中的日志进行实时过滤
- 使用FunctionGraph函数流对OBS中的图片进行旋转
- 使用FunctionGraph函数流对图片进行压缩和打水印
- 功能应用类实践
- 函数构建类实践
- 开发指南
- API参考
- SDK参考
- 场景代码示例
-
常见问题
-
产品咨询
- 使用FunctionGraph是否需要开通计算、存储、网络等服务?
- 使用FunctionGraph开发程序之后是否需要部署?
- FunctionGraph为函数分配的磁盘空间有多少?
- 是否支持在函数中启动TCP的监听端口,通过EIP接收外部发送过来的TCP请求?
- 函数发起HTTP请求的源地址如何获取?
- FunctionGraph是否支持对上传的zip文件进行反编译?
- FunctionGraph的函数是否支持功能扩展?
- FunctionGraph中的代码是如何隔离的?
- 函数常规信息中的“应用”如何理解?
- 用户需要为函数的冷启动时间付费吗?
- 函数计费中的调用次数,是某一账号下在不同region的所有函数的调用次数总和吗?
- Python语言的函数从V1版本迁移到V2版本时需注意哪些兼容性问题?
- FunctionGraph函数支持哪些编程语言?
- 创建函数
-
配置函数
- 能否在函数环境变量中存储敏感信息?
- FunctionGraph的函数如何读写上传的文件?
- 为函数挂载文件系统时,报“failed to mount exist system path”
- FunctionGraph如何实现域名解析?
- FunctionGraph如何通过域名访问专享版APIG中注册的接口?
- FunctionGraph函数通过域名访问APIG中注册的接口时,报域名无法解析?
- 使用定制运行时语言的函数能操作哪些目录?
- FunctionGraph的函数支持哪些中文字体?
- 能否在函数代码中使用线程和进程?
- 函数如何访问MySQL数据库?
- 函数无法通过VPC连接对应的Redis?
- 如何读取函数的请求头?
- Python语言的函数中,中文注释报乱码错误
-
调用函数
- FunctionGraph的函数执行需要多长时间?
- FunctionGraph的函数执行包含了哪些过程?
- FunctionGraph函数长时间不执行时,相关的实例会如何处理?
- 如何获取函数运行过程中的内存使用量信息?
- 为什么首次调用函数时速度会比较慢?
- 为什么函数实际使用内存大于预估内存,甚至触发内存溢出OOM?
- 函数执行失败返回“runtime memory limit exceeded”时,如何查看内存占用大小?
- 自定义镜像函数执行失败报“CrashLoopBackOff”
- 同步调用函数时,未收到调用响应的可能原因?
- 函数中os.system("command &")命令的执行日志未采集,应如何处理?
- 函数执行超时的可能原因有哪些?
- 使用APIG触发器调用一个返回String的FunctionGraph函数时,报500错误
- Python2.7在执行reload(sys)后无法通过print打印日志
- 运行函数时报错error while loading shared libraries时如何处理?
- 配置触发器
- 配置依赖包
-
产品咨询
- 视频帮助
- 文档下载
- 通用参考
本文导读
展开导读
链接复制成功!
deploy.py代码示例
# -*-coding:utf-8 -*- import os import sys import json import logging import subprocess from yaml import load from base64 import b64decode from Crypto.Cipher import AES # need: pip install pyyaml try: from yaml import CLoader as Loader, CDumper as Dumper except ImportError: from yaml import Loader, Dumper logging.basicConfig(level=logging.INFO, filename='function.log', filemode='a', format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s') def decrypt(json_input, key): # We assume that the key was securely shared beforehand try: b64 = json.loads(json_input) json_k = ['nonce', 'header', 'ciphertext', 'tag'] jv = {k: b64decode(b64[k]) for k in json_k} cipher = AES.new(key.encode(), AES.MODE_GCM, nonce=jv['nonce']) cipher.update(jv['header']) plaintext = cipher.decrypt_and_verify(jv['ciphertext'], jv['tag']) return plaintext.decode() except (ValueError, KeyError) as e: raise e def generate_update_function_config_cmd(new_config, old_config, key): # 函数执行入口 handler = new_config['handler'] # 函数runtime配置(必填但不支持修改) runtime = new_config['runtime'] # 函数内存规格配置 memory_size = new_config['memorySize'] # 函数执行超时配置 timeout = new_config['timeout'] # 函数所属project_id project_id = new_config['projectID'] # 拼装更新函数配置命令 update_cmd = f'hcloud FunctionGraph UpdateFunctionConfig' \ f' --cli-region="{region}"' \ f' --function_urn="{function_urn}"' \ f' --project_id="{project_id}"' \ f' --handler="{handler}"' \ f' --timeout={timeout}' \ f' --memory_size={memory_size}' \ f' --runtime="{runtime}"' \ f' --func_name="{function_name}"' # 用户环境变量配置 # 更新用户环境变量为直接覆盖,如果有手动在函数界面配置环境变量没有更新到cam.yaml文件内 # 则手动添加环境变量配置则丢失 user_data = new_config.get('userData', None) if user_data is not None: user_date_json_str = json.dumps(user_data) user_date_json_str = json.dumps(user_date_json_str) update_cmd = update_cmd + f' --user_data={user_date_json_str}' encrypted_user_data = new_config.get('encryptedUserData', None) if encrypted_user_data is not None: encrypted_user_data = decrypt(encrypted_user_data, key) encrypted_user_date_json_str = json.dumps(encrypted_user_data) update_cmd = update_cmd + \ f' --encrypted_user_data={encrypted_user_date_json_str}' # 如果有vpc则保留 vpc_config = old_config.get('func_vpc', None) if vpc_config is not None: update_cmd = update_cmd + \ f' --func_vpc.vpc_name={vpc_config["vpc_name"]}' \ f' --func_vpc.vpc_id={vpc_config["vpc_id"]}' \ f' --func_vpc.subnet_id={vpc_config["subnet_id"]}' \ f' --func_vpc.cidr={vpc_config["cidr"]}' \ f' --func_vpc.subnet_name={vpc_config["subnet_name"]}' \ f' --func_vpc.gateway={vpc_config["gateway"]}' # 如果有委托配置则保留 "xrole": "function-admin"和"app_xrole": "function-admin", xrole_config = old_config.get('xrole', None) if xrole_config is not None: update_cmd = update_cmd + f' --xrole="{xrole_config}"' app_xrole_config = old_config.get('app_xrole', None) if app_xrole_config is not None: update_cmd = update_cmd + f' --app_xrole="{app_xrole_config}"' # 配置初始化入口和初始化超时时间 initializer_handler = new_config.get('initializerHandler', None) initializer_timeout = new_config.get('initializerTimeout', None) if initializer_handler is not None and initializer_timeout is not None: update_cmd = update_cmd + \ f' --initializer_handler="{initializer_handler}" ' \ f'--initializer_timeout={initializer_timeout}' # 并发配置 strategy_config = new_config.get('strategyConfig', None) if strategy_config is not None: concurrency = strategy_config.get('concurrency', None) # 单实例并发数 concurrent_num = strategy_config.get('concurrentNum', None) update_cmd = update_cmd + \ f' --strategy_config.concurrency="{concurrency}" ' \ f'--strategy_config.concurrent_num={concurrent_num}' # 如果有磁盘挂载则保留 mount_config = old_config.get('mount_config', None) if mount_config is not None: mount_user = mount_config["mount_user"] update_cmd = update_cmd + \ f' --mount_config.mount_user.user_id={mount_user["user_id"]}' \ f' --mount_config.mount_user.user_group_id={mount_user["user_group_id"]}' func_mounts = mount_config["func_mounts"] i = 1 for func_mount in func_mounts: update_cmd = update_cmd + \ f' --mount_config.func_mounts.{i}.mount_resource="{func_mount["mount_resource"]}"' \ f' --mount_config.func_mounts.{i}.mount_share_path="{func_mount["mount_share_path"]}"' \ f' --mount_config.func_mounts.{i}.mount_type="{func_mount["mount_type"]}"' \ f' --mount_config.func_mounts.{i}.local_mount_path="{func_mount["local_mount_path"]}"' i = i + 1 return update_cmd def exec_cmd(cmd): proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) outs, _ = proc.communicate() return outs.decode('UTF-8') def check_result(stage, exec_result): if "USE_ERROR" in exec_result: error_info = f"failed to {stage}: {exec_result}" logging.error(error_info) raise Exception(error_info) if "FSS.0409" in exec_result: error_info = f"failed to {stage}: {exec_result}" logging.error(error_info) # 返回错误为函数代码没变不需要更新错误则返回 return try: result_object = json.loads(exec_result) except Exception: error_info = f"failed to {stage}: {exec_result}" logging.error(error_info) raise Exception(error_info) if "error_code" in result_object: error_message = result_object["error_msg"] error_info = f"failed to {stage}: {error_message}" logging.error(error_info) raise Exception(error_info) def generate_update_function_code_cmd(): cmd = \ f'hcloud FunctionGraph UpdateFunctionCode --cli-region="{region}"' \ f' --function_urn="{function_urn}" --project_id="{project_id}"' \ f' --code_url="{code_url}" --func_code.link="" --func_code.file="" --code_type="obs" ' depend_list = old_function_code.get("depend_list", None) if depend_list is not None and len(depend_list) > 0: i = 1 for depend_id in depend_list: cmd = cmd + f'--depend_list.{i}="{depend_id}"' return cmd if __name__ == '__main__': deploy_function_path = sys.argv[1] key = sys.argv[2] f = open(os.path.join(deploy_function_path, "cam.yaml")) data = load(f, Loader=Loader) function_config = data['components'][0] function_name = function_config['name'] function_properties = function_config['properties'] region = function_properties['region'] code_url = function_properties['codeUri'] project_id = function_properties['projectID'] # 拼接获取函数urn function_urn = "urn:fss:" + region + ":" + project_id + \ ":function:default:" + function_name + ":latest" logging.info(f"start to deploy functionURN:{function_urn}") # 查询函数的配置信息 query_function_config_cmd = \ f'hcloud FunctionGraph ShowFunctionConfig --cli-region="{region}"' \ f' --function_urn="{function_urn}" --project_id="{project_id}"' result = exec_cmd(query_function_config_cmd) # 主要是查看函数是否有配置VPC和委托,如果有更新函数配置时需要带上,避免更新导VPC或委托配置丢失 old_function_config = json.loads(result) check_result("query function config", result) # 查询函数代码,主要是函数绑定依赖包信息保留 query_function_code_cmd = \ f'hcloud FunctionGraph ShowFunctionCode --cli-region="{region}"' \ f' --function_urn="{function_urn}" --project_id="{project_id}"' result = exec_cmd(query_function_code_cmd) old_function_code = json.loads(result) logging.info("query function %s code result: %s", function_urn, result) check_result("query function code", result) # 更新函数代码 query_function_code_cmd = generate_update_function_code_cmd() result = exec_cmd(query_function_code_cmd) logging.info("update function %s code result: %s", function_urn, result) check_result("update function code", result) # 更新函数配置 update_function_config_cmd = generate_update_function_config_cmd( function_properties, old_function_config, key) result = exec_cmd(update_function_config_cmd) logging.info("update function %s config result: %s", function_urn, result) check_result("update function config", result) logging.info(f"succeed to deploy function {function_urn}")
父主题: 自动化部署