文档首页 > > SDK参考> Go> 上传对象> 分段上传

分段上传

分享
更新时间: 2019/04/22 GMT+08:00

对于较大文件上传,可以切分成段上传。用户可以在如下的应用场景内(但不仅限于此),使用分段上传的模式:

  • 上传超过100MB大小的文件。
  • 网络条件较差,和OBS服务端之间的链接经常断开。
  • 上传前无法确定将要上传文件的大小。

分段上传分为如下3个步骤:

  1. 初始化分段上传任务(ObsClient.InitiateMultipartUpload)。
  2. 逐个或并行上传段(ObsClient.UploadPart)。
  3. 合并段(ObsClient.CompleteMultipartUpload)或取消分段上传任务(ObsClient.AbortMultipartUpload)。

初始化分段上传任务

使用分段上传方式传输数据前,必须先通知OBS初始化一个分段上传任务。该操作会返回一个OBS服务端创建的全局唯一标识(Upload ID),用于标识本次分段上传任务。您可以根据这个唯一标识来发起相关的操作,如取消分段上传任务、列举分段上传任务、列举已上传的段等。

您可以通过ObsClient.InitiateMultipartUpload初始化一个分段上传任务:

// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.InitiateMultipartUploadInput{}
       input.Bucket = "bucketname"
       input.Key = "objectname"
       input.ContentType = "text/plain"
       input.Metadata = map[string]string{"property1": "property-value1", "property2": "property-value2"}
       output, err := obsClient.InitiateMultipartUpload(input)
       if err == nil {
              fmt.Printf("UploadId:%s\n", output.UploadId)
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }
}
说明:
  • 初始化分段上传任务时,除了指定上传对象的名称和所属桶外,您还可以使用InitiateMultipartUploadInput.ContentType和InitiateMultipartUploadInput.Metadata分别指定对象MIME类型和对象自定义元数据。
  • 调用初始化分段上传任务接口成功后,会返回分段上传任务的全局唯一标识(Upload ID),在后面的操作中将用到它。

上传段

初始化一个分段上传任务之后,可以根据指定的对象名和Upload ID来分段上传数据。每一个上传的段都有一个标识它的号码——分段号(Part Number,范围是1~10000)。对于同一个Upload ID,该分段号不但唯一标识这一段数据,也标识了这段数据在整个对象内的相对位置。如果您用同一个分段号上传了新的数据,那么OBS上已有的这个段号的数据将被覆盖。除了最后一段以外,其他段的大小范围是100KB~5GB;最后段大小范围是0~5GB。每个段不需要按顺序上传,甚至可以在不同进程、不同机器上上传,OBS会按照分段号排序组成最终对象。

您可以通过ObsClient.UploadPart上传段:

// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.UploadPartInput{}
       input.Bucket = "bucketname"
       input.Key = "objectname"
       input.UploadId = "uploadid"
       input.SourceFile = "localfile"   // localfile为待上传的本地文件路径,需要指定到具体的文件名
       // 第一段分段号
       input.PartNumber = 1
       // 第一段的段偏移量
       input.Offset = 0
       // 第一段分段大小
       input.PartSize = 5 * 1024 * 1024
       // 上传第一段
       output, err := obsClient.UploadPart(input)
       if err == nil {
              // 获取上传成功的ETag
              fmt.Printf("ETag:%s\n", output.ETag)
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }

       // 第二段分段号
       input.PartNumber = 2
       // 第二段的段偏移量
       input.Offset = 5 * 1024 * 1024
       // 第二段分段大小
       input.PartSize = 10 * 1024 * 1024
       // 上传第 二段
       output, err = obsClient.UploadPart(input)
       if err == nil {
              // 获取上传成功的ETag
              fmt.Printf("ETag:%s\n", output.ETag)
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }
}
说明:
  • 上传段接口要求除最后一段以外,其他的段大小都要大于100KB。但是上传段接口并不会立即校验上传段的大小(因为不知道是否为最后一块);只有调用合并段接口时才会校验。
  • OBS会将服务端收到段数据的ETag值(段数据的MD5值)返回给用户。
  • 可以通过UploadPartInput.ContentMD5设置上传数据的MD5值,提供给OBS服务端用于校验数据完整性。
  • 分段号的范围是1~10000。如果超出这个范围,OBS将返回400 Bad Request错误。
  • OBS 3.0的桶支持最小段的大小为100KB,OBS 2.0的桶支持最小段的大小为5MB。请在OBS 3.0的桶上执行分段上传操作。

合并段

所有分段上传完成后,需要调用合并段接口来在OBS服务端生成最终对象。在执行该操作时,需要提供所有有效的分段列表(包括分段号和分段ETag值);OBS收到提交的分段列表后,会逐一验证每个段的有效性。当所有段验证通过后,OBS将把这些分段组合成最终的对象。

您可以通过ObsClient.CompleteMultipartUpload合并段:

// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.CompleteMultipartUploadInput{}
       input.Bucket = "bucketname"
       input.Key = "objectname"
       input.UploadId = "uploadid"
       input.Parts = []obs.Part{
              obs.Part{PartNumber: 1, ETag: "etag1"},
              obs.Part{PartNumber: 2, ETag: "etag2"},
       }

       output, err := obsClient.CompleteMultipartUpload(input)
       if err == nil {
              fmt.Printf("RequestId:%s\n", output.RequestId)
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }
}
说明:

分段可以是不连续的。

并发分段上传

分段上传的主要目的是解决大文件上传或网络条件较差的情况。下面的示例代码展示了如何使用分段上传并发上传大文件:

// 引入依赖包
import (
       "fmt"
       "obs"
       "os"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"
var bucketName = "bucketname"
var objectKey = "objectname"
var filePath = "localfile"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       // 初始化分段上传任务
       input := &obs.InitiateMultipartUploadInput{}
       input.Bucket = bucketName
       input.Key = objectKey
       output, err := obsClient.InitiateMultipartUpload(input)

       if err != nil {
              panic(err)
       }

       uploadId := output.UploadId

       fmt.Printf("UploadId:%s\n", uploadId)
       fmt.Println()

       // 每段上传100MB
       var partSize int64 = 100 * 1024 * 1024

       stat, err := os.Stat(filePath)
       if err != nil {
              panic(err)
       }
       fileSize := stat.Size()

       // 计算需要上传的段数
       partCount := int(fileSize / partSize)

       if fileSize%partSize != 0 {
              partCount++
       }

       // 执行并发上传段
       partChan := make(chan obs.Part, 5)

       for i := 0; i < partCount; i++ {
              partNumber := i + 1
              offset := int64(i) * partSize
              currPartSize := partSize
              if i+1 == partCount {
                     currPartSize = fileSize - offset
              }
              go func() {
                     uploadPartInput := &obs.UploadPartInput{}
                     uploadPartInput.Bucket = bucketName
                     uploadPartInput.Key = objectKey
                     uploadPartInput.UploadId = uploadId
                     uploadPartInput.SourceFile = filePath
                     // 分段号
                     uploadPartInput.PartNumber = partNumber
                     // 分段在文件中的起始位置
                     uploadPartInput.Offset = offset
                     // 分段大小
                     uploadPartInput.PartSize = currPartSize
                     uploadPartInputOutput, err := obsClient.UploadPart(uploadPartInput)
                     if err != nil {
                           panic(err)
                     }
                     fmt.Printf("%d finished\n", partNumber)
                     partChan <- obs.Part{ETag: uploadPartInputOutput.ETag, PartNumber: uploadPartInputOutput.PartNumber}
              }()
       }

       parts := make([]obs.Part, 0, partCount)

       // 等待上传完成
       for {
              part, ok := <-partChan
              if !ok {
                     break
              }
              parts = append(parts, part)

              if len(parts) == partCount {
                     close(partChan)
              }
       }

       completeMultipartUploadInput := &obs.CompleteMultipartUploadInput{}
       completeMultipartUploadInput.Bucket = bucketName
       completeMultipartUploadInput.Key = objectKey
       completeMultipartUploadInput.UploadId = uploadId
       completeMultipartUploadInput.Parts = parts
       // 合并段
       completeMultipartUploadOutput, err := obsClient.CompleteMultipartUpload(completeMultipartUploadInput)
       if err != nil {
              panic(err)
       }
       fmt.Printf("RequestId:%s\n", completeMultipartUploadOutput.RequestId)
}
说明:

大文件分段上传时,使用UploadPartInput.Offset和UploadPartInput.PartSize配合指定每段数据在文件中的起始结束位置。

取消分段上传任务

分段上传任务可以被取消,当一个分段上传任务被取消后,就不能再使用其Upload ID做任何操作,已经上传段也会被OBS删除。

采用分段上传方式上传对象过程中或上传对象失败后会在桶内产生段,这些段会占用您的存储空间,您可以通过取消该分段上传任务来清理掉不需要的段,节约存储空间。

您可以通过ObsClient.AbortMultipartUpload取消分段上传任务:

// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.AbortMultipartUploadInput{}
       input.Bucket = "bucketname"
       input.Key = "objectname"
       input.UploadId = "uploadid"

       output, err := obsClient.AbortMultipartUpload(input)
       if err == nil {
              fmt.Printf("RequestId:%s\n", output.RequestId)
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }
}

列举已上传的段

您可使用ObsClient.ListParts列举出某一分段上传任务所有已经上传成功的段。

该接口可设置的参数如下:

参数

作用

ListPartsInput.Bucket

分段上传任务所属的桶名。

ListPartsInput.Key

分段上传任务所属的对象名。

ListPartsInput.UploadId

分段上传任务全局唯一标识,从ObsClient.InitiateMultipartUpload返回的结果获取。

ListPartsInput.MaxParts

表示列举已上传的段返回结果最大段数目,即分页时每一页中段数目。

ListPartsInput.PartNumberMarker

表示待列出段的起始位置,只有Part Number大于该参数的段会被列出。

  • 简单列举
// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.ListPartsInput{}
       input.Bucket = "bucketname"
       input.Key = "objectname"
       input.UploadId = "uploadid"

       output, err := obsClient.ListParts(input)
       if err == nil {
              for index, part := range output.Parts {
                     fmt.Printf("Part[%d]-ETag:%s, PartNumber:%d, LastModified:%s, Size:%d\n", index, part.ETag,
                           part.PartNumber, part.LastModified, part.Size)
              }
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }
}
说明:
  • 列举段至多返回1000个段信息,如果指定的Upload ID包含的段数量大于1000,则返回结果中ListPartsOutput.IsTruncated为true表明本次没有返回全部段,并可通过ListPartsOutput.NextPartNumberMarker获取下次列举的起始位置。
  • 如果想获取指定Upload ID包含的所有分段,可以采用分页列举的方式。
  • 列举所有段

由于ObsClient.ListParts只能列举至多1000个段,如果段数量大于1000,列举所有分段请参考如下示例:

// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.ListPartsInput{}
       input.Bucket = "bucketname"
       input.Key = "objectname"
       input.UploadId = "uploadid"

       for {
              output, err := obsClient.ListParts(input)
              if err == nil {
                     for index, part := range output.Parts {
                           fmt.Printf("Part[%d]-ETag:%s, PartNumber:%d, LastModified:%s, Size:%d\n", index, part.ETag,
                                  part.PartNumber, part.LastModified, part.Size)
                     }
                     if output.IsTruncated {
                           input.PartNumberMarker = output.NextPartNumberMarker
                     } else {
                           break
                     }
              } else {
                     if obsError, ok := err.(obs.ObsError); ok {
                           fmt.Printf("Code:%s\n", obsError.Code)
                           fmt.Printf("Message:%s\n", obsError.Message)
                     }
                     break
              }
       }
}

列举分段上传任务

您可以通过ObsClient.ListMultipartUploads列举分段上传任务。列举分段上传任务可设置的参数如下:

参数

作用

ListMultipartUploadsInput.Bucket

桶名。

ListMultipartUploadsInput.Prefix

限定返回的分段上传任务中的对象名必须带有Prefix前缀。

ListMultipartUploadsInput.MaxUploads

列举分段上传任务的最大数目,取值范围为1~1000,当超出范围时,按照默认的1000进行处理。

ListMultipartUploadsInput.Delimiter

用于对分段上传任务中的对象名进行分组的字符。对于对象名中包含Delimiter的任务,其对象名(如果请求中指定了Prefix,则此处的对象名需要去掉Prefix)中从首字符至第一个Delimiter之间的字符串将作为一个分组并作为CommonPrefix返回。

ListMultipartUploadsInput.KeyMarker

表示列举时返回指定的KeyMarker之后的分段上传任务。

ListMultipartUploadsInput.UploadIdMarker

只有与KeyMarker参数一起使用时才有意义,用于指定返回结果的起始位置,即列举时返回指定KeyMarker的UploadIdMarker之后的分段上传任务。

  • 简单列举分段上传任务
// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.ListMultipartUploadsInput{}
       input.Bucket = "bucketname"
       output, err := obsClient.ListMultipartUploads(input)
       if err == nil {
              for index, upload := range output.Uploads {
                     fmt.Printf("Upload[%d]-OwnerId:%s, UploadId:%s, Key:%s, Initiated:%s,StorageClass:%s\n",
                           index, upload.Owner.ID, upload.UploadId, upload.Key, upload.Initiated, upload.StorageClass)
              }
       } else if obsError, ok := err.(obs.ObsError); ok {
              fmt.Printf("Code:%s\n", obsError.Code)
              fmt.Printf("Message:%s\n", obsError.Message)
       }
}
说明:
  • 列举分段上传任务至多返回1000个任务信息,如果指定的桶包含的分段上传任务数量大于1000,则返回结果中ListMultipartUploadsOutput.IsTruncated为true表明本次没有返回全部结果,并可通过ListMultipartUploadsOutput.NextKeyMarker和ListMultipartUploadsOutput.NextUploadIdMarker获取下次列举的起点。
  • 如果想获取指定桶包含的所有分段上传任务,可以采用分页列举的方式。
  • 列举全部分段上传任务
// 引入依赖包
import (
       "fmt"
       "obs"
)

var ak = "*** Provide your Access Key ***"
var sk = "*** Provide your Secret Key ***"
var endpoint = "https://your-endpoint"

// 创建ObsClient结构体
var obsClient, _ = obs.New(ak, sk, endpoint)

func main() {
       input := &obs.ListMultipartUploadsInput{}
       input.Bucket = "bucketname"
       output, err := obsClient.ListMultipartUploads(input)
       for {
              if err == nil {
                     for index, upload := range output.Uploads {
                           fmt.Printf("Upload[%d]-OwnerId:%s, UploadId:%s, Key:%s, Initiated:%s,StorageClass:%s\n",
                                  index, upload.Owner.ID, upload.UploadId, upload.Key, upload.Initiated, upload.StorageClass)
                     }
                     if output.IsTruncated {
                           input.KeyMarker = output.NextKeyMarker
                           input.UploadIdMarker = output.NextUploadIdMarker
                     } else {
                           break
                     }
              } else {
                     if obsError, ok := err.(obs.ObsError); ok {
                           fmt.Printf("Code:%s\n", obsError.Code)
                           fmt.Printf("Message:%s\n", obsError.Message)
                     }
                     break
              }
       }
}
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

跳转到云社区