更新时间:2024-10-24 GMT+08:00

Python

分段上传Python语言的示例代码,如下所示:

import base64
import hashlib
import os
import re

import xml.etree.ElementTree as ET
import requests

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvod.v1 import *
from huaweicloudsdkvod.v1.region.vod_region import VodRegion


class PartUploadDemo:
    """
    Multipart (segmented) upload example for the Huawei Cloud VOD SDK.

    The workflow implemented by :meth:`upload_file` is:

    1. build an authenticated VOD client,
    2. create the media asset,
    3. / 4. obtain the authorization for, and initialise, the multipart upload task,
    5. / 6. / 7. for every chunk of the file obtain an authorization and upload the part,
    8. / 9. list the uploaded parts,
    10. / 11. merge the uploaded parts,
    12. confirm the asset upload.

    ``region``, ``ak`` and ``sk`` must be set on the instance before calling
    :meth:`upload_file`.
    """
    # Read-buffer size, i.e. the size of each uploaded part; tune as needed.
    # 1MB
    buffer_size = 1024 * 1024

    # Regions
    region_north4 = "cn-north-4"
    region_north1 = "cn-north-1"
    region_east2 = "cn-east-2"

    region = ""

    # AK/SK; this example authenticates with AK/SK
    ak = ""
    sk = ""

    def __init__(self):
        pass

    def upload_file(self, file_path):
        """
        Upload a local file in parts and confirm the resulting asset.

        :param file_path: local path of the file to upload
        :type file_path: str
        :return: None; returns early (without uploading) when ``file_path`` fails validation
        :raises Exception: when a part upload, the upload initialisation or the merge fails
        """
        # Validate the path and the file before doing any remote call
        if not self.valid_file(file_path):
            return

        # File name without the directory part
        filename = os.path.basename(file_path)

        # Only MP4 is demonstrated here; see the official documentation for other formats
        video_type = "MP4"
        file_content_type = "video/mp4"

        print("开始上传媒资:" + filename)
        # 1. Initialise authentication and build the VOD client
        client = self.create_vod_client()

        # 2. Create the VOD media asset
        asset_response = self.create_asset(client=client,
                                           file_name=filename,
                                           video_type=video_type)

        # 3. Get the authorization for initialising the upload task
        init_auth_response = self.init_part_upload_authority(client=client,
                                                             asset_response=asset_response,
                                                             file_content_type=file_content_type)

        # 4. Initialise the upload task
        upload_id = self.init_part_upload(sign_str=init_auth_response.sign_str,
                                          file_content_type=file_content_type)

        # 7. Read the file chunk by chunk and repeat steps 5-6 for every part.
        #    Part numbers start at 1.
        with open(file_path, 'rb') as f:
            chunks = iter(lambda: f.read(self.buffer_size), b'')
            for part_number, chunk in enumerate(chunks, start=1):
                # Content-MD5 is the base64 encoding of the raw MD5 digest of the chunk
                content_md5 = base64.b64encode(hashlib.md5(chunk).digest()).decode('utf-8')

                # 5. Get the authorization for uploading this part
                upload_auth_response = self.get_part_upload_authority(client=client,
                                                                      asset_response=asset_response,
                                                                      file_content_type=file_content_type,
                                                                      content_md5=content_md5,
                                                                      upload_id=upload_id,
                                                                      part_number=part_number)

                # 6. Upload the part
                self.upload_part_file(sign_str=upload_auth_response.sign_str,
                                      chunk=chunk,
                                      content_md5=content_md5,
                                      part_number=part_number)

        # 8. Get the authorization for listing the uploaded parts
        list_part_upload_authority_response = self.list_uploaded_part_authority(client=client,
                                                                                asset_response=asset_response,
                                                                                upload_id=upload_id)

        # 9. List the uploaded parts
        part_info = self.list_uploaded_part(sign_str=list_part_upload_authority_response.sign_str)

        # 10. Get the authorization for merging the parts
        merge_part_upload_authority_response = self.merge_uploaded_part_authority(client=client,
                                                                                  asset_response=asset_response,
                                                                                  upload_id=upload_id)

        # 11. Merge the uploaded parts (raises on failure so that a broken
        #     upload is never confirmed in step 12)
        self.merge_uploaded_part(sign_str=merge_part_upload_authority_response.sign_str,
                                 part_info=part_info)

        # 12. Confirm the asset upload
        self.confirm_uploaded(client=client, asset_response=asset_response)
        print("上传媒资结束 assetId:" + asset_response.asset_id)

    def valid_file(self, file_path):
        """
        Check that ``file_path`` is a non-empty path pointing to an existing regular file.

        A message describing the problem is printed when validation fails.

        :param file_path: local path to check
        :type file_path: str
        :return: True when the path is usable, False otherwise
        :rtype: bool
        """
        if not file_path:
            print("路径为空")
            return False
        if os.path.isdir(file_path):
            print("是个目录")
            return False
        if not os.path.isfile(file_path):
            print("文件不存在")
            return False
        return True

    def create_vod_client(self):
        """
        1. Initialise authentication.

        Builds a VOD client from ``self.ak``, ``self.sk`` and ``self.region``.

        :return: the built VOD client
        """
        print("初始化鉴权。。。")
        credentials = BasicCredentials(self.ak, self.sk)
        client = VodClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(VodRegion.value_of(self.region)) \
            .build()
        return client

    def create_asset(self, client, file_name, video_type):
        """
        2. Create the VOD media asset.

        :param client: VOD client
        :param file_name: audio/video file name; also used as the asset title
        :type file_name: str
        :param video_type: format of the uploaded audio/video file
        :type video_type: str
        :return: the response of the create-asset call
        """
        print("create_asset start; ")
        create_asset_request = CreateAssetByFileUploadRequest()
        # Minimal parameter set for creating an asset; add other parameters
        # as needed according to the official documentation
        create_asset_request.body = CreateAssetByFileUploadReq(
            video_type=video_type,
            video_name=file_name,
            title=file_name
        )
        # Call the create-asset API
        asset_response = client.create_asset_by_file_upload(create_asset_request)
        # The response is an object, not a str: convert explicitly before concatenating
        print("createAssetResponse:" + str(asset_response))
        print("create_asset end")
        return asset_response

    def init_part_upload_authority(self, client, asset_response, file_content_type):
        """
        3. Get the authorization for initialising the upload task.

        :param client: VOD client
        :param asset_response: result returned by :meth:`create_asset`
        :param file_content_type: content-type matching the file type, e.g. video/mp4 for MP4
        :type file_content_type: str
        :return: the authorization response (its ``sign_str`` is used by step 4)
        """
        print("获取初始化上传任务授权 init_part_upload_authority start")
        init_auth_request = ShowAssetTempAuthorityRequest()
        # Request parameters
        init_auth_request.http_verb = "POST"
        init_auth_request.bucket = asset_response.target.bucket
        init_auth_request.object_key = asset_response.target.object
        init_auth_request.content_type = file_content_type
        # Send the request
        init_auth_response = client.show_asset_temp_authority(init_auth_request)
        # The response is an object, not a str: convert explicitly before concatenating
        print("init_auth_response:" + str(init_auth_response))
        print("获取初始化上传任务授权 init_part_upload_authority end")
        return init_auth_response

    def init_part_upload(self, sign_str, file_content_type):
        """
        4. Initialise the upload task.

        :param sign_str: ``sign_str`` returned by step 3, the signed URL for initialising the upload
        :type sign_str: str
        :param file_content_type: content-type matching the file type, e.g. video/mp4 for MP4
        :type file_content_type: str
        :return: the upload_id of the new upload task
        :rtype: str
        :raises Exception: when the response does not contain an ``UploadId``
        """
        print("初始化分段上传 init_part_upload start")
        # Send the initialisation request
        init_response = requests.request(method="POST",
                                         url=sign_str,
                                         headers={"Content-Type": file_content_type})
        print(init_response.text)
        # Parse the XML response and extract UploadId
        root = ET.fromstring(init_response.text)
        upload_id_node = root.find(self._namespace_prefix(root.tag) + "UploadId")
        if upload_id_node is None or not upload_id_node.text:
            # Typically an error document: fail with a clear message instead of
            # an AttributeError on None
            raise Exception("初始化分段上传失败;响应中没有UploadId;status_code:"
                            + str(init_response.status_code))
        upload_id = upload_id_node.text
        print("初始化分段上传 init_part_upload end; UploadId:" + upload_id)
        return upload_id

    def get_part_upload_authority(self, client, asset_response, file_content_type, content_md5, upload_id, part_number):
        """
        5. Get the authorization for uploading one part.

        :param client: VOD client
        :param asset_response: result returned by :meth:`create_asset`
        :param file_content_type: content-type matching the file type, e.g. video/mp4 for MP4
        :type file_content_type: str
        :param content_md5: content-md5 value of the current part
        :type content_md5: str
        :param upload_id: upload task id returned by step 4
        :type upload_id: str
        :param part_number: part number
        :type part_number: int
        :return: the authorization response (its ``sign_str`` is used by step 6)
        """
        print("获取分段上传授权 get_part_upload_authority start; partNumber:", part_number)
        upload_auth_request = ShowAssetTempAuthorityRequest()
        # Request parameters
        upload_auth_request.http_verb = "PUT"
        upload_auth_request.bucket = asset_response.target.bucket
        upload_auth_request.object_key = asset_response.target.object
        upload_auth_request.content_type = file_content_type
        upload_auth_request.content_md5 = content_md5
        upload_auth_request.upload_id = upload_id
        upload_auth_request.part_number = part_number
        upload_auth_response = client.show_asset_temp_authority(upload_auth_request)
        print(upload_auth_response)
        print("获取分段上传授权 get_part_upload_authority end; partNumber:", part_number)
        return upload_auth_response

    def upload_part_file(self, sign_str, chunk, content_md5, part_number):
        """
        6. Upload one part.

        :param sign_str: ``sign_str`` returned by step 5, the signed upload URL
        :type sign_str: str
        :param chunk: binary data of the current part
        :type chunk: bytes
        :param content_md5: content-md5 of the current part
        :type content_md5: str
        :param part_number: current part number
        :type part_number: int
        :return: None
        :raises Exception: when the server does not answer with HTTP 200
        """
        print("上传分段 upload_part_file start; partNumber:", part_number)
        # Send the upload-part request
        upload_response = requests.request(method="PUT",
                                           url=sign_str,
                                           headers={
                                               "Content-Type": "application/octet-stream",
                                               "Content-MD5": content_md5
                                           },
                                           data=chunk)
        if upload_response.status_code != 200:
            print("上传分段 end;上传失败!partNumber:", part_number)
            # Build a single message string (Exception does not format like print)
            raise Exception("上传分段 end;上传失败!partNumber:" + str(part_number))
        print("上传分段 upload_part_file end!partNumber:", part_number)

    def list_uploaded_part_authority(self, client, asset_response, upload_id):
        """
        8. Get the authorization for listing the uploaded parts.

        :param client: VOD client
        :param asset_response: result returned by :meth:`create_asset`
        :param upload_id: upload task id returned by step 4
        :return: the authorization response (its ``sign_str`` is used by step 9)
        """
        print("获取列举已上传段的授权 list_uploaded_part_authority start")
        # Request parameters
        list_upload_part_auth_request = ShowAssetTempAuthorityRequest()
        list_upload_part_auth_request.http_verb = "GET"
        list_upload_part_auth_request.bucket = asset_response.target.bucket
        list_upload_part_auth_request.object_key = asset_response.target.object
        list_upload_part_auth_request.upload_id = upload_id
        list_upload_part_auth_response = client.show_asset_temp_authority(list_upload_part_auth_request)
        print(list_upload_part_auth_response)
        print("获取列举已上传段的授权 list_uploaded_part_authority end;")
        return list_upload_part_auth_response

    @staticmethod
    def _namespace_prefix(tag):
        """Return the ``{namespace-uri}`` prefix of an ElementTree tag, or "" when it has none."""
        match = re.search(r'\{.*?}', tag)
        return match.group(0) if match else ""

    @staticmethod
    def _append_parts(merger_root, response_text):
        """
        Copy PartNumber/ETag of every ``Part`` in a list-parts XML response under ``merger_root``.

        :param merger_root: root element of the merge request being assembled
        :param response_text: XML text of one list-parts response
        :type response_text: str
        :return: ``(part_count, next_marker)``: the number of parts found in this response and
                 the integer value of ``NextPartNumberMarker`` (None when missing or empty)
        :rtype: tuple
        """
        document = ET.fromstring(response_text)
        # Child elements share the namespace of the root element
        prefix = PartUploadDemo._namespace_prefix(document.tag)

        part_count = 0
        for part in document.findall(prefix + "Part"):
            # Create <Part><PartNumber/><ETag/></Part> (without namespace) in the merge document
            part_node = ET.SubElement(merger_root, "Part")
            ET.SubElement(part_node, "PartNumber").text = part.find(prefix + "PartNumber").text
            ET.SubElement(part_node, "ETag").text = part.find(prefix + "ETag").text
            part_count += 1

        marker_node = document.find(prefix + "NextPartNumberMarker")
        marker_text = (marker_node.text or "").strip() if marker_node is not None else ""
        next_marker = int(marker_text) if marker_text else None
        return part_count, next_marker

    @staticmethod
    def _has_more_parts(part_count, current_marker, next_marker):
        """
        Decide whether another list-parts request is needed.

        Another page is requested only when the last page contained parts, the marker advanced,
        and the new marker is a multiple of 1000 (a full page). The first two conditions
        guarantee termination when the number of parts is 0 or an exact multiple of 1000.
        """
        if part_count == 0 or next_marker is None:
            return False
        if next_marker <= current_marker:
            return False
        return next_marker % 1000 == 0

    def list_uploaded_part(self, sign_str):
        """
        9. List the uploaded parts and assemble the merge request body.

        :param sign_str: signed URL returned by step 8
        :type sign_str: str
        :return: the ``CompleteMultipartUpload`` XML document listing every uploaded part
        :rtype: bytes
        """
        print("查询已上传的分段 list_uploaded_part start")
        # Part number after which listing starts
        part_number_marker = 0
        # Root node of the merge request
        merger_root = ET.Element("CompleteMultipartUpload")
        # Each listing returns at most 1000 parts; with more than 1000 parts
        # the list API has to be called several times
        while True:
            # List one page of parts
            list_upload_part_auth_response = requests.request(method="GET",
                                                              url=sign_str + "&part-number-marker=" + str(
                                                                  part_number_marker))
            print(list_upload_part_auth_response)

            part_count, next_marker = self._append_parts(merger_root,
                                                         list_upload_part_auth_response.text)
            if not self._has_more_parts(part_count, part_number_marker, next_marker):
                break
            # Continue after the last part of this page
            part_number_marker = next_marker

        part_info = ET.tostring(merger_root, encoding='utf8')
        print(part_info)
        print("查询已上传的分段 list_uploaded_part end")
        return part_info

    def merge_uploaded_part_authority(self, client, asset_response, upload_id):
        """
        10. Get the authorization for merging the parts.

        :param client: VOD client
        :param asset_response: result returned by :meth:`create_asset`
        :param upload_id: upload task id returned by step 4
        :type upload_id: str
        :return: the authorization response (its ``sign_str`` is used by step 11)
        """
        print("获取合并段授权 merge_uploaded_part_authority start")
        # Request parameters
        merger_part_upload_auth_request = ShowAssetTempAuthorityRequest()
        merger_part_upload_auth_request.http_verb = "POST"
        merger_part_upload_auth_request.bucket = asset_response.target.bucket
        merger_part_upload_auth_request.object_key = asset_response.target.object
        merger_part_upload_auth_request.upload_id = upload_id
        merger_part_upload_auth_response = client.show_asset_temp_authority(merger_part_upload_auth_request)
        print(merger_part_upload_auth_response)
        print("获取合并段授权 merge_uploaded_part_authority end")
        return merger_part_upload_auth_response

    def merge_uploaded_part(self, sign_str, part_info):
        """
        11. Merge the uploaded parts.

        :param sign_str: signed merge URL returned by step 10
        :type sign_str: str
        :param part_info: merge request body as produced by :meth:`list_uploaded_part`
        :type part_info: bytes
        :return: None
        :raises Exception: when the server does not answer with HTTP 200
        """
        print("合并分段 start")
        # The request must carry the header "Content-Type: application/xml"
        merger_part_upload_response = requests.request(method="POST",
                                                       url=sign_str,
                                                       headers={"Content-Type": "application/xml"},
                                                       data=part_info)
        print(merger_part_upload_response)
        if merger_part_upload_response.status_code != 200:
            print("合并分段end;合并分段失败")
            # Stop here: confirming an asset whose parts were not merged would
            # register a broken upload
            raise Exception("合并分段end;合并分段失败;status_code:"
                            + str(merger_part_upload_response.status_code))
        print("合并分段 end")

    def confirm_uploaded(self, client, asset_response):
        """
        12. Confirm the asset upload.

        :param client: VOD client
        :param asset_response: result returned by :meth:`create_asset`
        :return: None
        """
        print("确认上传完成start")
        confirm_asset_upload_request = ConfirmAssetUploadRequest()
        confirm_asset_upload_request.body = ConfirmAssetUploadReq(status="CREATED",
                                                                  asset_id=asset_response.asset_id)
        confirm_asset_upload_response = client.confirm_asset_upload(confirm_asset_upload_request)
        print(confirm_asset_upload_response)


if __name__ == '__main__':
    # Script entry point: fill in the credentials and the media path below before running.
    demo = PartUploadDemo()
    demo.ak, demo.sk = "", ""
    demo.region = PartUploadDemo.region_north4
    # Local path of the media file to upload
    local_media_path = ""
    # Upload the media file
    demo.upload_file(file_path=local_media_path)