更新时间:2025-02-11 GMT+08:00
Python
分段上传Python语言的示例代码,如下所示:
import base64
import hashlib
import os
import re
import xml.etree.ElementTree as ET
import requests
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvod.v1 import *
from huaweicloudsdkvod.v1.region.vod_region import VodRegion
class PartUploadDemo:
"""
分段上传示例
"""
# 设置缓冲区大小,每次读取文件分段的大小,根据情况自行设定
# 1MB
buffer_size = 1024 * 1024
# 区域
region_north4 = "cn-north-4"
region_north1 = "cn-north-1"
region_east2 = "cn-east-2"
region = ""
# ak/sk,示例以ak/sk做认证鉴权
ak = ""
sk = ""
def __init__(self):
pass
def upload_file(self, file_path):
"""
分段上传
:param file_path: 文件的本地路径
:type file_path: str
:return:
"""
# 校验一下文件路径和文件
if not self.valid_file(file_path):
return
# 取文件名
filename = os.path.basename(file_path)
# 此处仅以MP4文件示例,其他格式可参考官网说明
video_type = "MP4"
file_content_type = "video/mp4"
print("开始上传媒资:" + filename)
# 1. 初始化鉴权并获取vodClient
client = self.create_vod_client()
# 2.创建点播媒资
asset_response = self.create_asset(client=client,
file_name=filename,
video_type=video_type)
# 3.获取初始化上传任务授权
init_auth_response = self.init_part_upload_authority(client=client,
asset_response=asset_response,
file_content_type=file_content_type)
# 4.初始化上传任务
upload_id = self.init_part_upload(sign_str=init_auth_response.sign_str,
file_content_type=file_content_type)
# 文件分段计数
part_number = 1
# 7.读取文件内容,循环5-6步上传所有分段
with open(file_path, 'rb') as f:
for chunk in iter(lambda: f.read(self.buffer_size), b''):
# 先md5,再base64,生成content_md5
md5 = hashlib.md5()
md5.update(chunk)
content_md5 = str(base64.b64encode(md5.digest()), 'utf-8')
# print(content_md5)
# 5. 获取上传分段的授权
upload_auth_response = self.get_part_upload_authority(client=client,
asset_response=asset_response,
file_content_type=file_content_type,
content_md5=content_md5,
upload_id=upload_id,
part_number=part_number)
# 6.上传分段
self.upload_part_file(sign_str=upload_auth_response.sign_str,
chunk=chunk,
content_md5=content_md5,
part_number=part_number)
# 段号自增
part_number += 1
# 8.获取已上传分段的授权
list_part_upload_authority_response = self.list_uploaded_part_authority(client=client,
asset_response=asset_response,
upload_id=upload_id)
# 9.获取已上传的分段
part_info = self.list_uploaded_part(sign_str=list_part_upload_authority_response.sign_str)
# 10.获取合并段授权
merge_part_upload_authority_response = self.merge_uploaded_part_authority(client=client,
asset_response=asset_response,
upload_id=upload_id)
# 11. 合并上传分段
self.merge_uploaded_part(sign_str=merge_part_upload_authority_response.sign_str,
part_info=part_info)
# 12. 确认媒资上传
self.confirm_uploaded(client=client, asset_response=asset_response)
print("上传媒资结束 assetId:" + asset_response.asset_id)
# 校验文件是否存在
def valid_file(self, file_path):
valid_result = True
if not file_path:
print("路径为空")
valid_result = False
elif os.path.isdir(file_path):
print("是个目录")
valid_result = False
elif not os.path.isfile(file_path):
print("文件不存在")
valid_result = False
return valid_result
# 1.初始化鉴权
def create_vod_client(self):
print("初始化鉴权。。。")
credentials = BasicCredentials(self.ak, self.sk)
client = VodClient.new_builder() \
.with_credentials(credentials) \
.with_region(VodRegion.value_of(self.region)) \
.build()
return client
def create_asset(self, client, file_name, video_type):
"""
2.创建点播媒资
:param client
:param file_name 音视频文件名
:type file_name: str
:param video_type 上传音视频文件的格式
:type video_type: str
"""
print("create_asset start; ")
create_asset_request = CreateAssetByFileUploadRequest()
# 设置创建媒资参数,创建媒资参数最小集,其他参数参考官网文档按需添加
create_asset_request.body = CreateAssetByFileUploadReq(
video_type=video_type,
video_name=file_name,
title=file_name
)
# 调用创建媒资方法
asset_response = client.create_asset_by_file_upload(create_asset_request)
print("create_asset end")
return asset_response
def init_part_upload_authority(self, client, asset_response, file_content_type):
"""
3.获取初始化上传任务授权
:param client:
:param asset_response: 创建媒资返回的结果
:param file_content_type: 文件类型对应的content-type,如MP4对应video/mp4
:type file_content_type: str
:return:
"""
print("获取初始化上传任务授权 init_part_upload_authority start")
init_auth_request = ShowAssetTempAuthorityRequest()
# 设置初始化参数
init_auth_request.http_verb = "POST"
init_auth_request.bucket = asset_response.target.bucket
init_auth_request.object_key = asset_response.target.object
init_auth_request.content_type = file_content_type
# 发送初始化请求
init_auth_response = client.show_asset_temp_authority(init_auth_request)
print("获取初始化上传任务授权 init_part_upload_authority end")
return init_auth_response
def init_part_upload(self, sign_str, file_content_type):
"""
4.初始化上传任务
:param sign_str: 第3步返回结果中的sign_str,初始化上传任务连接
:type sign_str: str
:param file_content_type: 文件类型对应的content-type,如MP4对应video/mp4
:type file_content_type: str
:return: 返回upload_id
"""
print("初始化分段上传 init_part_upload start")
# 发送初始化请求
init_response = requests.request(method="POST",
url=sign_str,
headers={"Content-Type": file_content_type})
print(init_response.text)
# 解析响应内容,获取uploadId
root = ET.fromstring(init_response.text)
namespace_str = root.tag
match = re.search(r'\{(.*?)}', namespace_str)
namespace_uri = match.group(1)
upload_id = root.find("{" + namespace_uri + "}UploadId").text
print("初始化分段上传 init_part_upload end; UploadId:" + upload_id)
return upload_id
def get_part_upload_authority(self, client, asset_response, file_content_type, content_md5, upload_id, part_number):
"""
5.获取上传分段的授权
:param client:
:param asset_response: 创建媒资返回的结果
:param file_content_type: 文件类型对应的content-type,如MP4对应video/mp4
:type file_content_type: str
:param content_md5: 文件当前分段的content-md5值
:type content_md5: str
:param upload_id:
:type upload_id: str
:param part_number: 段号
:type part_number: int
:return:
"""
print("获取分段上传授权 get_part_upload_authority start; partNumber:", part_number)
upload_auth_request = ShowAssetTempAuthorityRequest()
# 设置上传授权参数
upload_auth_request.http_verb = "PUT"
upload_auth_request.bucket = asset_response.target.bucket
upload_auth_request.object_key = asset_response.target.object
upload_auth_request.content_type = file_content_type
upload_auth_request.content_md5 = content_md5
upload_auth_request.upload_id = upload_id
upload_auth_request.part_number = part_number
upload_auth_response = client.show_asset_temp_authority(upload_auth_request)
print(upload_auth_response)
print("获取分段上传授权 get_part_upload_authority end; partNumber:", part_number)
return upload_auth_response
def upload_part_file(self, sign_str, chunk, content_md5, part_number):
"""
6.上传分段
:param sign_str: 第5步返回结果中的sign_str,上传连接
:type sign_str: str
:param chunk: 文件当前分段的二进制数据
:type chunk: bytes
:param content_md5: 文件当前分段的content-md5
:type content_md5: str
:param part_number: 当前段号
:type part_number: int
:return:
"""
print("上传分段 upload_part_file start; partNumber:", part_number)
# 发送上传分段请求
upload_response = requests.request(method="PUT",
url=sign_str,
headers={
"Content-Type": "application/octet-stream",
"Content-MD5": content_md5
},
data=chunk)
if upload_response.status_code != 200:
print("上传分段 end;上传失败!partNumber:", part_number)
raise Exception("上传分段 end;上传失败!partNumber:", part_number)
print("上传分段 upload_part_file end!partNumber:", part_number)
def list_uploaded_part_authority(self, client, asset_response, upload_id):
"""
8.获取已上传分段的授权
:param client:
:param asset_response: 创建媒资返回的结果
:param upload_id:
:return:
"""
print("获取列举已上传段的授权 list_uploaded_part_authority start")
# 设置参数
list_upload_part_auth_request = ShowAssetTempAuthorityRequest()
list_upload_part_auth_request.http_verb = "GET"
list_upload_part_auth_request.bucket = asset_response.target.bucket
list_upload_part_auth_request.object_key = asset_response.target.object
list_upload_part_auth_request.upload_id = upload_id
list_upload_part_auth_response = client.show_asset_temp_authority(list_upload_part_auth_request)
print(list_upload_part_auth_response)
print("获取列举已上传段的授权 list_uploaded_part_authority end;")
return list_upload_part_auth_response
def list_uploaded_part(self, sign_str):
"""
9.获取已上传的分段
:param sign_str: 第8步返回的授权连接
:type sign_str: str
:return:
"""
print("查询已上传的分段 list_uploaded_part start")
# 查询分段的起始段号
part_number_marker = 0
# 组装合并段时根节点
merger_root = ET.Element("CompleteMultipartUpload")
# 每次取列表最大返回1000段信息,分段超过1000要多次调用列举分段接口
while True:
# 列举分段
list_upload_part_auth_response = requests.request(method="GET",
url=sign_str + "&part-number-marker=" + str(
part_number_marker))
print(list_upload_part_auth_response)
# 按照xml格式化响应内容
response_document = ET.fromstring(list_upload_part_auth_response.text)
# 解析xml内容,取xmlns信息
namespace_str_m = response_document.tag
match_m = re.search(r'\{(.*?)}', namespace_str_m)
namespace_uri_m = match_m.group(1)
# 遍历Part节点
for part in response_document.findall("{" + namespace_uri_m + "}Part"):
# 取PartNumber和ETag信息
part_number_value = part.find("{" + namespace_uri_m + "}PartNumber").text
e_tag_value = part.find("{" + namespace_uri_m + "}ETag").text
# 组装合并信息
# 在根节点下创建Part节点
part_node = ET.SubElement(merger_root, "Part")
# Part节点下创建PartNumber,并设置PartNumber值
part_number_node = ET.SubElement(part_node, "PartNumber")
part_number_node.text = part_number_value
# Part节点下创建ETag,并设置ETag值
e_tag_node = ET.SubElement(part_node, "ETag")
e_tag_node.text = e_tag_value
# 查找设置下个列表起始值
part_number_marker_element = response_document.find("{" + namespace_uri_m + "}NextPartNumberMarker")
part_number_marker = int(part_number_marker_element.text)
# 当段号不为1000整数倍,则已取完分段
if part_number_marker % 1000 != 0:
break
part_info = ET.tostring(merger_root, encoding='utf8')
print(part_info)
print("查询已上传的分段 list_uploaded_part end")
return part_info
def merge_uploaded_part_authority(self, client, asset_response, upload_id):
"""
10.获取合并段授权
:param client:
:param asset_response: 创建媒资返回的结果
:param upload_id: upload_id
:type upload_id: str
:return:
"""
print("获取合并段授权 merge_uploaded_part_authority start")
# 设置参数
merger_part_upload_auth_request = ShowAssetTempAuthorityRequest()
merger_part_upload_auth_request.http_verb = "POST"
merger_part_upload_auth_request.bucket = asset_response.target.bucket
merger_part_upload_auth_request.object_key = asset_response.target.object
merger_part_upload_auth_request.upload_id = upload_id
merger_part_upload_auth_response = client.show_asset_temp_authority(merger_part_upload_auth_request)
print(merger_part_upload_auth_response)
print("获取合并段授权 merge_uploaded_part_authority end")
return merger_part_upload_auth_response
def merge_uploaded_part(self, sign_str, part_info):
"""
11.合并上传分段
:param sign_str: 第10步返回的合并授权连接
:type sign_str: str
:param part_info: 合并段信息
:type part_info: str
:return:
"""
print("合并分段 start")
# 请求消息头中增加“Content-Type”,值设置为“application/xml”。
merger_part_upload_response = requests.request(method="POST",
url=sign_str,
headers={"Content-Type": "application/xml"},
data=part_info)
print(merger_part_upload_response)
if merger_part_upload_response.status_code != 200:
print("合并分段end;合并分段失败")
print("合并分段 end")
# 12.确认媒资上传
def confirm_uploaded(self, client, asset_response):
print("确认上传完成start")
confirm_asset_upload_request = ConfirmAssetUploadRequest()
confirm_asset_upload_request.body = ConfirmAssetUploadReq(status="CREATED",
asset_id=asset_response.asset_id)
confirm_asset_upload_response = client.confirm_asset_upload(confirm_asset_upload_request)
print(confirm_asset_upload_response)
if __name__ == '__main__':
# 本地要上传的媒资路径
filePath = ""
partUploadDemo = PartUploadDemo()
partUploadDemo.ak = ""
partUploadDemo.sk = ""
partUploadDemo.region = partUploadDemo.region_north4
# 上传媒资文件
partUploadDemo.upload_file(file_path=filePath)
父主题: 分段上传代码示例