使用函数工作流自动获取并更新ECS服务器证书
应用场景
本文以表 示例信息所示为例介绍如何通过使用函数工作流自动获取并更新ECS服务器证书。对于续费签发的新证书,无需手动重新部署,即可更新ECS服务器证书。
约束与限制
- 已开通弹性云服务器(Elastic Cloud Server,ECS),且在ECS中配置了SSL证书。
- SSL证书为云证书管理服务中购买且续费的证书。
步骤一:创建委托
使用函数工作流更新ECS服务器证书需要将SCM FullAccess、IAM ReadOnlyAccess权限授权给函数工作流服务。
- 登录管理控制台。
- 单击页面左上方的,选择 ,进入统一身份认证服务界面。
- 在左侧导航栏选择“委托”,并在委托界面右上角单击“创建委托”,进入创建委托界面。
- 在创建委托界面,按表 创建云服务委托参数说明所示设置委托信息,如图 创建云服务委托所示。
- 单击“下一步”,进入委托授权界面。
- 选择并勾选需要授权函数工作流的“SCM FullAccess”、“IAM ReadOnlyAccess”权限。
图2 选择权限
- 单击“下一步”,设置权限的作用范围。
- 单击“确定”,委托创建成功。
步骤二:使用空白模板创建函数
- 登录管理控制台。
- 单击页面左上方的,选择 ,进入函数工作流界面。
- 单击函数工作流界面右上方的“创建函数”,进入创建函数界面。
- 按表 创建空白事件函数参数配置所示信息创建空白函数,如图 创建空白事件函数所示。
- 单击“创建函数”,跳转至函数界面,创建空白函数成功。
步骤三:创建定时触发器
创建定时触发器,在固定时间间隔触发函数。
步骤四:制作并配置函数依赖包
部署证书至ECS的函数代码需要依赖paramiko依赖包,您需要为函数制作并配置paramiko依赖包。
本小节以Python 3.9为例介绍制作和配置依赖包的方法。其他代码编辑语言制作依赖包的方法请参见如何制作依赖包。
为Python制作依赖包
- 打包环境中的Python版本要和对应函数的运行时版本相同。
如Python 3.9建议使用3.9.0及以上版本,Python2.7建议使用2.7.12及以上版本,Python3.6建议使用3.6.3以上版本。
- 执行如下命令,为Python 3.9安装paramiko依赖包,并指定此依赖包的安装路径为本地的/tmp/paramiko下。
pip install paramiko --root /tmp/paramiko
- 执行如下命令切换到/tmp/paramiko下。
cd /tmp/paramiko/
- 进入子目录直到site-packages路径下(一般路径为usr/lib64/python3.9/site-packages/),并执行如下命令。
zip -rq paramiko.zip *
所生成的包即为最终需要的依赖包。
如果需要安装存放在的本地wheel安装包,直接执行如下命令:
pip install piexif-1.1.0b0-py2.py3-none-any.whl --root /tmp/piexif //安装包名称以piexif-1.1.0b0-py2.py3-none-any.whl为例,请以实际安装包名称为准
配置依赖包
步骤五:在函数中配置代码源
在函数中配置代码源,本节以在线编辑的方式为例。更多创建代码源的方式请参见创建程序包。
- 在函数工作流界面左侧导航栏选择 ,进入函数列表界面。
- 单击函数名称,进入函数详情界面。
- 选择 ,进入环境变量页签,
- 单击“添加环境变量”,如图 设置环境变量所示,添加“endpoint”、“region”两个环境变量。
环境变量1:
- 键:endpoint
- 值:scm.cn-north-4.myhuaweicloud.com
环境变量2:
- 键:region
- 值:cn-north-4
- 单击“保存”,选择代码页签。
- 在代码页签,如图 添加代码所示,将以下两段代码整合添加到一个代码源文件。
获取当前账号下SSL证书的应用程序代码示例如下:
import json import requests import datetime import time def getCertHeaders(context): return { 'Content-Type': 'application/json', 'region': context.getUserData("region"), 'X-Language': 'zh-cn', 'X-Auth-Token': context.getToken() } def isValidCert(cert): # TODO 用户可根据业务场景自定义 以下仅示例 certDomain = cert.get('domain') # 判断是否是对应域名的续费证书 if (certDomain != 'XXXX'): return False # 以下实例筛选出签发时间为昨天并且域名符合要求的证书 currentTime = time.localtime() currentTimeStr = str(currentTime[0]) + ',' + str(currentTime[1]) + ',' + str(currentTime[2]) certTime = datetime.datetime.strptime(cert.get('expire_time'), '%Y-%m-%d %H:%M:%S.%f') # 获取证书签发时间 certTimeStr = str(certTime.year - int(cert.get('validity_period')/12)) + ',' + str(certTime.month) + ',' + str(certTime.day - 1) return currentTimeStr == certTimeStr def getCertList(context): preUrl = 'https://' + context.getUserData("endpoint") url = preUrl + '/v3/scm/certificates?order_status=ISSUED&content=&sort_key=certUpdateTime&sort_dir=DESC&limit=&enterprise_project_id=' certHeaders = getCertHeaders(context) rep = requests.get(url, headers = certHeaders) totalCount = json.loads(rep.text).get('total_count') discuss = int(totalCount/10) reminder = totalCount-discuss*10 rep = [] for i in range(discuss): tempUrl = url + '&offset=' + str(10*i) tempRep = requests.get(tempUrl, headers = certHeaders) for cert in json.loads(tempRep.text).get('certificates'): if(isValidCert(cert)): rep.append(cert) if reminder > 0: tempUrl = url + '&offset=' + str(totalCount-reminder) tempRep = requests.get(tempUrl, headers = certHeaders) for cert in json.loads(tempRep.text).get('certificates'): if(isValidCert(cert)): rep.append(cert) return json.dumps(rep) def exportCert(context, certId): preUrl = 'https://' + context.getUserData("endpoint") url = '/v3/scm/certificates/'+ certId + '/export' rep = requests.post(preUrl + url, headers = getCertHeaders(context)) os.makedirs("/tmp/" + certId) entireCertificate = json.loads(rep.text).get('entire_certificate') entireCertFileName = '/tmp/' + certId + '/certificate.pem' certFile = open(entireCertFileName,'w') certFile.write(entireCertificate) privateKey = json.loads(rep.text).get('private_key') privateKeyFileName = '/tmp/' + certId + '/privateKey.key' keyFile = open(privateKeyFileName,'w') keyFile.write(privateKey) def handler (event, context): # TODO 需基于业务背景结合函数调用 以下仅示例 totalRep = getCertList(context) certList = json.loads(totalRep) certIdList = [] for cert in certList: exportCert(context, cert.get("id")) certIdList.append(cert.get("id")) for cert in certList: deploy('**.***.*', 22, 'root', '*.', '/tmp', '/tmp', certIdList)
部署SSL证书到弹性服务器(Nginx)的应用程序代码示例如下:
import os import paramiko import time def isExists(path, function): path = path.replace("\\","/") try: function(path) except Exception as error: return False else: return True def copy(ssh, sftp, local, remote): if isExists(remote, function=sftp.chdir): filename = os.path.basename(os.path.normpath(local)) remote = os.path.join(remote, filename).replace("\\","/") if os.path.isdir(local): isExists(remote, function=sftp.mkdir) for file in os.listdir(local): localfile = os.path.join(local, file).replace("\\","/") copy(ssh=ssh, sftp=sftp, local=localfile, remote=remote) if os.path.isfile(local): try: ssh.exec_command("rm -rf %s"%(remote)) sftp.put(local,remote) except Exception as error: print('put:', local, "==>",remote, 'FAILED') else: print('put:', local, "==>",remote, 'success') def deploy(ip, port, username, password, local, remote, certIdList): transport = paramiko.Transport((ip,port)) transport.connect(username=username, password=password) ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh._transport = transport ftp_client = paramiko.SFTPClient.from_transport(transport) for certId in certIdList: copy(ssh=ssh, sftp=ftp_client, local=local + '/' + certId, remote=remote) # 前提证书位置已经写入nginx.conf文件 cmd="/usr/local/nginx/sbin/nginx -s reload" stdin,stdout,stderr = ssh.exec_command(cmd) ssh.close()
- 以上两段代码为示例,请根据您的实际情况修改后使用。代码中的“XXXX”表示域名,请改为您的实际域名。
- 以上代码中涉及的如下函数需要根据您的业务背景编辑代码且在做代码源合并时需要放在所有代码结尾部分。
def handler (event, context): # TODO 需基于业务背景结合函数调用
- 单击“测试”,测试函数,确认函数能正常执行。
测试函数的详细操作请参见在线调试。
- 代码源添加并测试完成后,函数会根据定时触发器设置的触发规则运行,如有续费证书签发会被自动获取并更新至ECS。
- 您可以在函数详情页选择
,进入监控指标页签,查看函数运行情况。
可以查看到“调用次数”、“运行时间”、“错误次数”和“被拒绝次数”等指标。有关监控更详细的说明请参见函数监控。