Updated on 2024-12-04 GMT+08:00

Python

Sample code of multipart upload using Python:

import base64
import hashlib
import os
import re

import xml.etree.ElementTree as ET
import requests

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvod.v1 import *
from huaweicloudsdkvod.v1.region.vod_region import VodRegion


class PartUploadDemo:
    """
    Example of multipart upload of a local media file through the VOD SDK.

    :meth:`upload_file` drives the whole flow: create the media asset, obtain a
    signed URL for every step through ``show_asset_temp_authority``, send the
    file part by part to those URLs with ``requests``, merge the parts and
    finally confirm the upload.

    ``ak``, ``sk`` and ``region`` can be passed to the constructor or assigned
    on the instance before calling :meth:`upload_file`.
    """
    # Set the buffer size as needed, that is, the size of the file part read each time.
    # 1 MB
    buffer_size = 1024 * 1024

    # (connect, read) timeout in seconds passed to every ``requests`` call so that a
    # stalled connection cannot block the upload forever. Set to None to disable.
    request_timeout = (10, 300)

    # Region
    region_north4 = "cn-north-4"
    region_north1 = "cn-north-1"
    region_east2 = "cn-east-2"

    region = ""

    # AK/SK, which is used for authentication in this example.
    ak = ""
    sk = ""

    def __init__(self, ak=None, sk=None, region=None, buffer_size=None):
        """
        Create a demo instance.

        Every argument is optional; an argument left as ``None`` keeps the
        class-level default, so ``PartUploadDemo()`` behaves as before.

        :param ak: access key used for authentication
        :type ak: str
        :param sk: secret key used for authentication
        :type sk: str
        :param region: region ID, for example ``PartUploadDemo.region_north4``
        :type region: str
        :param buffer_size: size in bytes of the file part read each time
        :type buffer_size: int
        """
        if ak is not None:
            self.ak = ak
        if sk is not None:
            self.sk = sk
        if region is not None:
            self.region = region
        if buffer_size is not None:
            self.buffer_size = buffer_size

    @staticmethod
    def _namespace_prefix(element):
        """
        Return the ``{namespace-uri}`` prefix of an XML element tag.

        :param element: parsed XML element
        :return: the prefix including braces, or ``""`` when the tag carries no
                 namespace, so ``prefix + "Name"`` is always a valid lookup key
        :rtype: str
        """
        match = re.match(r'\{[^}]*\}', element.tag)
        return match.group(0) if match else ""

    @staticmethod
    def _content_md5(chunk):
        """
        Return the Base64-encoded MD5 digest of a file part.

        :param chunk: binary data of the file part
        :type chunk: bytes
        :rtype: str
        """
        return base64.b64encode(hashlib.md5(chunk).digest()).decode('utf-8')

    def upload_file(self, file_path):
        """
        Multipart upload
        :param file_path: local path of the file
        :type file_path: str
        :return: None. Returns early, after printing the reason, when the path is invalid.
        :raises RuntimeError: when a request to a signed URL fails
        """
        # Verify the file and its path.
        if not self.valid_file(file_path):
            return

        # Obtain the file name.
        filename = os.path.basename(file_path)

        # An MP4 file is used as an example. For details about other formats, see the official website.
        video_type = "MP4"
        file_content_type = "video/mp4"

        print("Start uploading media assets:" + filename)
        # 1. Initialize authentication and obtain vodClient.
        client = self.create_vod_client()

        # 2. Create a VOD media asset.
        asset_response = self.create_asset(client=client,
                                           file_name=filename,
                                           video_type=video_type)

        # 3. Obtain authorization for initializing an upload task.
        init_auth_response = self.init_part_upload_authority(client=client,
                                                             asset_response=asset_response,
                                                             file_content_type=file_content_type)

        # 4. Initialize the upload task.
        upload_id = self.init_part_upload(sign_str=init_auth_response.sign_str,
                                          file_content_type=file_content_type)

        # 7. Read the file content and repeat steps 5 and 6 to upload all parts.
        with open(file_path, 'rb') as f:
            # Read buffer_size bytes at a time until read() returns b''.
            chunks = iter(lambda: f.read(self.buffer_size), b'')
            # Part numbers start at 1 and increase by one per part.
            for part_number, chunk in enumerate(chunks, start=1):
                # Generate content_md5 using MD5 and then Base64.
                content_md5 = self._content_md5(chunk)

                # 5. Obtain authorization for multipart upload.
                upload_auth_response = self.get_part_upload_authority(client=client,
                                                                      asset_response=asset_response,
                                                                      file_content_type=file_content_type,
                                                                      content_md5=content_md5,
                                                                      upload_id=upload_id,
                                                                      part_number=part_number)

                # 6. Upload parts.
                self.upload_part_file(sign_str=upload_auth_response.sign_str,
                                      chunk=chunk,
                                      content_md5=content_md5,
                                      part_number=part_number)

        # 8. Obtain authorization for obtaining uploaded parts.
        list_part_upload_authority_response = self.list_uploaded_part_authority(client=client,
                                                                                asset_response=asset_response,
                                                                                upload_id=upload_id)

        # 9. Obtain uploaded parts.
        part_info = self.list_uploaded_part(sign_str=list_part_upload_authority_response.sign_str)

        # 10. Obtain authorization for merging parts.
        merge_part_upload_authority_response = self.merge_uploaded_part_authority(client=client,
                                                                                  asset_response=asset_response,
                                                                                  upload_id=upload_id)

        # 11. Merge uploaded parts. Raises on failure, so step 12 never confirms a broken upload.
        self.merge_uploaded_part(sign_str=merge_part_upload_authority_response.sign_str,
                                 part_info=part_info)

        # 12. Confirm media asset upload.
        self.confirm_uploaded(client=client, asset_response=asset_response)
        print("Media asset uploaded. assetId:" + asset_response.asset_id)

    def valid_file(self, file_path):
        """
        Check whether the path points to an existing regular file.

        Prints the reason when the check fails.

        :param file_path: local path of the file
        :type file_path: str
        :return: True when the path is an existing file; False when it is
                 empty, a directory, or does not exist
        :rtype: bool
        """
        if not file_path:
            print("The path is empty.")
            return False
        if os.path.isdir(file_path):
            print("It is a directory.")
            return False
        if not os.path.isfile(file_path):
            print("The file does not exist.")
            return False
        return True

    def create_vod_client(self):
        """
        1. Initialize authentication.

        Builds a ``VodClient`` from ``self.ak``, ``self.sk`` and ``self.region``.

        :return: the built client
        """
        print("Initializing authentication...")
        credentials = BasicCredentials(self.ak, self.sk)
        client = VodClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(VodRegion.value_of(self.region)) \
            .build()
        return client

    def create_asset(self, client, file_name, video_type):
        """
        2. Create a VOD media asset.
        :param client: client returned by create_vod_client
        :param file_name: audio/video file name, used as both video name and title
        :type file_name: str
        :param video_type: uploaded audio/video file format
        :type video_type: str
        :return: response of ``create_asset_by_file_upload``
        """
        print("create_asset start; ")
        create_asset_request = CreateAssetByFileUploadRequest()
        # Create the minimum set of parameters for media asset creation. For details about other parameters, see documents on the official website.
        create_asset_request.body = CreateAssetByFileUploadReq(
            video_type=video_type,
            video_name=file_name,
            title=file_name
        )
        # Call the media asset creation method.
        asset_response = client.create_asset_by_file_upload(create_asset_request)
        # The response is an SDK object, not a str; convert it before concatenating.
        print("createAssetResponse:" + str(asset_response))
        print("create_asset end")
        return asset_response

    def init_part_upload_authority(self, client, asset_response, file_content_type):
        """
        3. Obtain authorization for initializing an upload task.
        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param file_content_type: content-type of a file type, such as video/mp4 for MP4
        :type file_content_type: str
        :return: response of ``show_asset_temp_authority``; its ``sign_str`` is used in step 4
        """
        print("Obtain authorization for initializing an upload task. init_part_upload_authority start")
        init_auth_request = ShowAssetTempAuthorityRequest()
        # Configure initialization parameters.
        init_auth_request.http_verb = "POST"
        init_auth_request.bucket = asset_response.target.bucket
        init_auth_request.object_key = asset_response.target.object
        init_auth_request.content_type = file_content_type
        # Send an initialization request.
        init_auth_response = client.show_asset_temp_authority(init_auth_request)
        # The response is an SDK object, not a str; convert it before concatenating.
        print("init_auth_response:" + str(init_auth_response))
        print("Obtain authorization for initializing an upload task. init_part_upload_authority end")
        return init_auth_response

    def init_part_upload(self, sign_str, file_content_type):
        """
        4. Initialize the upload task.
        :param sign_str: sign_str in the result returned in step 3, which is the URL for initializing the upload task
        :type sign_str: str
        :param file_content_type: content-type of a file type, such as video/mp4 for MP4
        :type file_content_type: str
        :return: returns upload_id
        :rtype: str
        :raises RuntimeError: when the request fails or the response carries no UploadId
        """
        print("Initialize multipart upload. init_part_upload start")
        # Send an initialization request.
        init_response = requests.request(method="POST",
                                         url=sign_str,
                                         headers={"Content-Type": file_content_type},
                                         timeout=self.request_timeout)
        print(init_response.text)
        # Fail with the HTTP status instead of an obscure XML parsing error.
        if not init_response.ok:
            raise RuntimeError("Initialize multipart upload failed! status_code: %s"
                               % init_response.status_code)
        # Parse the response to obtain the uploadId.
        root = ET.fromstring(init_response.text)
        upload_id = root.findtext(self._namespace_prefix(root) + "UploadId")
        if not upload_id:
            raise RuntimeError("Initialize multipart upload failed! UploadId not found in the response")
        print("Initialize multipart upload. init_part_upload end; UploadId:" + upload_id)
        return upload_id

    def get_part_upload_authority(self, client, asset_response, file_content_type, content_md5, upload_id, part_number):
        """
        5. Obtain authorization for multipart upload.
        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param file_content_type: content-type of a file type, such as video/mp4 for MP4
        :type file_content_type: str
        :param content_md5: content-md5 value of the current file part
        :type content_md5: str
        :param upload_id: upload_id returned in step 4
        :type upload_id: str
        :param part_number: part number
        :type part_number: int
        :return: response of ``show_asset_temp_authority``; its ``sign_str`` is used in step 6
        """
        print("Obtain authorization for multipart upload. get_part_upload_authority start; partNumber:", part_number)
        upload_auth_request = ShowAssetTempAuthorityRequest()
        # Configure upload authorization parameters.
        upload_auth_request.http_verb = "PUT"
        upload_auth_request.bucket = asset_response.target.bucket
        upload_auth_request.object_key = asset_response.target.object
        upload_auth_request.content_type = file_content_type
        upload_auth_request.content_md5 = content_md5
        upload_auth_request.upload_id = upload_id
        upload_auth_request.part_number = part_number
        upload_auth_response = client.show_asset_temp_authority(upload_auth_request)
        print(upload_auth_response)
        print("Obtain authorization for multipart upload. get_part_upload_authority end; partNumber:", part_number)
        return upload_auth_response

    def upload_part_file(self, sign_str, chunk, content_md5, part_number):
        """
        6. Upload parts.
        :param sign_str: sign_str in the result returned in step 5, which is the URL for upload
        :type sign_str: str
        :param chunk: binary data of the current file part
        :type chunk: bytes
        :param content_md5: content-md5 value of the current file part
        :type content_md5: str
        :param part_number: number of the current file part
        :type part_number: int
        :return: None
        :raises RuntimeError: when the response status code is not 200
        """
        print("Upload parts. upload_part_file start; partNumber:", part_number)
        # Send a multipart upload request.
        upload_response = requests.request(method="PUT",
                                           url=sign_str,
                                           headers={
                                               "Content-Type": "application/octet-stream",
                                               "Content-MD5": content_md5
                                           },
                                           data=chunk,
                                           timeout=self.request_timeout)
        if upload_response.status_code != 200:
            print("Multipart upload end; upload failed! partNumber:", part_number)
            raise RuntimeError("Multipart upload end; upload failed! partNumber: %s, status_code: %s"
                               % (part_number, upload_response.status_code))
        print("Upload parts. upload_part_file end! partNumber:", part_number)

    def list_uploaded_part_authority(self, client, asset_response, upload_id):
        """
        8. Obtain authorization for obtaining uploaded parts.
        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param upload_id: upload_id returned in step 4
        :type upload_id: str
        :return: response of ``show_asset_temp_authority``; its ``sign_str`` is used in step 9
        """
        print("Obtain authorization for listing uploaded parts. list_uploaded_part_authority start")
        # Configure parameters.
        list_upload_part_auth_request = ShowAssetTempAuthorityRequest()
        list_upload_part_auth_request.http_verb = "GET"
        list_upload_part_auth_request.bucket = asset_response.target.bucket
        list_upload_part_auth_request.object_key = asset_response.target.object
        list_upload_part_auth_request.upload_id = upload_id
        list_upload_part_auth_response = client.show_asset_temp_authority(list_upload_part_auth_request)
        print(list_upload_part_auth_response)
        print("Obtain authorization for listing uploaded parts. list_uploaded_part_authority end")
        return list_upload_part_auth_response

    def list_uploaded_part(self, sign_str):
        """
        9. Obtain uploaded parts.
        :param sign_str: authorized URL returned in step 8
        :type sign_str: str
        :return: serialized ``CompleteMultipartUpload`` XML document listing the
                 PartNumber and ETag of every uploaded part
        :rtype: bytes
        :raises RuntimeError: when a list request fails
        """
        print("Query uploaded parts. list_uploaded_part start")
        # Query the start number of file parts.
        part_number_marker = 0
        # Assemble the root nodes for merging parts.
        merger_root = ET.Element("CompleteMultipartUpload")
        # Information about a maximum of 1,000 parts can be returned each time. If there are more than 1,000 parts, call the API for listing parts multiple times.
        while True:
            # List parts.
            list_response = requests.request(method="GET",
                                             url=sign_str + "&part-number-marker=" + str(part_number_marker),
                                             timeout=self.request_timeout)
            print(list_response)
            # Fail with the HTTP status instead of an obscure XML parsing error.
            if not list_response.ok:
                raise RuntimeError("Query uploaded parts failed! status_code: %s"
                                   % list_response.status_code)

            # Format the response using XML.
            response_document = ET.fromstring(list_response.text)
            # Parse the XML content and obtain the XMLNS information.
            namespace = self._namespace_prefix(response_document)

            # Check all Part nodes.
            parts = response_document.findall(namespace + "Part")
            for part in parts:
                # Assemble information about merging: one Part node per uploaded part,
                # carrying the PartNumber and ETag reported by the server.
                part_node = ET.SubElement(merger_root, "Part")
                ET.SubElement(part_node, "PartNumber").text = part.findtext(namespace + "PartNumber")
                ET.SubElement(part_node, "ETag").text = part.findtext(namespace + "ETag")

            # Find the start number of the next list.
            next_marker_text = response_document.findtext(namespace + "NextPartNumberMarker")
            # An empty page or a missing marker means there is nothing more to fetch.
            if not parts or not next_marker_text:
                break
            next_marker = int(next_marker_text)
            # If the part number is not an integer multiple of 1,000, all parts have been obtained.
            # A marker that does not advance would repeat the same request forever, so stop as well.
            if next_marker <= part_number_marker or next_marker % 1000 != 0:
                break
            part_number_marker = next_marker

        part_info = ET.tostring(merger_root, encoding='utf8')
        print(part_info)
        print("Query uploaded parts. list_uploaded_part end")
        return part_info

    def merge_uploaded_part_authority(self, client, asset_response, upload_id):
        """
        10. Obtain authorization for merging parts.
        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param upload_id: upload_id
        :type upload_id: str
        :return: response of ``show_asset_temp_authority``; its ``sign_str`` is used in step 11
        """
        print("Obtain authorization for merging parts. merge_uploaded_part_authority start")
        # Configure parameters.
        merger_part_upload_auth_request = ShowAssetTempAuthorityRequest()
        merger_part_upload_auth_request.http_verb = "POST"
        merger_part_upload_auth_request.bucket = asset_response.target.bucket
        merger_part_upload_auth_request.object_key = asset_response.target.object
        merger_part_upload_auth_request.upload_id = upload_id
        merger_part_upload_auth_response = client.show_asset_temp_authority(merger_part_upload_auth_request)
        print(merger_part_upload_auth_response)
        print("Obtain authorization for merging parts. merge_uploaded_part_authority end")
        return merger_part_upload_auth_response

    def merge_uploaded_part(self, sign_str, part_info):
        """
        11. Merge uploaded parts.
        :param sign_str: URL for authorized merging returned in step 10
        :type sign_str: str
        :param part_info: XML document returned in step 9 that lists the parts to merge
        :type part_info: bytes
        :return: None
        :raises RuntimeError: when the response status code is not 200
        """
        print("Merge parts. start")
        # Add Content-Type to the request header and set the value to application/xml.
        merger_part_upload_response = requests.request(method="POST",
                                                       url=sign_str,
                                                       headers={"Content-Type": "application/xml"},
                                                       data=part_info,
                                                       timeout=self.request_timeout)
        print(merger_part_upload_response)
        if merger_part_upload_response.status_code != 200:
            print("Part merging end; merging parts failed.")
            # Stop here: continuing would confirm an asset whose parts were never merged.
            raise RuntimeError("Part merging end; merging parts failed. status_code: %s"
                               % merger_part_upload_response.status_code)
        print("Part merging end")

    def confirm_uploaded(self, client, asset_response):
        """
        12. Confirm media asset upload.
        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result; its asset_id is confirmed
        :return: None
        """
        print("Confirm the upload completion. Start")
        confirm_asset_upload_request = ConfirmAssetUploadRequest()
        confirm_asset_upload_request.body = ConfirmAssetUploadReq(status="CREATED",
                                                                  asset_id=asset_response.asset_id)
        confirm_asset_upload_response = client.confirm_asset_upload(confirm_asset_upload_request)
        print(confirm_asset_upload_response)


if __name__ == '__main__':
    # Local path of the media file to upload. Fill it in before running:
    # upload_file() is called with this value as file_path.
    media_path = ""

    # Build the demo, then supply the AK/SK credentials and the target region.
    demo = PartUploadDemo()
    demo.ak, demo.sk = "", ""
    demo.region = PartUploadDemo.region_north4

    # Run the multipart upload for the file.
    demo.upload_file(file_path=media_path)