Updated on 2024-12-04 GMT+08:00
Python
Sample code of multipart upload using Python:
import base64
import hashlib
import os
import re
import xml.etree.ElementTree as ET

import requests
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvod.v1 import *
from huaweicloudsdkvod.v1.region.vod_region import VodRegion


class PartUploadDemo:
    """
    Example of multipart upload.

    Set ``ak``, ``sk`` and ``region`` on an instance, then call
    :meth:`upload_file` with the local path of the media file.
    """

    # Set the buffer size as needed, that is, the size of the file part read each time.
    # 1 MB
    buffer_size = 1024 * 1024

    # Region
    region_north4 = "cn-north-4"
    region_north1 = "cn-north-1"
    region_east2 = "cn-east-2"
    region = ""

    # AK/SK, which is used for authentication in this example.
    ak = ""
    sk = ""

    def __init__(self):
        pass

    @staticmethod
    def _namespace_prefix(tag):
        """
        Return the ``{namespace-uri}`` prefix of an ElementTree tag.

        :param tag: tag of an XML element, for example ``{uri}Name``
        :type tag: str
        :return: ``"{uri}"``, or an empty string when the tag has no namespace
        """
        match = re.search(r'\{(.*?)}', tag)
        if match is None:
            return ""
        return "{" + match.group(1) + "}"

    def upload_file(self, file_path):
        """
        Multipart upload

        Runs steps 1 to 12 in order: create the client and the media asset,
        initialize the upload task, upload the file part by part, list and
        merge the uploaded parts, then confirm the upload.

        :param file_path: local path of the file
        :type file_path: str
        :return: None. Returns early (after printing the reason) when the path is invalid.
        """
        # Verify the file and its path.
        if not self.valid_file(file_path):
            return
        # Obtain the file name.
        filename = os.path.basename(file_path)
        # An MP4 file is used as an example. For details about other formats, see the official website.
        video_type = "MP4"
        file_content_type = "video/mp4"

        print("Start uploading media assets:" + filename)
        # 1. Initialize authentication and obtain vodClient.
        client = self.create_vod_client()
        # 2. Create a VOD media asset.
        asset_response = self.create_asset(client=client, file_name=filename, video_type=video_type)
        # 3. Obtain authorization for initializing an upload task.
        init_auth_response = self.init_part_upload_authority(client=client, asset_response=asset_response,
                                                             file_content_type=file_content_type)
        # 4. Initialize the upload task.
        upload_id = self.init_part_upload(sign_str=init_auth_response.sign_str,
                                          file_content_type=file_content_type)

        # Count the number of file parts.
        part_number = 1
        # 7. Read the file content and repeat steps 5 and 6 to upload all parts.
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self.buffer_size), b''):
                # Generate content_md5 using MD5 and then Base64.
                md5 = hashlib.md5()
                md5.update(chunk)
                content_md5 = str(base64.b64encode(md5.digest()), 'utf-8')
                # 5. Obtain authorization for multipart upload.
                upload_auth_response = self.get_part_upload_authority(client=client,
                                                                      asset_response=asset_response,
                                                                      file_content_type=file_content_type,
                                                                      content_md5=content_md5,
                                                                      upload_id=upload_id,
                                                                      part_number=part_number)
                # 6. Upload parts.
                self.upload_part_file(sign_str=upload_auth_response.sign_str, chunk=chunk,
                                      content_md5=content_md5, part_number=part_number)
                # The part number automatically increments by one.
                part_number += 1

        # 8. Obtain authorization for obtaining uploaded parts.
        list_part_upload_authority_response = self.list_uploaded_part_authority(client=client,
                                                                                asset_response=asset_response,
                                                                                upload_id=upload_id)
        # 9. Obtain uploaded parts.
        part_info = self.list_uploaded_part(sign_str=list_part_upload_authority_response.sign_str)
        # 10. Obtain authorization for merging parts.
        merge_part_upload_authority_response = self.merge_uploaded_part_authority(client=client,
                                                                                  asset_response=asset_response,
                                                                                  upload_id=upload_id)
        # 11. Merge uploaded parts. Raises on failure so that step 12 is not reached.
        self.merge_uploaded_part(sign_str=merge_part_upload_authority_response.sign_str, part_info=part_info)
        # 12. Confirm media asset upload.
        self.confirm_uploaded(client=client, asset_response=asset_response)
        print("Media asset uploaded. assetId:" + asset_response.asset_id)

    # Check whether the file exists.
    def valid_file(self, file_path):
        """
        Check that ``file_path`` is non-empty and points to an existing regular file.

        :param file_path: local path of the file
        :type file_path: str
        :return: True when the path is usable, otherwise False (the reason is printed)
        """
        valid_result = True
        if not file_path:
            print("The path is empty.")
            valid_result = False
        elif os.path.isdir(file_path):
            print("It is a directory.")
            valid_result = False
        elif not os.path.isfile(file_path):
            print("The file does not exist.")
            valid_result = False
        return valid_result

    # 1. Initialize authentication.
    def create_vod_client(self):
        """
        1. Build a VodClient from the ``ak``, ``sk`` and ``region`` attributes.

        :return: the built client
        """
        print("Initializing authentication...")
        credentials = BasicCredentials(self.ak, self.sk)
        client = VodClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(VodRegion.value_of(self.region)) \
            .build()
        return client

    def create_asset(self, client, file_name, video_type):
        """
        2. Create a VOD media asset.

        :param client: client returned by create_vod_client
        :param file_name: audio/video file name
        :type file_name: str
        :param video_type: uploaded audio/video file format
        :type video_type: str
        :return: response of the media asset creation call
        """
        print("create_asset start; ")
        create_asset_request = CreateAssetByFileUploadRequest()
        # Create the minimum set of parameters for media asset creation.
        # For details about other parameters, see documents on the official website.
        create_asset_request.body = CreateAssetByFileUploadReq(
            video_type=video_type,
            video_name=file_name,
            title=file_name
        )
        # Call the media asset creation method.
        asset_response = client.create_asset_by_file_upload(create_asset_request)
        # The response is an object, not a str: convert explicitly before concatenating.
        print("createAssetResponse:" + str(asset_response))
        print("create_asset end")
        return asset_response

    def init_part_upload_authority(self, client, asset_response, file_content_type):
        """
        3. Obtain authorization for initializing an upload task.

        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param file_content_type: content-type of a file type, such as video/mp4 for MP4
        :type file_content_type: str
        :return: authorization response; its sign_str is used in step 4
        """
        print("Obtain authorization for initializing an upload task. init_part_upload_authority start")
        init_auth_request = ShowAssetTempAuthorityRequest()
        # Configure initialization parameters.
        init_auth_request.http_verb = "POST"
        init_auth_request.bucket = asset_response.target.bucket
        init_auth_request.object_key = asset_response.target.object
        init_auth_request.content_type = file_content_type
        # Send an initialization request.
        init_auth_response = client.show_asset_temp_authority(init_auth_request)
        # The response is an object, not a str: convert explicitly before concatenating.
        print("init_auth_response:" + str(init_auth_response))
        print("Obtain authorization for initializing an upload task. init_part_upload_authority end")
        return init_auth_response

    def init_part_upload(self, sign_str, file_content_type):
        """
        4. Initialize the upload task.

        :param sign_str: sign_str in the result returned in step 3, which is the URL for initializing the upload task
        :type sign_str: str
        :param file_content_type: content-type of a file type, such as video/mp4 for MP4
        :type file_content_type: str
        :return: returns upload_id
        :raises Exception: if the request does not return 200 or the response has no UploadId
        """
        print("Initialize multipart upload. init_part_upload start")
        # Send an initialization request.
        init_response = requests.request(method="POST", url=sign_str,
                                         headers={"Content-Type": file_content_type})
        print(init_response.text)
        if init_response.status_code != 200:
            # Fail here rather than with an opaque XML/attribute error below.
            raise Exception("Initialize multipart upload failed! status code:", init_response.status_code)
        # Parse the response to obtain the uploadId.
        root = ET.fromstring(init_response.text)
        upload_id_element = root.find(self._namespace_prefix(root.tag) + "UploadId")
        if upload_id_element is None or not upload_id_element.text:
            raise Exception("Initialize multipart upload failed! UploadId not found in the response.")
        upload_id = upload_id_element.text
        print("Initialize multipart upload. init_part_upload end; UploadId:" + upload_id)
        return upload_id

    def get_part_upload_authority(self, client, asset_response, file_content_type, content_md5, upload_id,
                                  part_number):
        """
        5. Obtain authorization for multipart upload.

        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param file_content_type: content-type of a file type, such as video/mp4 for MP4
        :type file_content_type: str
        :param content_md5: content-md5 value of the current file part
        :type content_md5: str
        :param upload_id: upload ID returned in step 4
        :type upload_id: str
        :param part_number: part number
        :type part_number: int
        :return: authorization response; its sign_str is used in step 6
        """
        print("Obtain authorization for multipart upload. get_part_upload_authority start; partNumber:", part_number)
        upload_auth_request = ShowAssetTempAuthorityRequest()
        # Configure upload authorization parameters.
        upload_auth_request.http_verb = "PUT"
        upload_auth_request.bucket = asset_response.target.bucket
        upload_auth_request.object_key = asset_response.target.object
        upload_auth_request.content_type = file_content_type
        upload_auth_request.content_md5 = content_md5
        upload_auth_request.upload_id = upload_id
        upload_auth_request.part_number = part_number
        upload_auth_response = client.show_asset_temp_authority(upload_auth_request)
        print(upload_auth_response)
        print("Obtain authorization for multipart upload. get_part_upload_authority end; partNumber:", part_number)
        return upload_auth_response

    def upload_part_file(self, sign_str, chunk, content_md5, part_number):
        """
        6. Upload parts.

        :param sign_str: sign_str in the result returned in step 5, which is the URL for upload
        :type sign_str: str
        :param chunk: binary data of the current file part
        :type chunk: bytes
        :param content_md5: content-md5 value of the current file part
        :type content_md5: str
        :param part_number: number of the current file part
        :type part_number: int
        :return: None
        :raises Exception: if the upload request does not return 200
        """
        print("Upload parts. upload_part_file start; partNumber:", part_number)
        # Send a multipart upload request.
        upload_response = requests.request(method="PUT", url=sign_str,
                                           headers={
                                               "Content-Type": "application/octet-stream",
                                               "Content-MD5": content_md5
                                           },
                                           data=chunk)
        if upload_response.status_code != 200:
            print("Multipart upload end; upload failed! partNumber:", part_number)
            raise Exception("Multipart upload end; upload failed! partNumber:", part_number)
        print("Upload parts. upload_part_file end! partNumber:", part_number)

    def list_uploaded_part_authority(self, client, asset_response, upload_id):
        """
        8. Obtain authorization for obtaining uploaded parts.

        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param upload_id: upload ID returned in step 4
        :return: authorization response; its sign_str is used in step 9
        """
        print("Obtain authorization for listing uploaded parts. list_uploaded_part_authority start")
        # Configure parameters.
        list_upload_part_auth_request = ShowAssetTempAuthorityRequest()
        list_upload_part_auth_request.http_verb = "GET"
        list_upload_part_auth_request.bucket = asset_response.target.bucket
        list_upload_part_auth_request.object_key = asset_response.target.object
        list_upload_part_auth_request.upload_id = upload_id
        list_upload_part_auth_response = client.show_asset_temp_authority(list_upload_part_auth_request)
        print(list_upload_part_auth_response)
        print("Obtain authorization for listing uploaded parts. list_uploaded_part_authority end")
        return list_upload_part_auth_response

    def list_uploaded_part(self, sign_str):
        """
        9. Obtain uploaded parts.

        Pages through the part listing and assembles a ``CompleteMultipartUpload``
        XML document containing the PartNumber and ETag of every listed part.

        :param sign_str: authorized URL returned in step 8
        :type sign_str: str
        :return: serialized XML body for the merge request in step 11
        :rtype: bytes
        :raises Exception: if a listing request does not return 200
        """
        print("Query uploaded parts. list_uploaded_part start")
        # Query the start number of file parts.
        part_number_marker = 0
        # Assemble the root nodes for merging parts.
        merger_root = ET.Element("CompleteMultipartUpload")
        # Information about a maximum of 1,000 parts can be returned each time.
        # If there are more than 1,000 parts, call the API for listing parts multiple times.
        while True:
            # List parts.
            list_upload_part_auth_response = requests.request(
                method="GET",
                url=sign_str + "&part-number-marker=" + str(part_number_marker))
            print(list_upload_part_auth_response)
            if list_upload_part_auth_response.status_code != 200:
                raise Exception("Query uploaded parts failed! status code:",
                                list_upload_part_auth_response.status_code)
            # Format the response using XML.
            response_document = ET.fromstring(list_upload_part_auth_response.text)
            # Parse the XML content and obtain the XMLNS information.
            namespace_prefix = self._namespace_prefix(response_document.tag)
            # Check all Part nodes.
            parts = response_document.findall(namespace_prefix + "Part")
            for part in parts:
                # Obtain PartNumber and ETag.
                part_number_value = part.find(namespace_prefix + "PartNumber").text
                e_tag_value = part.find(namespace_prefix + "ETag").text
                # Assemble information about merging.
                # Create a Part node under the root node.
                part_node = ET.SubElement(merger_root, "Part")
                # Create PartNumber under the Part node and set PartNumber.
                part_number_node = ET.SubElement(part_node, "PartNumber")
                part_number_node.text = part_number_value
                # Create ETag under the Part node and set ETag.
                e_tag_node = ET.SubElement(part_node, "ETag")
                e_tag_node.text = e_tag_value
            # Find the start number of the next list.
            next_marker_element = response_document.find(namespace_prefix + "NextPartNumberMarker")
            # An empty page or a missing marker means there is nothing more to fetch.
            if not parts or next_marker_element is None or not next_marker_element.text:
                break
            next_marker = int(next_marker_element.text)
            # If the part number is not an integer multiple of 1,000, all parts have been obtained.
            # A marker that does not advance is also treated as the end, so the loop always terminates.
            if next_marker % 1000 != 0 or next_marker <= part_number_marker:
                break
            part_number_marker = next_marker
        part_info = ET.tostring(merger_root, encoding='utf8')
        print(part_info)
        print("Query uploaded parts. list_uploaded_part end")
        return part_info

    def merge_uploaded_part_authority(self, client, asset_response, upload_id):
        """
        10. Obtain authorization for merging parts.

        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :param upload_id: upload_id
        :type upload_id: str
        :return: authorization response; its sign_str is used in step 11
        """
        print("Obtain authorization for merging parts. merge_uploaded_part_authority start")
        # Configure parameters.
        merger_part_upload_auth_request = ShowAssetTempAuthorityRequest()
        merger_part_upload_auth_request.http_verb = "POST"
        merger_part_upload_auth_request.bucket = asset_response.target.bucket
        merger_part_upload_auth_request.object_key = asset_response.target.object
        merger_part_upload_auth_request.upload_id = upload_id
        merger_part_upload_auth_response = client.show_asset_temp_authority(merger_part_upload_auth_request)
        print(merger_part_upload_auth_response)
        print("Obtain authorization for merging parts. merge_uploaded_part_authority end")
        return merger_part_upload_auth_response

    def merge_uploaded_part(self, sign_str, part_info):
        """
        11. Merge uploaded parts.

        :param sign_str: URL for authorized merging returned in step 10
        :type sign_str: str
        :param part_info: information about merged parts, as returned by list_uploaded_part
        :type part_info: bytes
        :return: None
        :raises Exception: if the merge request does not return 200
        """
        print("Merge parts. start")
        # Add Content-Type to the request header and set the value to application/xml.
        merger_part_upload_response = requests.request(method="POST", url=sign_str,
                                                       headers={"Content-Type": "application/xml"},
                                                       data=part_info)
        print(merger_part_upload_response)
        if merger_part_upload_response.status_code != 200:
            print("Part merging end; merging parts failed.")
            # Raise, as upload_part_file does, so the caller does not confirm an unmerged asset.
            raise Exception("Part merging end; merging parts failed. status code:",
                            merger_part_upload_response.status_code)
        print("Part merging end")

    # 12. Confirm media asset upload.
    def confirm_uploaded(self, client, asset_response):
        """
        12. Confirm media asset upload.

        :param client: client returned by create_vod_client
        :param asset_response: returned media asset creation result
        :return: None
        """
        print("Confirm the upload completion. Start")
        confirm_asset_upload_request = ConfirmAssetUploadRequest()
        confirm_asset_upload_request.body = ConfirmAssetUploadReq(status="CREATED",
                                                                  asset_id=asset_response.asset_id)
        confirm_asset_upload_response = client.confirm_asset_upload(confirm_asset_upload_request)
        print(confirm_asset_upload_response)


if __name__ == '__main__':
    # Path of the local media asset to be uploaded
    filePath = ""
    partUploadDemo = PartUploadDemo()
    partUploadDemo.ak = ""
    partUploadDemo.sk = ""
    partUploadDemo.region = partUploadDemo.region_north4
    # Upload the media asset.
    partUploadDemo.upload_file(file_path=filePath)
Parent topic: Sample Code for Multipart Upload
Feedback
Was this page helpful?
Provide feedback. Thank you very much for your feedback. We will continue working to improve the documentation. See the reply and handling status in My Cloud VOC.
The system is busy. Please try again later.
For any further questions, feel free to contact us through the chatbot.
Chatbot