通过命令 python .\main.py 执行 main.py 文件,开始备份数据恢复主流程。main.py 示例如下:
import requests
import json
import time
import config
class IamLogin:
    """Password login against the IAM token endpoint.

    The endpoint host is ``iam.<projectName>.myhuaweicloud.com``; the same
    ``projectName`` is also sent as the project scope of the token request.
    """

    def __init__(self, iamDomain: str, iamUser: str, iamPassword: str, projectName: str):
        self.iamDomain = iamDomain
        self.iamUser = iamUser
        self.iamPassword = iamPassword
        self.projectName = projectName
        self.iamTokenUrl = "https://iam.{region}.myhuaweicloud.com/v3/auth/tokens".format(
            region=self.projectName)

    def get_auth_token(self) -> dict:
        """Request a project-scoped token with password authentication.

        Returns:
            ``{"token": <value of the X-Subject-Token response header>,
            "code": 0}`` when the request succeeds, or
            ``{"code": 1, "msg": "get token error"}`` when ``requests`` raises
            a ``RequestException`` (``raise_for_status`` included).
        """
        credentials = {
            "user": {
                "domain": {"name": self.iamDomain},
                "name": self.iamUser,
                "password": self.iamPassword,
            }
        }
        auth_request = {
            "auth": {
                "identity": {
                    "methods": ["password"],
                    "password": credentials,
                },
                "scope": {
                    "project": {"name": self.projectName},
                },
            }
        }
        body = json.dumps(auth_request)
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            reply = requests.post(self.iamTokenUrl,
                                  data=body,
                                  headers={'Content-Type': 'application/json'},
                                  verify=False)
            # Turn an HTTP error status into an exception.
            reply.raise_for_status()
        except requests.exceptions.RequestException as err:
            print("get token error: ", err)
            return {"code": 1, "msg": "get token error"}
        # The token is delivered in a response header, not in the body.
        return {"token": reply.headers.get("X-Subject-Token"), "code": 0}
class GetBackup:
    """Query one backup record by id from the CBR endpoint."""

    def __init__(self, token, projectName, projectId, backupId) -> None:
        self.token = token
        self.header = {
            'Content-Type': 'application/json',
            'X-Auth-Token': self.token,
        }
        self.projectName = projectName
        self.projectId = projectId
        self.backupId = backupId

    def get_backup(self) -> dict:
        """Fetch the backup identified by ``self.backupId``.

        The request URL is stored on ``self.getBackupDataUrl`` as a side
        effect.

        Returns:
            ``{"code": 0, "backups": <decoded JSON body>}`` on success, or
            ``{"code": 1, "msg": "get backup data error"}`` when ``requests``
            raises a ``RequestException``.
        """
        print("get backup data")
        url_template = "https://cbr.{region}.myhuaweicloud.com/v3/{project}/backups/{backup}"
        self.getBackupDataUrl = url_template.format(region=self.projectName,
                                                    project=self.projectId,
                                                    backup=self.backupId)
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            reply = requests.get(url=self.getBackupDataUrl, headers=self.header, verify=False)
            # Turn an HTTP error status into an exception.
            reply.raise_for_status()
            backup_info = json.loads(reply.content.decode("utf-8"))
        except requests.exceptions.RequestException as err:
            print("get backup data error: ", err)
            return {"code": 1, "msg": "get backup data error"}
        return {"code": 0, "backups": backup_info}
class CreateServer:
    """Create an ECS server whose disks are sized from a backup, and query it.

    Typical use: ``transfor_metadata_to_server()`` builds the request body,
    ``create_server()`` submits it, and ``get_server()`` reads back the
    volumes attached to the new server.
    """

    def __init__(self, token, projectName, projectId, backupId, backups, imageRef, flavorRef, vpcId,
                 subnetId, enterpriseProjectId, securityGroups, keyName) -> None:
        self.token = token
        self.header = {
            'Content-Type': 'application/json', 'X-Auth-Token': self.token
        }
        self.projectName = projectName
        self.projectId = projectId
        self.backupId = backupId
        self.backups = backups
        self.imageRef = imageRef
        self.flavorRef = flavorRef
        self.vpcId = vpcId
        self.subnetId = subnetId
        self.enterpriseProjectId = enterpriseProjectId
        self.securityGroups = securityGroups
        self.keyName = keyName
        # Request body for create_server(); filled by transfor_metadata_to_server().
        self.payload = None

    @staticmethod
    def _decode_json(response):
        """Return the response body parsed as JSON, or None if it cannot be parsed."""
        try:
            return json.loads(str(response.content, encoding="utf-8"))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON (e.g. an HTML error page).
            return None

    def transfor_metadata_to_server(self) -> None:
        """Build the create-server request body from the backup metadata.

        Reads ``self.backups["backup"]`` (its ``name`` and ``children``),
        stores the resulting body in ``self.payload`` and prints it.
        Nothing is returned.
        """
        print("transfor metadata to server")
        backup = self.backups["backup"]
        root_volume, data_volumes = handle_backup_volumes(backup["children"])
        # Parameters of the create-server request.
        payload = {
            "server": {
                "name": backup["name"],
                "imageRef": self.imageRef,
                "root_volume": root_volume,
                "data_volumes": data_volumes,
                "flavorRef": self.flavorRef,
                "vpcid": self.vpcId,
                "security_groups": self.securityGroups,
                "nics": [{
                    "subnet_id": self.subnetId
                }],
                "key_name": self.keyName,
                "count": 1,
                "extendparam": {
                    "enterprise_project_id": self.enterpriseProjectId
                }
            }
        }
        self.payload = payload
        print(payload)

    def create_server(self) -> dict:
        """Submit the create-server request.

        If the request body has not been built yet, it is built first via
        ``transfor_metadata_to_server()``. The request URL is stored on
        ``self.createServerUrl``.

        Returns:
            ``{"job_id": ..., "serverIds": [...], "code": 0}`` on success, or
            ``{"msg": "create server error", "code": 1}`` when the request
            fails or the response lacks the expected fields.
        """
        print("create server")
        if self.payload is None:
            # Avoid an AttributeError/None body when the caller skipped the build step.
            self.transfor_metadata_to_server()
        self.createServerUrl = "https://ecs.{}.myhuaweicloud.com/v1/{}/cloudservers".format(
            self.projectName, self.projectId)
        json_data = json.dumps(self.payload)
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            response = requests.post(self.createServerUrl,
                                     data=json_data,
                                     headers=self.header,
                                     verify=False)
            # Print the body before the status check so error details are visible;
            # a body that is not JSON is printed raw instead of crashing here.
            result = self._decode_json(response)
            print(result if result is not None else response.content)
            # Turn an HTTP error status into an exception.
            response.raise_for_status()
            # Example body:
            # {'job_id': 'ff8080828ee22cea018f27bdd23c6477', 'serverIds': ['6cb36d34-b111-48be-9577-b52dbb74adae']}
            return {
                "job_id": result["job_id"],
                "serverIds": result["serverIds"],
                "code": 0
            }
        except (requests.exceptions.RequestException, KeyError, TypeError) as e:
            # KeyError/TypeError: successful status but an unexpected or unparsable body.
            print("create server error: ", e)
            return {
                "msg": "create server error",
                "code": 1
            }

    def get_server(self, serverId):
        """Query the server by id and return its attached volumes.

        Returns:
            The ``os-extended-volumes:volumes_attached`` list of the server,
            or ``None`` when the request fails or the response cannot be
            parsed. Example item:
            ``{'id': '66fc89cd-...', 'delete_on_termination': 'true',
            'device': '/dev/vda', 'bootIndex': '0'}``
        """
        print("get server, serverId is ", serverId)
        getServerdataUrl = "https://ecs.{}.myhuaweicloud.com/v1/{}/cloudservers/{}".format(self.projectName,
                                                                                           self.projectId,
                                                                                           serverId)
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            response = requests.get(url=getServerdataUrl, headers=self.header, verify=False)
            # Turn an HTTP error status into an exception.
            response.raise_for_status()
            result = json.loads(str(response.content, encoding="utf-8"))
            return result["server"]["os-extended-volumes:volumes_attached"]
        except (requests.exceptions.RequestException, ValueError, KeyError, TypeError) as e:
            print("get server data error: ", e)
            return None
def handle_backup_volumes(backupChildren, volume_type=None, min_size=40):
    """Build the ``root_volume`` and ``data_volumes`` specs for a new server.

    Each child must provide ``extend_info["bootable"]`` and
    ``resource_size``. A bootable child defines the root volume (if several
    are bootable, the last one wins); every other child adds one data volume,
    in the order of ``backupChildren``.

    Args:
        backupChildren: iterable of backup child records.
        volume_type: value for each volume's ``volumetype``; defaults to
            ``config.volumeType`` when omitted.
        min_size: lower bound applied to every volume size (default 40,
            same unit as ``resource_size``).

    Returns:
        ``(root_volume, data_volumes)``; ``root_volume`` is ``{}`` when no
        child is bootable.
    """
    if volume_type is None:
        # Resolved lazily so callers that pass a type do not need the config module.
        volume_type = config.volumeType
    root_volume = {}
    data_volumes = []
    for child in backupChildren:
        volume = {
            "volumetype": volume_type,
            "size": max(min_size, child["resource_size"]),
        }
        if child["extend_info"]["bootable"]:
            root_volume = volume
        else:
            data_volumes.append(volume)
    return root_volume, data_volumes
def build_restore_mappings(server_volumes_attached, backup_children):
    """Pair each volume of the new server with the backup child to restore onto it.

    A server volume's ``bootIndex`` (a numeric string such as ``'0'``) is used
    as an index into the backup children. Before indexing, the children are
    ordered with bootable ones first (relative order otherwise preserved), so
    that index 0 always refers to the bootable backup even when it is not the
    first child. When the bootable child is already first, or no child is
    marked bootable, the order is unchanged.

    NOTE(review): this assumes data volumes get bootIndex 1..n in the order
    they were requested at server creation - confirm against the ECS API.

    Args:
        server_volumes_attached: list of dicts with ``id`` and ``bootIndex``.
        backup_children: list of backup child dicts with ``id`` and,
            optionally, ``extend_info["bootable"]``.

    Returns:
        A list of ``{"backup_id": ..., "volume_id": ...}`` dicts, one per
        server volume, in the order of ``server_volumes_attached``.

    Raises:
        IndexError: a ``bootIndex`` is beyond the number of backup children.
        ValueError: a ``bootIndex`` is not an integer string.
    """
    print("build restore mappings")
    print(server_volumes_attached)
    print(backup_children)
    bootable_children = []
    other_children = []
    for child in backup_children:
        extend_info = child.get("extend_info") or {}
        if extend_info.get("bootable"):
            bootable_children.append(child)
        else:
            other_children.append(child)
    ordered_children = bootable_children + other_children
    return [
        {
            "backup_id": ordered_children[int(server_volume["bootIndex"])]["id"],
            "volume_id": server_volume["id"],
        }
        for server_volume in server_volumes_attached
    ]
class Job:
    """Look up the status of an ECS job by its id."""

    def __init__(self, token, projectName, projectId, jobId) -> None:
        self.token = token
        self.header = {
            'Content-Type': 'application/json',
            'X-Auth-Token': self.token,
        }
        self.projectName = projectName
        self.projectId = projectId
        self.jobId = jobId
        url_template = "https://ecs.{region}.myhuaweicloud.com/v1/{project}/jobs/{job}"
        self.getJobUrl = url_template.format(region=self.projectName,
                                             project=self.projectId,
                                             job=self.jobId)

    def get_job_status(self) -> dict:
        """Query the job once.

        Returns:
            ``{"code": 0, "status": <"status" field of the response>}`` on
            success, or ``{"code": 1, "msg": "get job status error"}`` when
            ``requests`` raises a ``RequestException``.
        """
        print("get job status, jobId is ", self.jobId)
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            reply = requests.get(url=self.getJobUrl, headers=self.header, verify=False)
            # Turn an HTTP error status into an exception.
            reply.raise_for_status()
            job_info = json.loads(reply.content.decode("utf-8"))
            outcome = {"code": 0, "status": job_info["status"]}
        except requests.exceptions.RequestException as err:
            print("get job status error: ", err)
            outcome = {"code": 1, "msg": "get job status error"}
        return outcome
class Restore:
    """Restore a backup onto the volumes of an existing server."""

    def __init__(self, token, projectName, projectId, backupId, serverId, restore_mappings) -> None:
        self.token = token
        self.header = {
            'Content-Type': 'application/json', 'X-Auth-Token': self.token
        }
        self.projectName = projectName
        self.projectId = projectId
        self.backupId = backupId
        self.serverId = serverId
        self.restore_mappings = restore_mappings
        self.backupRestoreUrl = "https://cbr.{}.myhuaweicloud.com/v3/{}/backups/{}/restore".format(self.projectName,
                                                                                                   self.projectId,
                                                                                                   self.backupId)

    def backup_restore(self) -> dict:
        """Submit the restore request.

        The request carries ``self.restore_mappings``, the target
        ``server_id`` and ``power_on=True``.

        Returns:
            ``{"code": 0}`` when the request is accepted, or
            ``{"code": 1, "msg": "backup restore error"}`` when ``requests``
            raises a ``RequestException``.
        """
        print("backup restore, backupId is {}, serverId is {}".format(self.backupId, self.serverId))
        payload = {
            "restore": {
                "mappings": self.restore_mappings,
                "power_on": True,
                "server_id": self.serverId
            }
        }
        json_data = json.dumps(payload)
        try:
            # NOTE(review): verify=False disables TLS certificate verification.
            response = requests.post(self.backupRestoreUrl,
                                     data=json_data,
                                     headers=self.header,
                                     verify=False)
            # Turn an HTTP error status into an exception.
            response.raise_for_status()
            # The response body is not needed, so it is deliberately not parsed:
            # an empty or non-JSON body must not turn an accepted restore into a crash.
            return {
                "code": 0
            }
        except requests.exceptions.RequestException as e:
            print("backup restore error: ", e)
            return {
                "code": 1,
                "msg": "backup restore error",
            }
def http_error(code):
    """Tell whether ``code`` is the failure code (1) used in the result dicts."""
    is_failure = (code == 1)
    return is_failure
def _record_failure(job_infos, result_data, msg):
    """Mark one backup's result as FAILED, log the reason and store the result."""
    result_data["status"] = "FAILED"
    result_data["msg"] = msg
    print("backup process failed, msg is {}".format(msg))
    job_infos.append(result_data)


def _wait_for_create_server_job(job):
    """Poll the create-server job until it reaches a final state.

    Sleeps ``config.delayInSeconds`` before every query.

    Returns:
        ``None`` when the job status is ``SUCCESS``; otherwise a failure
        message (the query's own message when the status request failed).
    """
    while True:
        time.sleep(config.delayInSeconds)
        status_obj = job.get_job_status()
        if http_error(status_obj["code"]):
            # Report the status query's message, not the create-server result.
            return status_obj["msg"]
        job_status = status_obj["status"]
        if job_status == "SUCCESS":
            print("The create server job is executed successfully.")
            return None
        if job_status is None or job_status == "FAIL":
            print("The job failed.")
            return "create server job failed"
        if job_status == "RUNNING" or job_status == "INIT":
            print("job status is {}, please wait...".format(job_status))


def _restore_one_backup(token, backup_id, job_infos):
    """Run the whole flow for one backup and append exactly one result to job_infos.

    Steps: query the backup, create a server from its metadata, wait for the
    create job, map the new server's volumes to the backup children, and
    submit the restore request.
    """
    print("start the backup process, backup id is ", backup_id)
    result_data = {"backupId": backup_id}

    # Query the backup metadata by id.
    backups_obj = GetBackup(token, config.projectName, config.projectId, backup_id).get_backup()
    if http_error(backups_obj["code"]):
        _record_failure(job_infos, result_data, backups_obj["msg"])
        return
    backups = backups_obj["backups"]

    # Create the server.
    create_server = CreateServer(token, config.projectName, config.projectId, backup_id, backups,
                                 config.imageRef, config.flavorRef, config.vpcId,
                                 config.subnetId, config.enterpriseProjectId, config.securityGroups,
                                 config.keyName)
    create_server.transfor_metadata_to_server()
    create_server_data = create_server.create_server()
    if http_error(create_server_data["code"]):
        _record_failure(job_infos, result_data, create_server_data["msg"])
        return
    server_id = create_server_data["serverIds"][0]
    result_data["serverId"] = server_id
    result_data["status"] = "START"

    # Wait for the create-server job to finish.
    print("start to get create server job status, please wait...")
    job = Job(token, config.projectName, config.projectId, create_server_data["job_id"])
    failure_msg = _wait_for_create_server_job(job)
    if failure_msg is not None:
        _record_failure(job_infos, result_data, failure_msg)
        return

    # Build the restore mappings from the volumes attached to the new server.
    server_volumes_attached = create_server.get_server(server_id)
    if server_volumes_attached is None:
        # get_server() yields None when its request fails.
        _record_failure(job_infos, result_data, "get server data error")
        return
    try:
        restore_mappings = build_restore_mappings(server_volumes_attached,
                                                  backups["backup"]["children"])
    except (KeyError, IndexError, TypeError, ValueError) as e:
        print("build restore mappings error: ", e)
        _record_failure(job_infos, result_data, "build restore mappings error")
        return

    # Restore the backup data onto the new server.
    restore_obj = Restore(token, config.projectName, config.projectId, backup_id, server_id,
                          restore_mappings).backup_restore()
    if http_error(restore_obj["code"]):
        _record_failure(job_infos, result_data, restore_obj["msg"])
        return
    result_data["status"] = "SUCCESS"
    job_infos.append(result_data)


def main():
    """Authenticate, restore every backup in ``config.backupIds`` and print a summary."""
    # Authenticate through IAM to obtain a token.
    print("start to get token")
    token_obj = IamLogin(config.iamDomain, config.iamUser, config.iamPassword,
                         config.projectName).get_auth_token()
    if http_error(token_obj["code"]):
        print("backup failed, msg is {}".format(token_obj["msg"]))
        return
    token = token_obj["token"]

    # One result entry per backup id. A failed backup is recorded and the
    # batch continues with the next id, the same way for every failure path.
    job_infos = []
    for backup_id in config.backupIds:
        _restore_one_backup(token, backup_id, job_infos)

    # Print the successful and the failed results.
    print(job_infos)
    success_job_list = [info for info in job_infos if info["status"] == "SUCCESS"]
    failed_job_list = [info for info in job_infos if info["status"] != "SUCCESS"]
    print("backup process end")
    print({
        "successJobList": success_job_list,
        "failedJobList": failed_job_list
    })


if __name__ == '__main__':
    main()