更新时间:2024-10-24 GMT+08:00
Python
分段上传Python语言的示例代码,如下所示:
import base64 import hashlib import os import re import xml.etree.ElementTree as ET import requests from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkvod.v1 import * from huaweicloudsdkvod.v1.region.vod_region import VodRegion class PartUploadDemo: """ 分段上传示例 """ # 设置缓冲区大小,每次读取文件分段的大小,根据情况自行设定 # 1MB buffer_size = 1024 * 1024 # 区域 region_north4 = "cn-north-4" region_north1 = "cn-north-1" region_east2 = "cn-east-2" region = "" # ak/sk,示例以ak/sk做认证鉴权 ak = "" sk = "" def __init__(self): pass def upload_file(self, file_path): """ 分段上传 :param file_path: 文件的本地路径 :type file_path: str :return: """ # 校验一下文件路径和文件 if not self.valid_file(file_path): return # 取文件名 filename = os.path.basename(file_path) # 此处仅以MP4文件示例,其他格式可参考官网说明 video_type = "MP4" file_content_type = "video/mp4" print("开始上传媒资:" + filename) # 1. 初始化鉴权并获取vodClient client = self.create_vod_client() # 2.创建点播媒资 asset_response = self.create_asset(client=client, file_name=filename, video_type=video_type) # 3.获取初始化上传任务授权 init_auth_response = self.init_part_upload_authority(client=client, asset_response=asset_response, file_content_type=file_content_type) # 4.初始化上传任务 upload_id = self.init_part_upload(sign_str=init_auth_response.sign_str, file_content_type=file_content_type) # 文件分段计数 part_number = 1 # 7.读取文件内容,循环5-6步上传所有分段 with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(self.buffer_size), b''): # 先md5,再base64,生成content_md5 md5 = hashlib.md5() md5.update(chunk) content_md5 = str(base64.b64encode(md5.digest()), 'utf-8') # print(content_md5) # 5. 获取上传分段的授权 upload_auth_response = self.get_part_upload_authority(client=client, asset_response=asset_response, file_content_type=file_content_type, content_md5=content_md5, upload_id=upload_id, part_number=part_number) # 6.上传分段 self.upload_part_file(sign_str=upload_auth_response.sign_str, chunk=chunk, content_md5=content_md5, part_number=part_number) # 段号自增 part_number += 1 # 8.获取已上传分段的授权 list_part_upload_authority_response = self.list_uploaded_part_authority(client=client, asset_response=asset_response, upload_id=upload_id) # 9.获取已上传的分段 part_info = self.list_uploaded_part(sign_str=list_part_upload_authority_response.sign_str) # 10.获取合并段授权 merge_part_upload_authority_response = self.merge_uploaded_part_authority(client=client, asset_response=asset_response, upload_id=upload_id) # 11. 合并上传分段 self.merge_uploaded_part(sign_str=merge_part_upload_authority_response.sign_str, part_info=part_info) # 12. 确认媒资上传 self.confirm_uploaded(client=client, asset_response=asset_response) print("上传媒资结束 assetId:" + asset_response.asset_id) # 校验文件是否存在 def valid_file(self, file_path): valid_result = True if not file_path: print("路径为空") valid_result = False elif os.path.isdir(file_path): print("是个目录") valid_result = False elif not os.path.isfile(file_path): print("文件不存在") valid_result = False return valid_result # 1.初始化鉴权 def create_vod_client(self): print("初始化鉴权。。。") credentials = BasicCredentials(self.ak, self.sk) client = VodClient.new_builder() \ .with_credentials(credentials) \ .with_region(VodRegion.value_of(self.region)) \ .build() return client def create_asset(self, client, file_name, video_type): """ 2.创建点播媒资 :param client :param file_name 音视频文件名 :type file_name: str :param video_type 上传音视频文件的格式 :type video_type: str """ print("create_asset start; ") create_asset_request = CreateAssetByFileUploadRequest() # 设置创建媒资参数,创建媒资参数最小集,其他参数参考官网文档按需添加 create_asset_request.body = CreateAssetByFileUploadReq( video_type=video_type, video_name=file_name, title=file_name ) # 调用创建媒资方法 asset_response = client.create_asset_by_file_upload(create_asset_request) print("createAssetResponse:" + asset_response) print("create_asset end") return asset_response def init_part_upload_authority(self, client, asset_response, file_content_type): """ 3.获取初始化上传任务授权 :param client: :param asset_response: 创建媒资返回的结果 :param file_content_type: 文件类型对应的content-type,如MP4对应video/mp4 :type file_content_type: str :return: """ print("获取初始化上传任务授权 init_part_upload_authority start") init_auth_request = ShowAssetTempAuthorityRequest() # 设置初始化参数 init_auth_request.http_verb = "POST" init_auth_request.bucket = asset_response.target.bucket init_auth_request.object_key = asset_response.target.object init_auth_request.content_type = file_content_type # 发送初始化请求 init_auth_response = client.show_asset_temp_authority(init_auth_request) print("init_auth_response:" + init_auth_response) print("获取初始化上传任务授权 init_part_upload_authority end") return init_auth_response def init_part_upload(self, sign_str, file_content_type): """ 4.初始化上传任务 :param sign_str: 第3步返回结果中的sign_str,初始化上传任务连接 :type sign_str: str :param file_content_type: 文件类型对应的content-type,如MP4对应video/mp4 :type file_content_type: str :return: 返回upload_id """ print("初始化分段上传 init_part_upload start") # 发送初始化请求 init_response = requests.request(method="POST", url=sign_str, headers={"Content-Type": file_content_type}) print(init_response.text) # 解析响应内容,获取uploadId root = ET.fromstring(init_response.text) namespace_str = root.tag match = re.search(r'\{(.*?)}', namespace_str) namespace_uri = match.group(1) upload_id = root.find("{" + namespace_uri + "}UploadId").text print("初始化分段上传 init_part_upload end; UploadId:" + upload_id) return upload_id def get_part_upload_authority(self, client, asset_response, file_content_type, content_md5, upload_id, part_number): """ 5.获取上传分段的授权 :param client: :param asset_response: 创建媒资返回的结果 :param file_content_type: 文件类型对应的content-type,如MP4对应video/mp4 :type file_content_type: str :param content_md5: 文件当前分段的content-md5值 :type content_md5: str :param upload_id: :type upload_id: str :param part_number: 段号 :type part_number: int :return: """ print("获取分段上传授权 get_part_upload_authority start; partNumber:", part_number) upload_auth_request = ShowAssetTempAuthorityRequest() # 设置上传授权参数 upload_auth_request.http_verb = "PUT" upload_auth_request.bucket = asset_response.target.bucket upload_auth_request.object_key = asset_response.target.object upload_auth_request.content_type = file_content_type upload_auth_request.content_md5 = content_md5 upload_auth_request.upload_id = upload_id upload_auth_request.part_number = part_number upload_auth_response = client.show_asset_temp_authority(upload_auth_request) print(upload_auth_response) print("获取分段上传授权 get_part_upload_authority end; partNumber:", part_number) return upload_auth_response def upload_part_file(self, sign_str, chunk, content_md5, part_number): """ 6.上传分段 :param sign_str: 第5步返回结果中的sign_str,上传连接 :type sign_str: str :param chunk: 文件当前分段的二进制数据 :type chunk: bytes :param content_md5: 文件当前分段的content-md5 :type content_md5: str :param part_number: 当前段号 :type part_number: int :return: """ print("上传分段 upload_part_file start; partNumber:", part_number) # 发送上传分段请求 upload_response = requests.request(method="PUT", url=sign_str, headers={ "Content-Type": "application/octet-stream", "Content-MD5": content_md5 }, data=chunk) if upload_response.status_code != 200: print("上传分段 end;上传失败!partNumber:", part_number) raise Exception("上传分段 end;上传失败!partNumber:", part_number) print("上传分段 upload_part_file end!partNumber:", part_number) def list_uploaded_part_authority(self, client, asset_response, upload_id): """ 8.获取已上传分段的授权 :param client: :param asset_response: 创建媒资返回的结果 :param upload_id: :return: """ print("获取列举已上传段的授权 list_uploaded_part_authority start") # 设置参数 list_upload_part_auth_request = ShowAssetTempAuthorityRequest() list_upload_part_auth_request.http_verb = "GET" list_upload_part_auth_request.bucket = asset_response.target.bucket list_upload_part_auth_request.object_key = asset_response.target.object list_upload_part_auth_request.upload_id = upload_id list_upload_part_auth_response = client.show_asset_temp_authority(list_upload_part_auth_request) print(list_upload_part_auth_response) print("获取列举已上传段的授权 list_uploaded_part_authority end;") return list_upload_part_auth_response def list_uploaded_part(self, sign_str): """ 9.获取已上传的分段 :param sign_str: 第8步返回的授权连接 :type sign_str: str :return: """ print("查询已上传的分段 list_uploaded_part start") # 查询分段的起始段号 part_number_marker = 0 # 组装合并段时根节点 merger_root = ET.Element("CompleteMultipartUpload") # 每次取列表最大返回1000段信息,分段超过1000要多次调用列举分段接口 while True: # 列举分段 list_upload_part_auth_response = requests.request(method="GET", url=sign_str + "&part-number-marker=" + str( part_number_marker)) print(list_upload_part_auth_response) # 按照xml格式化响应内容 response_document = ET.fromstring(list_upload_part_auth_response.text) # 解析xml内容,取xmlns信息 namespace_str_m = response_document.tag match_m = re.search(r'\{(.*?)}', namespace_str_m) namespace_uri_m = match_m.group(1) # 遍历Part节点 for part in response_document.findall("{" + namespace_uri_m + "}Part"): # 取PartNumber和ETag信息 part_number_value = part.find("{" + namespace_uri_m + "}PartNumber").text e_tag_value = part.find("{" + namespace_uri_m + "}ETag").text # 组装合并信息 # 在根节点下创建Part节点 part_node = ET.SubElement(merger_root, "Part") # Part节点下创建PartNumber,并设置PartNumber值 part_number_node = ET.SubElement(part_node, "PartNumber") part_number_node.text = part_number_value # Part节点下创建ETag,并设置ETag值 e_tag_node = ET.SubElement(part_node, "ETag") e_tag_node.text = e_tag_value # 查找设置下个列表起始值 part_number_marker_element = response_document.find("{" + namespace_uri_m + "}NextPartNumberMarker") part_number_marker = int(part_number_marker_element.text) # 当段号不为1000整数倍,则已取完分段 if part_number_marker % 1000 != 0: break part_info = ET.tostring(merger_root, encoding='utf8') print(part_info) print("查询已上传的分段 list_uploaded_part end") return part_info def merge_uploaded_part_authority(self, client, asset_response, upload_id): """ 10.获取合并段授权 :param client: :param asset_response: 创建媒资返回的结果 :param upload_id: upload_id :type upload_id: str :return: """ print("获取合并段授权 merge_uploaded_part_authority start") # 设置参数 merger_part_upload_auth_request = ShowAssetTempAuthorityRequest() merger_part_upload_auth_request.http_verb = "POST" merger_part_upload_auth_request.bucket = asset_response.target.bucket merger_part_upload_auth_request.object_key = asset_response.target.object merger_part_upload_auth_request.upload_id = upload_id merger_part_upload_auth_response = client.show_asset_temp_authority(merger_part_upload_auth_request) print(merger_part_upload_auth_response) print("获取合并段授权 merge_uploaded_part_authority end") return merger_part_upload_auth_response def merge_uploaded_part(self, sign_str, part_info): """ 11.合并上传分段 :param sign_str: 第10步返回的合并授权连接 :type sign_str: str :param part_info: 合并段信息 :type part_info: str :return: """ print("合并分段 start") # 请求消息头中增加“Content-Type”,值设置为“application/xml”。 merger_part_upload_response = requests.request(method="POST", url=sign_str, headers={"Content-Type": "application/xml"}, data=part_info) print(merger_part_upload_response) if merger_part_upload_response.status_code != 200: print("合并分段end;合并分段失败") print("合并分段 end") # 12.确认媒资上传 def confirm_uploaded(self, client, asset_response): print("确认上传完成start") confirm_asset_upload_request = ConfirmAssetUploadRequest() confirm_asset_upload_request.body = ConfirmAssetUploadReq(status="CREATED", asset_id=asset_response.asset_id) confirm_asset_upload_response = client.confirm_asset_upload(confirm_asset_upload_request) print(confirm_asset_upload_response) if __name__ == '__main__': # 本地要上传的媒资路径 filePath = "" partUploadDemo = PartUploadDemo() partUploadDemo.ak = "" partUploadDemo.sk = "" partUploadDemo.region = partUploadDemo.region_north4 # 上传媒资文件 partUploadDemo.upload_file(file_path=filePath)
父主题: 分段上传代码示例