Browse Source

add ListMultiUploads && resumable upload (断点续传)

tags/v0.7.11^2
jojoliang 5 years ago
parent
commit
64d31f318a
  1. 2
      cos.go
  2. 99
      example/object/list_uploads.go
  3. 193
      object.go
  4. 47
      object_part.go

2
cos.go

@ -22,7 +22,7 @@ import (
const (
// Version current go sdk version
Version = "0.7.10"
Version = "0.7.11"
userAgent = "cos-go-sdk-v5/" + Version
contentTypeXML = "application/xml"
defaultServiceBaseURL = "http://service.cos.myqcloud.com"

99
example/object/list_uploads.go

@ -0,0 +1,99 @@
package main
import (
"context"
"fmt"
"math/rand"
"net/url"
"os"
"strings"
"net/http"
"github.com/tencentyun/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
// log_status prints diagnostics for an error returned by the COS SDK.
// A nil error is ignored, a "not found" response is reported as a warning,
// a COS service error is printed field by field, and any other error is
// printed verbatim. It never exits or returns an error itself.
func log_status(err error) {
	if err == nil {
		return
	}
	if cos.IsNotFoundError(err) {
		// WARN
		fmt.Println("WARN: Resource is not existed")
		return
	}
	e, ok := cos.IsCOSError(err)
	if !ok {
		// ERROR
		fmt.Printf("ERROR: %v\n", err)
		return
	}
	// ERROR
	fmt.Printf("ERROR: Code: %v\n", e.Code)
	fmt.Printf("ERROR: Message: %v\n", e.Message)
	fmt.Printf("ERROR: Resource: %v\n", e.Resource)
	fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
}
// initUpload starts a multipart upload for the given object name and
// returns the initiation result (which carries the new UploadID).
// Errors are only logged via log_status, matching the example's style.
func initUpload(c *cos.Client, name string) *cos.InitiateMultipartUploadResult {
	res, _, err := c.Object.InitiateMultipartUpload(context.Background(), name, nil)
	log_status(err)
	fmt.Printf("%#v\n", res)
	return res
}
// uploadPart uploads part number n (hex text of blockSize random bytes)
// to the given multipart upload and returns the part's ETag, or "" when
// the upload failed.
func uploadPart(c *cos.Client, name string, uploadID string, blockSize, n int) string {
	b := make([]byte, blockSize)
	if _, err := rand.Read(b); err != nil {
		log_status(err)
		return ""
	}
	// Hex-encode the random bytes so the part body is printable text.
	s := fmt.Sprintf("%X", b)
	f := strings.NewReader(s)
	resp, err := c.Object.UploadPart(
		context.Background(), name, uploadID, n, f, nil,
	)
	if err != nil {
		// log_status only prints and returns; guard here so a failed
		// request never dereferences a nil resp (previously a panic).
		log_status(err)
		return ""
	}
	fmt.Printf("%s\n", resp.Status)
	return resp.Header.Get("Etag")
}
// Example: initiate a multipart upload, push four 3MB parts of random
// data, then list the in-progress multipart uploads under the prefix.
func main() {
	bucketURL, _ := url.Parse("http://test-1259654469.cos.ap-guangzhou.myqcloud.com")
	b := &cos.BaseURL{BucketURL: bucketURL}
	c := cos.NewClient(b, &http.Client{
		Transport: &cos.AuthorizationTransport{
			// Credentials come from the environment; never hard-code them.
			SecretID:  os.Getenv("COS_SECRETID"),
			SecretKey: os.Getenv("COS_SECRETKEY"),
			Transport: &debug.DebugRequestTransport{
				RequestHeader:  true,
				RequestBody:    false,
				ResponseHeader: true,
				ResponseBody:   true,
			},
		},
	})

	name := "test/test_list_parts.go"
	up := initUpload(c, name)
	uploadID := up.UploadID
	ctx := context.Background()

	// Upload parts 1..4, 3MB each.
	blockSize := 1024 * 1024 * 3
	for i := 1; i < 5; i++ {
		uploadPart(c, name, uploadID, blockSize, i)
	}

	// List in-progress multipart uploads filtered by prefix.
	opt := &cos.ObjectListUploadsOptions{
		Prefix:     cos.EncodeURIComponent("test/test_list_parts"),
		MaxUploads: 100,
	}
	// Use ctx here: previously ctx was declared but never used, which is
	// a Go compile error in this example.
	v, _, err := c.Object.ListUploads(ctx, opt)
	if err != nil {
		log_status(err)
		return
	}
	fmt.Printf("%+v\n", v)
	for _, p := range v.Upload {
		fmt.Printf("%+v\n", p)
		fmt.Printf("%v, %v, %v\n", p.Key, p.UploadID, p.Initiated)
	}
}

193
object.go

@ -2,6 +2,7 @@ package cos
import (
"context"
"crypto/md5"
"encoding/xml"
"errors"
"fmt"
@ -32,6 +33,7 @@ type ObjectGetOptions struct {
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
}
@ -149,7 +151,6 @@ type ObjectPutHeaderOptions struct {
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
//兼容其他自定义头部
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
}
@ -488,18 +489,7 @@ type MultiUploadOptions struct {
OptIni *InitiateMultipartUploadOptions
PartSize int64
ThreadPoolSize int
CheckPointFile string
EnableCheckpoint bool
}
type CheckPointOptions struct {
cpfile *os.File
Key string `xml:"Key"`
FilePath string `xml:"FilePath"`
FileSize int64 `xml:"FileSize"`
PartSize int64 `xml:"PartSize"`
UploadID string `xml:"UploadID"`
Parts []Object `xml:"Parts>Part,omitempty"`
CheckPoint bool
}
type Chunk struct {
@ -574,73 +564,34 @@ func DividePart(fileSize int64) (int64, int64) {
return partNum, partSize
}
func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) {
func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
if filePath == "" {
return nil, nil, 0, errors.New("filePath invalid")
return nil, 0, errors.New("filePath invalid")
}
file, err := os.Open(filePath)
if err != nil {
return nil, nil, 0, err
return nil, 0, err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return nil, nil, 0, err
return nil, 0, err
}
optcp := &CheckPointOptions{}
uploaded := false
if opt.EnableCheckpoint {
for {
optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error())
}
defer optcp.cpfile.Close()
bs, err := ioutil.ReadAll(optcp.cpfile)
if err != nil {
break
}
err = xml.Unmarshal(bs, optcp)
if err != nil {
break
}
if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() {
optcp = &CheckPointOptions{}
break
}
uploaded = true
break
}
optcp.Key = name
optcp.FilePath = filePath
optcp.FileSize = stat.Size()
}
var partNum int64
partSize := opt.PartSize
if uploaded {
partSize = optcp.PartSize
}
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = stat.Size() / partSize
if partNum >= 10000 {
return nil, nil, 0, errors.New("Too many parts, out of 10000")
return nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(stat.Size())
}
if opt.EnableCheckpoint {
optcp.PartSize = partSize / 1024 / 1024
}
var chunks []Chunk
var chunk = Chunk{
Done: false,
}
var chunk = Chunk{}
for i := int64(0); i < partNum; i++ {
chunk.Number = int(i + 1)
chunk.OffSet = i * partSize
@ -656,14 +607,84 @@ func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*Check
partNum++
}
if uploaded {
for _, part := range optcp.Parts {
if part.PartNumber <= int(partNum) {
chunks[(part.PartNumber - 1)].Done = true
return chunks, int(partNum), nil
}
// getResumableUploadID searches the bucket's in-progress multipart
// uploads for one whose key equals name and returns its upload ID so an
// interrupted upload can be resumed. It returns "" (and a nil error)
// when no matching upload exists; when several exist, the last listed
// entry wins.
func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
	opt := &ObjectListUploadsOptions{
		Prefix:       name,
		EncodingType: "url",
	}
	res, _, err := s.ListUploads(ctx, opt)
	if err != nil {
		return "", err
	}
	// Walk the listing back-to-front and take the first exact key match.
	// Keys come back URL-encoded because EncodingType is "url".
	for i := len(res.Upload) - 1; i >= 0; i-- {
		key, _ := decodeURIComponent(res.Upload[i].Key)
		if key == name {
			return decodeURIComponent(res.Upload[i].UploadID)
		}
	}
	return "", nil
}
// checkUploadedParts lists the parts already uploaded under UploadID and
// validates each against the local file: a chunk is marked Done only when
// the part's ETag equals the MD5 of the corresponding byte range of
// filepath. On any inconsistency or I/O error, every chunk is reset to
// not-done and the error is returned, forcing a full re-upload.
func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
	var uploadedParts []Object
	isTruncated := true
	opt := &ObjectListPartsOptions{
		EncodingType: "url",
	}
	// Page through ListParts until the listing is no longer truncated.
	for isTruncated {
		res, _, err := s.ListParts(ctx, name, UploadID, opt)
		if err != nil {
			return err
		}
		if len(res.Parts) > 0 {
			uploadedParts = append(uploadedParts, res.Parts...)
		}
		isTruncated = res.IsTruncated
		opt.PartNumberMarker = res.NextPartNumberMarker
	}
	fd, err := os.Open(filepath)
	if err != nil {
		return err
	}
	defer fd.Close()
	for _, part := range uploadedParts {
		partNumber := part.PartNumber
		if partNumber > partNum {
			err = errors.New("Part Number is not consistent")
			break
		}
		partNumber = partNumber - 1
		// Seek errors were previously ignored, which would silently
		// checksum the wrong byte range.
		if _, e := fd.Seek(chunks[partNumber].OffSet, io.SeekStart); e != nil {
			err = e
			break
		}
		bs, e := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
		if e != nil {
			err = e
			break
		}
		// ETag of a simple part is the quoted hex MD5 of its bytes.
		localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
		if localMD5 != part.ETag {
			err = fmt.Errorf("CheckSum Failed in Part[%d]", part.PartNumber)
			break
		}
		chunks[partNumber].Done = true
	}
	// On any failed part, reset every chunk so the caller re-uploads all.
	// Must index the slice: ranging by value only mutated a copy before,
	// leaving stale Done flags in place.
	if err != nil {
		for i := range chunks {
			chunks[i].Done = false
		}
	}
	return err
}
// MultiUpload/Upload 为高级upload接口,并发分块上传
@ -680,12 +701,8 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
if opt == nil {
opt = &MultiUploadOptions{}
}
if opt.EnableCheckpoint && opt.CheckPointFile == "" {
opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath)
}
// 1.Get the file chunk
optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt)
chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
if err != nil {
return nil, nil, err
}
@ -707,24 +724,26 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
}
return result, rsp, nil
}
if opt.EnableCheckpoint {
optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644)
if err != nil {
return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error())
var uploadID string
resumableFlag := false
if opt.CheckPoint {
var err error
uploadID, err = s.getResumableUploadID(ctx, name)
if err == nil && uploadID != "" {
err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
resumableFlag = (err == nil)
}
defer optcp.cpfile.Close()
}
uploadID := optcp.UploadID
optini := opt.OptIni
if uploadID == "" {
// 2.Init
optini := opt.OptIni
if !resumableFlag {
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
}
uploadID = res.UploadID
optcp.UploadID = uploadID
}
var poolSize int
if opt.ThreadPoolSize > 0 {
@ -737,10 +756,6 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
chjobs := make(chan *Jobs, 100)
chresults := make(chan *Results, 10000)
optcom := &CompleteMultipartUploadOptions{}
if len(optcp.Parts) > 0 {
optcom.Parts = append(optcom.Parts, optcp.Parts...)
partNum -= len(optcp.Parts)
}
// 3.Start worker
for w := 1; w <= poolSize; w++ {
@ -784,26 +799,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
optcom.Parts = append(optcom.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
if opt.EnableCheckpoint {
optcp.Parts = append(optcp.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
err := optcp.cpfile.Truncate(0)
if err != nil {
continue
}
_, err = optcp.cpfile.Seek(0, os.SEEK_SET)
if err == nil {
xml.NewEncoder(optcp.cpfile).Encode(optcp)
}
}
}
sort.Sort(ObjectList(optcom.Parts))
v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
if opt.EnableCheckpoint && err == nil {
os.Remove(opt.CheckPointFile)
}
return v, resp, err
}

47
object_part.go

@ -246,3 +246,50 @@ func (s *ObjectService) CopyPart(ctx context.Context, name, uploadID string, par
}
return &res, resp, err
}
// ObjectListUploadsOptions holds the query parameters of
// List Multipart Uploads (GET /?uploads). The COS XML API expects
// lowercase-hyphen parameter names; the previous CamelCase tags were
// ignored by the service, so Prefix filtering silently did nothing.
// omitempty keeps unset fields off the query string.
type ObjectListUploadsOptions struct {
	Delimiter      string `url:"delimiter,omitempty"`        // group keys by delimiter
	EncodingType   string `url:"encoding-type,omitempty"`    // "url" to URL-encode keys in the response
	Prefix         string `url:"prefix,omitempty"`           // only uploads whose key starts with this
	MaxUploads     int    `url:"max-uploads,omitempty"`      // page size, server default/maximum 1000
	KeyMarker      string `url:"key-marker,omitempty"`       // resume listing after this key
	UploadIdMarker string `url:"upload-id-marker,omitempty"` // resume listing after this upload ID
}
// ObjectListUploadsResult is the parsed XML response body of
// List Multipart Uploads (root element ListMultipartUploadsResult).
type ObjectListUploadsResult struct {
	XMLName xml.Name `xml:"ListMultipartUploadsResult"`
	Bucket string `xml:"Bucket,omitempty"` // bucket name
	EncodingType string `xml:"Encoding-Type,omitempty"` // encoding applied to keys in this response
	KeyMarker string `xml:"KeyMarker,omitempty"` // key marker echoed from the request
	UploadIdMarker string `xml:"UploadIdMarker,omitempty"` // upload-id marker echoed from the request
	NextKeyMarker string `xml:"NextKeyMarker,omitempty"` // key marker for the next page when truncated
	NextUploadIdMarker string `xml:"NextUploadIdMarker,omitempty"` // upload-id marker for the next page when truncated
	MaxUploads string `xml:"MaxUploads,omitempty"` // NOTE(review): numeric in the response but kept as string here — confirm intent
	IsTruncated bool `xml:"IsTruncated,omitempty"` // true when more results exist
	Prefix string `xml:"Prefix,omitempty"` // prefix echoed from the request
	Delimiter string `xml:"Delimiter,omitempty"` // delimiter echoed from the request
	Upload []ListUploadsResultUpload `xml:"Upload,omitempty"` // one entry per in-progress multipart upload
	CommonPrefixes []string `xml:"CommonPrefixes>Prefix,omitempty"` // rolled-up prefixes when Delimiter is set
}
// ListUploadsResultUpload describes a single in-progress multipart
// upload entry inside ObjectListUploadsResult.Upload.
type ListUploadsResultUpload struct {
	Key string `xml:"Key,omitempty"` // object key of the upload
	UploadID string `xml:"UploadId,omitempty"` // ID identifying this multipart upload
	StorageClass string `xml:"StorageClass,omitempty"` // target storage class
	Initiator *Initiator `xml:"Initiator,omitempty"` // who initiated the upload
	Owner *Owner `xml:"Owner,omitempty"` // object owner
	Initiated string `xml:"Initiated,omitempty"` // initiation timestamp as returned by the service
}
// ListUploads lists the bucket's in-progress multipart uploads
// (GET /?uploads), filtered and paged according to opt.
func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsOptions) (*ObjectListUploadsResult, *Response, error) {
	var res ObjectListUploadsResult
	resp, err := s.client.send(ctx, &sendOptions{
		baseURL:  s.client.BaseURL.BucketURL,
		uri:      "/?uploads",
		method:   http.MethodGet,
		optQuery: opt,
		result:   &res,
	})
	return &res, resp, err
}
Loading…
Cancel
Save