From 64d31f318a223866ac16c64bbe9effcd168b696d Mon Sep 17 00:00:00 2001 From: jojoliang Date: Wed, 22 Jul 2020 14:46:26 +0800 Subject: [PATCH] =?UTF-8?q?add=20ListMultiUploads=20&&=20=E6=96=AD?= =?UTF-8?q?=E7=82=B9=E7=BB=AD=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cos.go | 2 +- example/object/list_uploads.go | 99 +++++++++++++++++++ object.go | 209 ++++++++++++++++++++--------------------- object_part.go | 47 +++++++++ 4 files changed, 251 insertions(+), 106 deletions(-) create mode 100644 example/object/list_uploads.go diff --git a/cos.go b/cos.go index 665084f..42ca376 100644 --- a/cos.go +++ b/cos.go @@ -22,7 +22,7 @@ import ( const ( // Version current go sdk version - Version = "0.7.10" + Version = "0.7.11" userAgent = "cos-go-sdk-v5/" + Version contentTypeXML = "application/xml" defaultServiceBaseURL = "http://service.cos.myqcloud.com" diff --git a/example/object/list_uploads.go b/example/object/list_uploads.go new file mode 100644 index 0000000..f2bf8aa --- /dev/null +++ b/example/object/list_uploads.go @@ -0,0 +1,99 @@ +package main + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "os" + "strings" + + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func initUpload(c *cos.Client, name string) *cos.InitiateMultipartUploadResult { + v, _, err := c.Object.InitiateMultipartUpload(context.Background(), name, nil) + log_status(err) + fmt.Printf("%#v\n", v) + return v +} + +func uploadPart(c *cos.Client, name string, uploadID string, blockSize, n int) string { + + b := make([]byte, blockSize) + if _, err := rand.Read(b); err != nil { + log_status(err) + } + s := fmt.Sprintf("%X", b) + f := strings.NewReader(s) + + resp, err := c.Object.UploadPart( + context.Background(), name, uploadID, n, f, nil, + ) + log_status(err) + fmt.Printf("%s\n", resp.Status) + return resp.Header.Get("Etag") +} + +func main() { + u, _ := url.Parse("http://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: false, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + name := "test/test_list_parts.go" + up := initUpload(c, name) + uploadID := up.UploadID + ctx := context.Background() + blockSize := 1024 * 1024 * 3 + + for i := 1; i < 5; i++ { + uploadPart(c, name, uploadID, blockSize, i) + } + opt := &cos.ObjectListUploadsOptions{ + Prefix: cos.EncodeURIComponent("test/test_list_parts"), + MaxUploads: 100, + } + v, _, err := c.Object.ListUploads(context.Background(), opt) + if err != nil { + log_status(err) + return + } + fmt.Printf("%+v\n", v) + for _, p := range v.Upload { + fmt.Printf("%+v\n", p) + fmt.Printf("%v, %v, %v\n", p.Key, p.UploadID, p.Initiated) + } +} diff --git a/object.go b/object.go index 0c348fc..1cea83f 100644 --- a/object.go +++ b/object.go @@ -2,6 +2,7 @@ package cos import ( "context" + "crypto/md5" "encoding/xml" "errors" "fmt" @@ -32,7 +33,8 @@ type ObjectGetOptions struct { XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` - XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url @@ -148,9 +150,8 @@ type ObjectPutHeaderOptions struct { XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` //兼容其他自定义头部 - XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` - - XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // ObjectPutOptions the options of put object @@ -485,21 +486,10 @@ type Object struct { // MultiUploadOptions is the option of the multiupload, // ThreadPoolSize default is one type MultiUploadOptions struct { - OptIni *InitiateMultipartUploadOptions - PartSize int64 - ThreadPoolSize int - CheckPointFile string - EnableCheckpoint bool -} - -type CheckPointOptions struct { - cpfile *os.File - Key string `xml:"Key"` - FilePath string `xml:"FilePath"` - FileSize int64 `xml:"FileSize"` - PartSize int64 `xml:"PartSize"` - UploadID string `xml:"UploadID"` - Parts []Object `xml:"Parts>Part,omitempty"` + OptIni *InitiateMultipartUploadOptions + PartSize int64 + ThreadPoolSize int + CheckPoint bool } type Chunk struct { @@ -574,73 +564,34 @@ func DividePart(fileSize int64) (int64, int64) { return partNum, partSize } -func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) { +func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) { if filePath == "" { - return nil, nil, 0, errors.New("filePath invalid") + return nil, 0, errors.New("filePath invalid") } file, err := os.Open(filePath) if err != nil { - return nil, nil, 0, err + return nil, 0, err } defer file.Close() stat, err := file.Stat() if err != nil { - return nil, nil, 0, err - } - - optcp := &CheckPointOptions{} - uploaded := false - if opt.EnableCheckpoint { - for { - optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644) - if err != nil { - return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error()) - } - defer optcp.cpfile.Close() - bs, err := ioutil.ReadAll(optcp.cpfile) - if err != nil { - break - } - err = xml.Unmarshal(bs, optcp) - if err != nil { - break - } - if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() { - optcp = &CheckPointOptions{} - break - } - uploaded = true - break - } - optcp.Key = name - optcp.FilePath = filePath - optcp.FileSize = stat.Size() + return nil, 0, err } - var partNum int64 - partSize := opt.PartSize - if uploaded { - partSize = optcp.PartSize - } if partSize > 0 { partSize = partSize * 1024 * 1024 partNum = stat.Size() / partSize if partNum >= 10000 { - return nil, nil, 0, errors.New("Too many parts, out of 10000") + return nil, 0, errors.New("Too many parts, out of 10000") } } else { partNum, partSize = DividePart(stat.Size()) } - if opt.EnableCheckpoint { - optcp.PartSize = partSize / 1024 / 1024 - } var chunks []Chunk - var chunk = Chunk{ - Done: false, - } + var chunk = Chunk{} for i := int64(0); i < partNum; i++ { chunk.Number = int(i + 1) chunk.OffSet = i * partSize @@ -656,14 +607,84 @@ func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*Check partNum++ } - if uploaded { - for _, part := range optcp.Parts { - if part.PartNumber <= int(partNum) { - chunks[(part.PartNumber - 1)].Done = true - } + return chunks, int(partNum), nil + +} + +func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) { + opt := &ObjectListUploadsOptions{ + Prefix: name, + EncodingType: "url", + } + res, _, err := s.ListUploads(ctx, opt) + if err != nil { + return "", err + } + if len(res.Upload) == 0 { + return "", nil + } + last := len(res.Upload) - 1 + for last >= 0 { + decodeKey, _ := decodeURIComponent(res.Upload[last].Key) + if decodeKey == name { + return decodeURIComponent(res.Upload[last].UploadID) } + last = last - 1 } - return optcp, chunks, int(partNum), nil + return "", nil +} + +func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error { + var err error + var uploadedParts []Object + isTruncated := true + opt := &ObjectListPartsOptions{ + EncodingType: "url", + } + for isTruncated { + res, _, err := s.ListParts(ctx, name, UploadID, opt) + if err != nil { + return err + } + if len(res.Parts) > 0 { + uploadedParts = append(uploadedParts, res.Parts...) + } + isTruncated = res.IsTruncated + opt.PartNumberMarker = res.NextPartNumberMarker + } + fd, err := os.Open(filepath) + if err != nil { + return err + } + defer fd.Close() + err = nil + for _, part := range uploadedParts { + partNumber := part.PartNumber + if partNumber > partNum { + err = errors.New("Part Number is not consistent") + break + } + partNumber = partNumber - 1 + fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET) + bs, e := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size)) + if e != nil { + err = e + break + } + localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs)) + if localMD5 != part.ETag { + err = errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)) + break + } + chunks[partNumber].Done = true + } + // 某个分块出错, 重置chunks + if err != nil { + for _, chunk := range chunks { + chunk.Done = false + } + } + return err } // MultiUpload/Upload 为高级upload接口,并发分块上传 @@ -680,12 +701,8 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string if opt == nil { opt = &MultiUploadOptions{} } - if opt.EnableCheckpoint && opt.CheckPointFile == "" { - opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath) - } - // 1.Get the file chunk - optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt) + chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize) if err != nil { return nil, nil, err } @@ -706,25 +723,27 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string ETag: rsp.Header.Get("ETag"), } return result, rsp, nil - } - if opt.EnableCheckpoint { - optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644) - if err != nil { - return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error()) + } + + var uploadID string + resumableFlag := false + if opt.CheckPoint { + var err error + uploadID, err = s.getResumableUploadID(ctx, name) + if err == nil && uploadID != "" { + err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum) + resumableFlag = (err == nil) } - defer optcp.cpfile.Close() } - uploadID := optcp.UploadID + // 2.Init optini := opt.OptIni - if uploadID == "" { - // 2.Init + if !resumableFlag { res, _, err := s.InitiateMultipartUpload(ctx, name, optini) if err != nil { return nil, nil, err } uploadID = res.UploadID - optcp.UploadID = uploadID } var poolSize int if opt.ThreadPoolSize > 0 { @@ -737,10 +756,6 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string chjobs := make(chan *Jobs, 100) chresults := make(chan *Results, 10000) optcom := &CompleteMultipartUploadOptions{} - if len(optcp.Parts) > 0 { - optcom.Parts = append(optcom.Parts, optcp.Parts...) - partNum -= len(optcp.Parts) - } // 3.Start worker for w := 1; w <= poolSize; w++ { @@ -784,26 +799,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string optcom.Parts = append(optcom.Parts, Object{ PartNumber: res.PartNumber, ETag: etag}, ) - if opt.EnableCheckpoint { - optcp.Parts = append(optcp.Parts, Object{ - PartNumber: res.PartNumber, ETag: etag}, - ) - err := optcp.cpfile.Truncate(0) - if err != nil { - continue - } - _, err = optcp.cpfile.Seek(0, os.SEEK_SET) - if err == nil { - xml.NewEncoder(optcp.cpfile).Encode(optcp) - } - } } sort.Sort(ObjectList(optcom.Parts)) v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) - if opt.EnableCheckpoint && err == nil { - os.Remove(opt.CheckPointFile) - } return v, resp, err } diff --git a/object_part.go b/object_part.go index d2d3bc4..ee181b8 100644 --- a/object_part.go +++ b/object_part.go @@ -246,3 +246,50 @@ func (s *ObjectService) CopyPart(ctx context.Context, name, uploadID string, par } return &res, resp, err } + +type ObjectListUploadsOptions struct { + Delimiter string `url:"Delimiter,omitempty"` + EncodingType string `url:"EncodingType,omitempty"` + Prefix string `url:"Prefix"` + MaxUploads int `url:"MaxUploads"` + KeyMarker string `url:"KeyMarker"` + UploadIdMarker string `url:"UploadIDMarker"` +} + +type ObjectListUploadsResult struct { + XMLName xml.Name `xml:"ListMultipartUploadsResult"` + Bucket string `xml:"Bucket,omitempty"` + EncodingType string `xml:"Encoding-Type,omitempty"` + KeyMarker string `xml:"KeyMarker,omitempty"` + UploadIdMarker string `xml:"UploadIdMarker,omitempty"` + NextKeyMarker string `xml:"NextKeyMarker,omitempty"` + NextUploadIdMarker string `xml:"NextUploadIdMarker,omitempty"` + MaxUploads string `xml:"MaxUploads,omitempty"` + IsTruncated bool `xml:"IsTruncated,omitempty"` + Prefix string `xml:"Prefix,omitempty"` + Delimiter string `xml:"Delimiter,omitempty"` + Upload []ListUploadsResultUpload `xml:"Upload,omitempty"` + CommonPrefixes []string `xml:"CommonPrefixes>Prefix,omitempty"` +} + +type ListUploadsResultUpload struct { + Key string `xml:"Key,omitempty"` + UploadID string `xml:"UploadId,omitempty"` + StorageClass string `xml:"StorageClass,omitempty"` + Initiator *Initiator `xml:"Initiator,omitempty"` + Owner *Owner `xml:"Owner,omitempty"` + Initiated string `xml:"Initiated,omitempty"` +} + +func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsOptions) (*ObjectListUploadsResult, *Response, error) { + var res ObjectListUploadsResult + sendOpt := &sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/?uploads", + method: http.MethodGet, + optQuery: opt, + result: &res, + } + resp, err := s.client.send(ctx, sendOpt) + return &res, resp, err +}