如何使用凭据管理服务自动轮转安全密码
本文介绍如何通过函数工作流和凭据管理服务,定期生成和轮转强安全密码,以满足用户安全合规的密码生成、托管、以及定期自动轮换的要求。
使用流程
流程说明如下:
- 定时触发器到期后,会发布定时触发事件。
- 函数工作流接收到事件后,会生成新的随机密码,替换凭据模板内容中的占位符,随后将替换后的内容作为新版本存入凭据中。
- 应用程序定期通过调用API/SDK获取最新凭据版本。
- 凭据管理服务检索并解密凭据密文,将凭据中保存的信息通过凭据管理API安全地返回到应用程序中。
- 应用程序获取到解密后的凭据明文信息,使用新密码更新目标对象(数据库、服务器等),使新密码生效,后续使用新密码对目标对象进行访问。
约束条件
- 区域已上线凭据管理服务(CSMS)。
- 区域已上线函数工作流服务(FunctionGraph)。
创建委托
- 登录管理控制台。
- 单击页面左侧的,选择 ,进入 界面。
- 在左侧导航栏选择“委托”, 进入委托页面。
- 单击“创建委托”,进入“创建委托”页面,如图 创建委托所示,填写参数,参数说明如表 创建委托参数说明所示。
- 单击“下一步”,进入委托授权界面。
- 勾选需要授权函数工作流的“CSMS FullAccess”、“KMS CMKFullAccess”权限。
图3 选择授权权限
- 单击“下一步”,根据业务需要选择授权范围。
图4 选择授权范围
- 单击“确定”,委托创建成功。
创建密码轮转函数
- 登录管理控制台。
- 单击页面左侧的,选择 ,进入 界面。
- 单击页面右上角的“创建函数”,进入创建函数页面,如图 创建函数所示,填写参数,参数说明如表 基本信息参数配置所示。
表2 基本信息参数配置 参数
配置说明
区域
选择函数部署的区域。
项目
选择函数部署的项目。
函数名称
自定义函数名称。
委托名称
选择创建委托的委托名称。
企业项目
如果您已开通企业项目,选择需要添加函数的企业项目即可。
如果您未开通企业项目,控制台默认不显示“企业项目”,可直接跳过该参数。如需开通企业项目,请参考如何开通企业项目/企业多账号。
运行时
选择用来编写函数的语言,当前支持使用Python进行代码配置。
说明:仅支持使用Python 3.6、3.9、3.10版本。
- 单击“创建函数”,进入函数配置界面。
- 单击“设置”页签,在左侧导航栏单击“环境变量”。单击“添加环境变量”,在变量配置行添加环境变量,参数说明如表 基本信息参数配置所示。添加完成后单击“保存”。
表3 环境变量配置 参数
配置说明
示例
region
项目名称,比如北京四对应:cn-north-4。单击页面右上角用户名,在下拉框中选择我的凭证,即可查看region信息。
cn-north-4
secret_name
待轮转的凭据名称。
说明:需提前完成创建凭据操作。具体请参见创建凭据。
rds-functionGraph-rotate
secret_content
指定的凭据模板内容,需用{}指定,比如:{"password":"password_placeholder"}其中password_placeholder为占位符,函数执行后会将password_placeholder替换为生成的安全密码,并将替换后的内容整体存入凭据中。
说明:当凭据模板内容中存在多个占位符时,会生成多个密码并依次进行替换。
{"password":"password_placeholder"}
password_length
密码长度,可选范围8-128,默认为16。
16
password_format
密码格式,共支持四种,可选,默认为2:
1. 包含数字和字母;
2. 包含数字、字母和特殊字符(~!@#%^*-_=+?);
3. 只包含数字;
4. 只包含字母。
2
- 单击“代码”页签,并在编辑窗口中添加如下密码轮转函数,完成后单击“部署”。
# -*- coding:utf-8 -*- import json import secrets import string import requests import inspect def handler (event, context): global secret_content global password_length global password_format global kms_endpoint global region global secret_name global headers region = context.getUserData('region') secret_name = context.getUserData('secret_name') password_length = 16 if context.getUserData('password_length') is None else int(context.getUserData('password_length')) password_format = 2 if context.getUserData('password_format') is None else int(context.getUserData('password_format')) secret_content = context.getUserData('secret_content') headers = { 'Content-Type': 'application/json', 'x-Auth-Token': context.getToken() } try: new_content = replace_old_content(secret_content) # check region, if pass, return kms endpoint kms_endpoint = check_region(region) return update(context, new_content) except Exception as e: print("ERROR: %s" % e) return 'FAILED' # replace "password_placeholder" in secret_content by new password def replace_old_content(content): while content.find("password_placeholder") != -1: password = generate_password() while password.find("password_placeholder") != -1: password = generate_password() content = content.replace("password_placeholder", password, 1) return content def generate_password(): special_chars = "~!@#%^*-_=+?" # password format(default is 2): # 1.support letters and digits; 2.support letters, digits and special chars(~!@#%^*-_=+?); # 3.only support digits; 4.only support letters format_mapping = { 1: string.ascii_letters + string.digits, 2: string.ascii_letters + string.digits + special_chars, 3: string.digits, 4: string.ascii_letters } if password_length < 8 or password_length > 128: raise Exception("invalid password_length: %s, the password length range must be between 8-128." % password_length) try: support_chars = format_mapping[password_format] password = ''.join([secrets.choice(support_chars) for _ in range(password_length)]) return password except: raise Exception("invalid password_format: %s." % password_format) def check_region(region): endpoint_mapping = { 'cn-north-1': 'cn-north-1.myhuaweicloud.com', 'cn-north-2': 'cn-north-2.myhuaweicloud.com', 'cn-north-4': 'cn-north-4.myhuaweicloud.com', 'cn-north-7': 'cn-north-7.myhuaweicloud.com', 'cn-north-9': 'cn-north-9.myhuaweicloud.com', 'cn-east-2': 'cn-east-2.myhuaweicloud.com', 'cn-east-3': 'cn-east-3.myhuaweicloud.com', 'cn-south-1': 'cn-south-1.myhuaweicloud.com', 'cn-south-2': 'cn-south-2.myhuaweicloud.com', 'cn-southwest-2': 'cn-southwest-2.myhuaweicloud.com', 'ap-southeast-1': 'ap-southeast-1.myhuaweicloud.com', 'ap-southeast-2': 'ap-southeast-2.myhuaweicloud.com', 'ap-southeast-3': 'ap-southeast-3.myhuaweicloud.com', 'af-south-1': 'af-south-1.myhuaweicloud.com', 'la-north-2': 'la-north-2.myhuaweicloud.com', 'la-south-2': 'la-south-2.myhuaweicloud.com', 'na-mexico-1': 'na-mexico-1.myhuaweicloud.com', 'sa-brazil-1': 'sa-brazil-1.myhuaweicloud.com' } try: endpoint = endpoint_mapping[region] kms_endpoint = '%s.%s' % ('kms', endpoint) return kms_endpoint except: raise Exception("invalid region: %s" % region) def check_csms_resp(resp): if resp.status_code in (200, 201, 204): return caller_function_name = inspect.stack()[1].function json_resp = json.loads(resp.text) if 'error_msg' in json_resp: error_message = 'function:%s , reason: %s' % ( caller_function_name, json_resp['error_msg']) raise Exception(error_message) error_message = 'function:%s , reason: %s' % ( caller_function_name, resp.text) raise Exception(error_message) def update(context, new_content): project_id = context.getProjectID() url = 'https://%s/v1/%s/secrets/%s/versions' % (kms_endpoint, project_id, secret_name) payload = {'secret_string': new_content} payload = json.dumps(payload) resp = requests.post(url, headers=headers, data=payload) check_csms_resp(resp) return 'SUCCESS'
调试
需对创建的函数工作流进行调试,具体请参见在线调试。
创建触发器
需创建触发器,具体请参见使用定时触发器。