更新时间:2026-06-29 GMT+08:00
分享

使用CRC64校验文件一致性(Python SDK)

  • CRC计算是一个计算密集型的过程,需要进行大量的位移、异或 (XOR) 和查表操作。开启后会耗费额外时间,数据量越大,所需的CPU时间越多,C扩展模式1GB数据计算时间大约在4-6秒之间。
  • OBS Python SDK需要crcmod计算CRC校验码,而crcmod依赖于python-devel包中的Python.h文件。crcmod包括C拓展模式和Python模式,当系统缺少Python.h文件时,虽然不影响OBS Python SDK正常安装,但会导致crcmod的C扩展模式安装失败,系统在上传、下载计算CRC校验码时会使用纯Python模式计算CRC校验码,其性能和效率远低于使用C扩展模式。因此在安装OBS Python SDK之前,请确保系统已安装python-devel,如果未安装的,请关闭CRC64校验。

您可以通过以下接口在上传时使用crc64校验文件一致性:

您可以通过以下接口在下载时使用crc64校验文件一致性:

您可以通过以下接口获取对象crc64:

代码样例一:文本上传

本示例用于上传文本对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
import traceback

from obs import ObsClient
from obs import PutObjectHeader

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
# 运行本代码示例之前,请确保已设置环境变量AccessKeyID和SecretAccessKey
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    bucket_name = "examplebucket" 
    object_key = "objectname" 
    # 设置待上传的文本内容 
    content = 'Hello OBS' 
    # 上传对象的附加头域 
    headers = PutObjectHeader() 
    # 计算上传数据的crc64值,并提交服务端校验 
    headers.isAttachCrc64 = True 
    # 上传文本对象 
    resp = obs_client.putContent(bucket_name, object_key, content, headers=headers) 
    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Put Content Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64:', resp.body.crc64) 
    else: 
        print('Put Content Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Put Content Failed') 
    print(traceback.format_exc())

代码样例二:文件上传

本示例用于上传单个文件时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import os
import traceback

from obs import ObsClient
from obs import PutObjectHeader

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html。
# 运行本代码示例之前,请确保已设置环境变量AccessKeyID和SecretAccessKey
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    # 上传对象的附加头域 
    headers = PutObjectHeader() 
    # 【可选】待上传对象的MIME类型 
    headers.contentType = 'text/plain' 
    # 计算上传数据的crc64值,并提交服务端校验 
    headers.isAttachCrc64 = True 
    bucket_name = "examplebucket" 
    # 对象名,即上传后的文件名 
    object_key = "objectname" 
    # 待上传文件的完整路径,如aa/bb.txt 
    file_path = 'localfile' 
    # 上传文件的自定义元数据 
    metadata = {'meta1': 'value1', 'meta2': 'value2'} 
    # 文件上传 
    resp = obs_client.putFile(bucket_name, object_key, file_path, metadata, headers) 
    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Put File Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64:', resp.body.crc64) 
        print('versionId:', resp.body.versionId) 
        print('storageClass:', resp.body.storageClass) 
    else: 
        print('Put File Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Put File Failed') 
    print(traceback.format_exc())

代码样例三:追加上传

本示例用于追加上传对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import os
import traceback

from obs import AppendObjectContent, AppendObjectHeader
from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    headers = AppendObjectHeader() 
    # 计算上传数据的crc64值,并提交服务端校验 
    headers.isAttachCrc64 = True 
    # 追加上传的消息体 
    content = AppendObjectContent() 
    # 设置追加内容 
    content.content = 'Hello OBS' 
    # 设置追加内容起始位置,此处以对象的0个字节为起始位置 
    content.position = 0 
    # 第一次调用追加上传时,如果已存在同名的普通对象,则抛出异常(状态码为409) 
    bucket_name = "examplebucket" 
    # 追加上传的对象名 
    object_key = "objectname" 
    # 追加上传 
    resp = obs_client.appendObject(bucket_name, object_key, content, headers=headers) 

    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Append Object Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64:', resp.body.crc64) 
        print('nextPosition:', resp.body.nextPosition) 
    else: 
        print('Append Object Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Append Object Failed') 
    print(traceback.format_exc())

代码样例四:断点续传上传

本示例用于断点续传上传时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import os
import traceback

from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 

try: 
    bucket_name = "examplebucket" 
    # 对象名,即上传后的文件名 
    object_key = "objectname" 
    # 待上传的文件路径 
    upload_file = 'localfile' 
    # 分段上传的并发数 
    task_num = 5 
    # 分段的大小,单位字节,此处为10MB 
    part_size = 10 * 1024 * 1024 
    # True表示开启断点续传 
    enable_checkpoint = True 
    # 断点续传上传 
    resp = obs_client.uploadFile(bucket_name, object_key, upload_file, part_size, task_num, enable_checkpoint, encoding_type='url', isAttachCrc64=True) 

    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Upload File Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64:', resp.body.crc64) 
    else: 
        print('Upload File Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Upload File Failed') 
    print(traceback.format_exc())

代码样例五:分段上传

本示例用于展示分段上传本地文件时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import os
import traceback

from obs import ObsClient, CompleteMultipartUploadRequest, CompletePart

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server)
try:
    # 桶名
    bucket_name = "examplebucket"
    # 对象名
    object_key = "objectname"
    # 对象的MIME类型
    content_type = 'text/plain'
    # 初始化分段上传任务
    resp = obs_client.initiateMultipartUpload(bucket_name, object_key,
                                             content_type = content_type)
    # 获取初始化上传任务的uploadId
    upload_id = resp.body["uploadId"]
    # 上传段大小
    part_size = 10 * 1024 * 1024
    # 段号
    part_num = 1
    # 指明object字段是否代表文件路径,默认为False、此处为True
    is_file = True
    # 本地要上传的对象文件
    filepath = r"D:\tmp\file.txt"
    content_length = os.path.getsize(filepath)
    # 源文件中某一分段的起始偏移大小。
    offset = 0
    etags = {}
    crc64s = {}

    while offset < content_length:
        part_size = min(part_size, (content_length - offset));
        # 用于上传段
        resp1 = obs_client.uploadPart(bucket_name, object_key, part_num, upload_id, filepath, is_file, part_size, offset, isAttachCrc64=True)
        etags[part_num] = resp1.body.etag
        crc64s[part_num] = resp1.body.crc64
        offset = offset + part_size
        part_num = part_num + 1

    completes = []
    for i in range(1, part_num):
        completes.append(CompletePart(i, etags[i], crc64s[i], part_size))
    # 用于合并段
    complete_multipart_upload_request = CompleteMultipartUploadRequest(parts=completes)
    resp = obs_client.completeMultipartUpload(bucket_name, object_key, upload_id, complete_multipart_upload_request, isAttachCrc64=True)
    # 返回码为2xx时,接口调用成功,否则接口调用失败
    if resp.status < 300:
        print('Upload Part Succeeded')
        print('requestId:', resp.requestId)
        print('crc64:', resp.body.crc64)
    else:
        print('Upload Part Failed')
        print('requestId:', resp.requestId)
        print('errorCode:', resp.errorCode)
        print('errorMessage:', resp.errorMessage)
except Exception:
    print('multPartsUpload Failed')
    print(traceback.format_exc())

代码样例六:文件下载

本示例用于下载文件对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import os
import traceback

from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    # 下载到本地的路径, localfile为包含本地文件名称的全路径 
    download_path = 'localfile' 
    bucket_name = "examplebucket" 
    object_key = "objectname" 
    # 文件下载 
    resp = obs_client.getObject(bucket_name, object_key, download_path, isAttachCrc64=True) 
    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Get Object Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64:', resp.body.crc64) 
    else: 
        print('Get Object Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Get Object Failed') 
    print(traceback.format_exc())

代码样例七:断点续传下载

本示例用于断点续传下载对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os
import traceback

from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    bucket_name = "examplebucket" 
    object_key = "objectname" 
    # 下载到本地的路径, 包含本地文件名称的全路径 
    download_file = 'localfile' 
    # 分段下载的并发数 
    task_num = 5 
    # 分段的大小 
    part_size = 10 * 1024 * 1024 
    # True表示开启断点续传 
    enable_checkpoint = True 
    # 断点续传下载对象 
    resp = obs_client.downloadFile(bucket_name, object_key, download_file, part_size, task_num, enable_checkpoint, isAttachCrc64=True) 

    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Download File Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64:', resp.body.crc64) 
    else: 
        print('Download File Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Download File Failed') 
    print(traceback.format_exc())

代码样例八:二进制下载

本示例用于二进制下载对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import os
import traceback

from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    bucket_name = "examplebucket" 
    object_key = "objectname" 
    # 指定loadStreamInMemory为True忽略downloadpath路径,将文件的二进制流下载到内存 
    # 二进制下载对象 
    resp = obs_client.getObject(bucket_name=bucket_name, object_key=object_key, loadStreamInMemory=True, isAttachCrc64=True) 
    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Get Object Succeeded') 
        print('requestId:', resp.requestId) 
        # 获取对象内容 
        print('buffer:', resp.body.buffer) 
        print('size:', resp.body.size) 
        print('crc64:', resp.body.crc64) 
    else: 
        print('Get Object Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Get Object Failed') 
    print(traceback.format_exc())

代码样例九:流式下载

本示例用于流式下载对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import os
import traceback

from obs import GetObjectRequest
from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    # 下载对象的附加请求参数 
    get_object_request = GetObjectRequest() 
    # 获取对象时重写响应中的Content-Type头。 
    get_object_request.content_type = 'text/plain' 
    bucket_name = "examplebucket" 
    object_key = "objectname" 
    # 流式下载 
    resp = obs_client.getObject(bucket_name=bucket_name, object_key=object_key, getObjectRequest=get_object_request, loadStreamInMemory=False, isAttachCrc64=True) 
    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Get Object Succeeded') 
        print('requestId:', resp.requestId) 
        print('crc64 in obs:', resp.body.crc64) 
        # 读取对象内容 
        while True: 
            chunk = resp.body.response.read(65536) 
            if not chunk: 
                break 
            print(chunk) 
        resp.body.response.close() 
    else: 
        print('Get Object Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Get Object Failed') 
    print(traceback.format_exc())

代码样例十:范围下载

本示例用于范围下载对象时使用crc64校验文件一致性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import os
import traceback

from obs import GetObjectHeader
from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入。如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server) 
try: 
    # 下载对象的附加头域 
    headers = GetObjectHeader() 
    # 获取对象时,获取在range范围内的对象内容。此处以对象的前1000个字节为例 
    # 您可以在获取对象时指定多个范围内容,例如:headers.range = '0-1000, 2000-3000' 
    headers.range = '0-1000' 
    bucket_name = "examplebucket" 
    object_key = "objectname" 
    # 范围下载,loadStreamInMemory如果为True则会忽略downloadpath路径, 将数据流下载到内存 
    resp = obs_client.getObject(bucket_name, object_key, loadStreamInMemory=True, headers=headers) 
    # 返回码为2xx时,接口调用成功,否则接口调用失败 
    if resp.status < 300: 
        print('Get Object Succeeded') 
        print('requestId:', resp.requestId) 
        # 获取对象内容 
        print('buffer:', resp.body.buffer) 
        print('crc64:', resp.body.crc64) 
    else: 
        print('Get Object Failed') 
        print('requestId:', resp.requestId) 
        print('errorCode:', resp.errorCode) 
        print('errorMessage:', resp.errorMessage) 
except Exception: 
    print('Get Object Failed') 
    print(traceback.format_exc())

代码样例十一:获取对象元数据

本示例用于获取examplebucket桶里objectname的crc64。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import os
import traceback

from obs import ObsClient

# 推荐通过环境变量获取AK/SK,这里也可以使用其他外部引入方式传入,如果使用硬编码可能会存在泄露风险
# 您可以登录访问管理控制台获取访问密钥AK/SK,获取方式请参见https://support.huaweicloud.com/usermanual-ca/ca_01_0003.html
ak = os.getenv("AccessKeyID")
sk = os.getenv("SecretAccessKey")
# 如需使用临时AK/SK和SecurityToken访问OBS,则同样推荐通过环境变量获取
# security_token = os.getenv("SecurityToken")
# server填写Bucket对应的Endpoint,这里以华北-北京四为例,其他地区请按实际情况填写
server = "https://obs.cn-north-4.myhuaweicloud.com"
# 创建obsClient实例
# 如果使用临时AK/SK和SecurityToken访问OBS,需要在创建实例时通过security_token参数指定securityToken值
obs_client = ObsClient(access_key_id=ak, secret_access_key=sk, server=server)
try:
    bucket_name = "examplebucket"
    object_key = "objectname"
    # 获取对象元数据
    resp = obs_client.getObjectMetadata(bucket_name, object_key)
    # 返回码为2xx时,接口调用成功,否则接口调用失败
    if resp.status < 300:
        print('Get Object Metadata Succeeded')
        print('requestId:', resp.requestId)
        print('etag:', resp.body.etag)
        print('crc64:', resp.body.crc64)
        print('lastModified:', resp.body.lastModified)
        print('contentType:', resp.body.contentType)
        print('contentLength:', resp.body.contentLength)
    else:
        print('Get Object Metadata Failed')
        print('requestId:', resp.requestId)
        print('status:', resp.status)
        print('reason:', resp.reason)
except Exception:
    print('Get Object Metadata Failed')
    print(traceback.format_exc())

相关文档