更新时间:2025-12-11 GMT+08:00
使用CRC64校验文件一致性(Python SDK)
- CRC计算是一个计算密集型的过程,需要进行大量的位移、异或 (XOR) 和查表操作。开启后会耗费额外时间,数据量越大,所需的CPU时间越多,C扩展模式1GB数据计算时间大约在4-6秒之间。
- OBS Python SDK需要crcmod计算CRC校验码,而crcmod依赖于python-devel包中的Python.h文件。crcmod包括C拓展模式和Python模式,当系统缺少Python.h文件时,虽然不影响OBS Python SDK正常安装,但会导致crcmod的C扩展模式安装失败,系统在上传、下载计算CRC校验码时会使用纯Python模式计算CRC校验码,其性能和效率远低于使用C扩展模式。因此在安装OBS Python SDK之前,请确保系统已安装python-devel,如果未安装的,请关闭CRC64校验。
您可以通过以下接口在上传时使用crc64校验文件一致性:
您可以通过以下接口在下载时使用crc64校验文件一致性:
您可以通过以下接口获取对象crc64:
代码样例一:文本上传
本示例用于上传文本对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
from obs import ObsClient from obs import PutObjectHeader import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html # 运行本代码示例之前,请确保已设置环境变量AccessKeyID和SecretAccessKey ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: bucketName = "examplebucket" objectKey = "objectname" # 设置待上传的文本内容 content = 'Hello OBS' # 上传对象的附加头域 headers = PutObjectHeader() # 计算上传数据的ctc64值,并提交服务端校验 headers.isAttachCrc64 = True # 上传文本对象 resp = obsClient.putContent(bucketName, objectKey, content, headers=headers) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Put Content Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) else: print('Put Content Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Put Content Failed') print(traceback.format_exc()) |
代码样例二:文件上传
本示例用于上传单个文件时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
from obs import ObsClient from obs import PutObjectHeader import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html。 # 运行本代码示例之前,请确保已设置环境变量AccessKeyID和SecretAccessKey ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: # 上传对象的附加头域 headers = PutObjectHeader() # 【可选】待上传对象的MIME类型 headers.contentType = 'text/plain' # 计算上传数据的ctc64值,并提交服务端校验 headers.isAttachCrc64 = True bucketName = "examplebucket" # 对象名,即上传后的文件名 objectKey = "objectname" # 待上传文件的完整路径,如aa/bb.txt file_path = 'localfile' # 上传文件的自定义元数据 metadata = {'meta1': 'value1', 'meta2': 'value2'} # 文件上传 resp = obsClient.putFile(bucketName, objectKey, file_path, metadata, headers) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Put File Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) print('versionId:', resp.body.versionId) print('storageClass:', resp.body.storageClass) else: print('Put File Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Put File Failed') print(traceback.format_exc()) |
代码样例三:追加上传
本示例用于追加上传对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
from obs import ObsClient from obs import AppendObjectContent, AppendObjectHeader import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: headers = AppendObjectHeader() # 计算上传数据的ctc64值,并提交服务端校验 headers.isAttachCrc64 = True # 追加上传的消息体 content = AppendObjectContent() # 设置追加内容 content.content = 'Hello OBS' # 设置追加内容起始位置,此处以对象的0个字节为起始位置 content.position = 0 # 第一次调用追加上传时,如果已存在同名的普通对象,则抛出异常(状态码为409) bucketName = "examplebucket" # 追加上传的对象名 objectKey = "objectname" # 追加上传 resp = obsClient.appendObject(bucketName, objectKey, content, headers=headers) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Append Object Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) print('nextPosition:', resp.body.nextPosition) else: print('Append Object Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Append Object Failed') print(traceback.format_exc()) |
代码样例四:断点续传上传
本示例用于断点续传上传时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: bucketName = "examplebucket" # 对象名,即上传后的文件名 objectKey = "objectname" # 待上传的文件路径 uploadFile = 'localfile' # 分段上传的并发数 taskNum = 5 # 分段的大小,单位字节,此处为10MB partSize = 10 * 1024 * 1024 # True表示开启断点续传 enableCheckpoint = True # 断点续传上传 resp = obsClient.uploadFile(bucketName, objectKey, uploadFile, partSize, taskNum, enableCheckpoint, encoding_type='url', isAttachCrc64=True) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Upload File Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) else: print('Upload File Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Upload File Failed') print(traceback.format_exc()) |
代码样例五:分段上传
本示例用于展示分段上传本地文件时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# -*- coding:utf-8 -*- from obs import ObsClient,CompleteMultipartUploadRequest, CompletePart import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: #桶名 bucketName = "examplebucket" #对象名 objectKey = "objectname" # 对象的MIME类型 contentType = 'text/plain' # 初始化分段上传任务 resp = obsClient.initiateMultipartUpload(bucketName, objectKey, contentType=contentType) #获取初始化上传任务的uploadId uploadId = resp.body["uploadId"] #上传段大小 partSize = 10 * 1024 * 1024 #段号 partNum = 1 # 指明object字段是否代表文件路径,默认为False、此处为True isFile = True #本地要上传的对象文件 filepath = r"D:\tmp\file.txt" contentLength = os.path.getsize(filepath) #源文件中某一分段的起始偏移大小。 offset = 0 etags = {} crc64s = {} while offset < contentLength: partSize = min(partSize, (contentLength - offset)); # 用于上传段 resp1 = obsClient.uploadPart(bucketName, objectKey, partNum, uploadId, filepath, isFile, partSize, offset, isAttachCrc64=True) etags[partNum] = resp1.body.etag crc64s[partNum] = resp1.body.crc64 offset = offset + partSize partNum = partNum + 1 completes = [] for i in range(1, partNum): completes.append(CompletePart(i, etags[i], crc64s[i], partSize)) # 用于合并段 completeMultipartUploadRequest = CompleteMultipartUploadRequest(parts = completes) resp = obsClient.completeMultipartUpload(bucketName, objectKey, uploadId, completeMultipartUploadRequest, isAttachCrc64=True) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Upload Part Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) else: print('Upload Part Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('multPartsUpload Failed') print(traceback.format_exc()) |
代码样例六:文件下载
本示例用于下载文件对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: # 下载到本地的路径,localfile为包含本地文件名称的全路径 downloadPath = 'localfile' bucketName = "examplebucket" objectKey = "objectname" # 文件下载 resp = obsClient.getObject(bucketName, objectKey, downloadPath, isAttachCrc64=True) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Get Object Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) else: print('Get Object Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Get Object Failed') print(traceback.format_exc()) |
代码样例七:断点续传下载
本示例用于断点续传下载对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: bucketName = "examplebucket" objectKey = "objectname" # 下载到本地的路径,包含本地文件名称的全路径 downloadFile = 'localfile' # 分段下载的并发数 taskNum = 5 # 分段的大小 partSize = 10 * 1024 * 1024 # True表示开启断点续传 enableCheckpoint = True # 断点续传下载对象 resp = obsClient.downloadFile(bucketName, objectKey, downloadFile, partSize, taskNum, enableCheckpoint, isAttachCrc64=True) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Download File Succeeded') print('requestId:', resp.requestId) print('crc64:', resp.body.crc64) else: print('Download File Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Download File Failed') print(traceback.format_exc()) |
代码样例八:二进制下载
本示例用于二进制下载对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: bucketName = "examplebucket" objectKey = "objectname" # 指定loadStreamInMemory为True忽略downloadpath路径,将文件的二进制流下载到内存 #二进制下载对象 resp = obsClient.getObject(bucketName=bucketName,objectKey=objectKey, loadStreamInMemory=True, isAttachCrc64=True) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Get Object Succeeded') print('requestId:', resp.requestId) # 获取对象内容 print('buffer:', resp.body.buffer) print('size:', resp.body.size) print('crc64:', resp.body.crc64) else: print('Get Object Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Get Object Failed') print(traceback.format_exc()) |
代码样例九:流式下载
本示例用于流式下载对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
from obs import GetObjectRequest from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: # 下载对象的附加请求参数 getObjectRequest = GetObjectRequest() # 获取对象时重写响应中的Content-Type头。 getObjectRequest.content_type = 'text/plain' bucketName = "examplebucket" objectKey = "objectname" #流式下载 resp = obsClient.getObject(bucketName=bucketName,objectKey=objectKey, getObjectRequest=getObjectRequest, loadStreamInMemory=False, isAttachCrc64=True) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Get Object Succeeded') print('requestId:', resp.requestId) print('crc64 in obs:', resp.body.crc64) # 读取对象内容 while True: chunk = resp.body.response.read(65536) if not chunk: break print(chunk) resp.body.response.close() else: print('Get Object Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Get Object Failed') print(traceback.format_exc()) |
代码样例十:范围下载
本示例用于范围下载对象时使用crc64校验文件一致性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
from obs import GetObjectHeader from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: # 下载对象的附加头域 headers = GetObjectHeader() # 获取对象时,获取在range范围内的对象内容。此处以对象的前1000个字节为例 # 您可以在获取对象时指定多个范围内容,例如:headers.range='0-1000,2000-3000' headers.range = '0-1000' bucketName = "examplebucket" objectKey = "objectname" # 范围下载,loadStreamInMemory如果为True则会忽略downloadpath路径,将数据流下载到内存 resp = obsClient.getObject(bucketName, objectKey, loadStreamInMemory=True, headers=headers) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Get Object Succeeded') print('requestId:', resp.requestId) # 获取对象内容 print('buffer:', resp.body.buffer) print('crc64:', resp.body.crc64) else: print('Get Object Failed') print('requestId:', resp.requestId) print('errorCode:', resp.errorCode) print('errorMessage:', resp.errorMessage) except: print('Get Object Failed') print(traceback.format_exc()) |
代码样例十一:获取对象元数据
本示例用于获取examplebucket桶里objectname的crc64。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
from obs import ObsClient import os import traceback # 推荐通过环境变量获取AKSK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险 # 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/intl/zh-cn/usermanual-ca/ca_01_0003.html ak = os.getenv("AccessKeyID") sk = os.getenv("SecretAccessKey") # 【可选】如果使用临时AKSK和SecurityToken访问OBS,则同样推荐通过环境变量获取 # security_token = os.getenv("SecurityToken") # server填写Bucket对应的Endpoint, 这里以中国-香港为例,其他地区请按实际情况填写 server = "https://obs.ap-southeast-1.myhuaweicloud.com" # 创建obsClient实例 # 如果使用临时AKSK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值 obsClient = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) try: bucketName = "examplebucket" objectKey = "objectname" # 获取对象元数据 resp = obsClient.getObjectMetadata(bucketName, objectKey) # 返回码为2xx时,接口调用成功,否则接口调用失败 if resp.status < 300: print('Get Object Metadata Succeeded') print('requestId:', resp.requestId) print('etag:', resp.body.etag) print('crc64:', resp.body.crc64) print('lastModified:', resp.body.lastModified) print('contentType:', resp.body.contentType) print('contentLength:', resp.body.contentLength) else: print('Get Object Metadata Failed') print('requestId:', resp.requestId) print('status:', resp.status) print('reason:', resp.reason) except: print('Get Object Metadata Failed') print(traceback.format_exc()) |
父主题: 对象相关接口(Python SDK)