diff --git a/cos.go b/cos.go index 616f2bf..2f5dacf 100644 --- a/cos.go +++ b/cos.go @@ -22,7 +22,7 @@ import ( const ( // Version current go sdk version - Version = "0.7.16" + Version = "0.7.17" userAgent = "cos-go-sdk-v5/" + Version contentTypeXML = "application/xml" defaultServiceBaseURL = "http://service.cos.myqcloud.com" diff --git a/example/object/multicopy.go b/example/object/multicopy.go new file mode 100644 index 0000000..3c26ccc --- /dev/null +++ b/example/object/multicopy.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "net/url" + "os" + + "net/http" + + "fmt" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + opt := &cos.MultiCopyOptions{ + OptCopy: &cos.ObjectCopyOptions{ + &cos.ObjectCopyHeaderOptions{ + XCosStorageClass: "Archive", + }, + nil, + }, + ThreadPoolSize: 10, + } + source := "exampleobject" + soruceURL := fmt.Sprintf("%s/%s", u.Host, source) + dest := fmt.Sprintf("destobject") + res, _, err := c.Object.MultiCopy(context.Background(), dest, soruceURL, opt) + log_status(err) + fmt.Printf("res:%+v\n", 
res) +} diff --git a/helper.go b/helper.go index c178016..5aef708 100644 --- a/helper.go +++ b/helper.go @@ -152,3 +152,33 @@ func CheckReaderLen(reader io.Reader) error { } return errors.New("The single object size you upload can not be larger than 5GB") } + +func CopyOptionsToMulti(opt *ObjectCopyOptions) *InitiateMultipartUploadOptions { + if opt == nil { + return nil + } + optini := &InitiateMultipartUploadOptions{ + opt.ACLHeaderOptions, + &ObjectPutHeaderOptions{}, + } + if opt.ObjectCopyHeaderOptions == nil { + return optini + } + optini.ObjectPutHeaderOptions = &ObjectPutHeaderOptions{ + CacheControl: opt.ObjectCopyHeaderOptions.CacheControl, + ContentDisposition: opt.ObjectCopyHeaderOptions.ContentDisposition, + ContentEncoding: opt.ObjectCopyHeaderOptions.ContentEncoding, + ContentType: opt.ObjectCopyHeaderOptions.ContentType, + ContentLanguage: opt.ObjectCopyHeaderOptions.ContentLanguage, + Expect: opt.ObjectCopyHeaderOptions.Expect, + Expires: opt.ObjectCopyHeaderOptions.Expires, + XCosMetaXXX: opt.ObjectCopyHeaderOptions.XCosMetaXXX, + XCosStorageClass: opt.ObjectCopyHeaderOptions.XCosStorageClass, + XCosServerSideEncryption: opt.ObjectCopyHeaderOptions.XCosServerSideEncryption, + XCosSSECustomerAglo: opt.ObjectCopyHeaderOptions.XCosSSECustomerAglo, + XCosSSECustomerKey: opt.ObjectCopyHeaderOptions.XCosSSECustomerKey, + XCosSSECustomerKeyMD5: opt.ObjectCopyHeaderOptions.XCosSSECustomerKeyMD5, + XOptionHeader: opt.ObjectCopyHeaderOptions.XOptionHeader, + } + return optini +} diff --git a/object.go b/object.go index de7733e..71b60af 100644 --- a/object.go +++ b/object.go @@ -258,6 +258,8 @@ type ObjectCopyResult struct { XMLName xml.Name `xml:"CopyObjectResult"` ETag string `xml:"ETag,omitempty"` LastModified string `xml:"LastModified,omitempty"` + CRC64 string `xml:"CRC64,omitempty"` + VersionId string `xml:"VersionId,omitempty"` } // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G, @@ -588,8 +590,8 @@ func worker(s *ObjectService, jobs <-chan 
*Jobs, results chan<- *Results) { } } -func DividePart(fileSize int64) (int64, int64) { - partSize := int64(1 * 1024 * 1024) +func DividePart(fileSize int64, last int) (int64, int64) { + partSize := int64(last * 1024 * 1024) partNum := fileSize / partSize for partNum >= 10000 { partSize = partSize * 2 @@ -621,7 +623,7 @@ func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, return 0, nil, 0, errors.New("Too many parts, out of 10000") } } else { - partNum, partSize = DividePart(stat.Size()) + partNum, partSize = DividePart(stat.Size(), 1) } var chunks []Chunk @@ -866,6 +868,9 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string progressCallback(listener, event) v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) + if err != nil { + s.AbortMultipartUpload(ctx, name, uploadID) + } return v, resp, err } diff --git a/object_part.go b/object_part.go index 1752e02..bdae188 100644 --- a/object_part.go +++ b/object_part.go @@ -7,6 +7,9 @@ import ( "fmt" "io" "net/http" + "net/url" + "sort" + "strings" ) // InitiateMultipartUploadOptions is the option of InitateMultipartUpload @@ -307,3 +310,206 @@ func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsO resp, err := s.client.send(ctx, sendOpt) return &res, resp, err } + +type MultiCopyOptions struct { + OptCopy *ObjectCopyOptions + PartSize int64 + ThreadPoolSize int +} + +type CopyJobs struct { + Name string + UploadId string + RetryTimes int + Chunk Chunk + Opt *ObjectCopyPartOptions +} + +type CopyResults struct { + PartNumber int + Resp *Response + err error + res *CopyPartResult +} + +func copyworker(s *ObjectService, jobs <-chan *CopyJobs, results chan<- *CopyResults) { + for j := range jobs { + var copyres CopyResults + j.Opt.XCosCopySourceRange = fmt.Sprintf("bytes=%d-%d", j.Chunk.OffSet, j.Chunk.OffSet+j.Chunk.Size-1) + rt := j.RetryTimes + for { + res, resp, err := s.CopyPart(context.Background(), 
j.Name, j.UploadId, j.Chunk.Number, j.Opt.XCosCopySource, j.Opt) + copyres.PartNumber = j.Chunk.Number + copyres.Resp = resp + copyres.err = err + copyres.res = res + if err != nil { + rt-- + if rt == 0 { + results <- &copyres + break + } + continue + } + results <- &copyres + break + } + } +} + +func (s *ObjectService) innerHead(ctx context.Context, sourceURL string, opt *ObjectHeadOptions, id []string) (resp *Response, err error) { + surl := strings.SplitN(sourceURL, "/", 2) + if len(surl) < 2 { + err = errors.New(fmt.Sprintf("sourceURL format error: %s", sourceURL)) + return + } + + u, err := url.Parse(fmt.Sprintf("https://%s", surl[0])) + if err != nil { + return + } + b := &BaseURL{BucketURL: u} + client := NewClient(b, &http.Client{ + Transport: s.client.client.Transport, + }) + if len(id) > 0 { + resp, err = client.Object.Head(ctx, surl[1], nil, id[0]) + } else { + resp, err = client.Object.Head(ctx, surl[1], nil) + } + return +} + +func SplitCopyFileIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) { + var partNum int64 + if partSize > 0 { + partSize = partSize * 1024 * 1024 + partNum = totalBytes / partSize + if partNum >= 10000 { + return nil, 0, errors.New("Too many parts, out of 10000") + } + } else { + partNum, partSize = DividePart(totalBytes, 64) + } + + var chunks []Chunk + var chunk = Chunk{} + for i := int64(0); i < partNum; i++ { + chunk.Number = int(i + 1) + chunk.OffSet = i * partSize + chunk.Size = partSize + chunks = append(chunks, chunk) + } + + if totalBytes%partSize > 0 { + chunk.Number = len(chunks) + 1 + chunk.OffSet = int64(len(chunks)) * partSize + chunk.Size = totalBytes % partSize + chunks = append(chunks, chunk) + partNum++ + } + return chunks, int(partNum), nil +} + +func (s *ObjectService) MultiCopy(ctx context.Context, name string, sourceURL string, opt *MultiCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) { + resp, err := s.innerHead(ctx, sourceURL, nil, id) + if err != nil { + return nil, nil, err + } 
+ totalBytes := resp.ContentLength + surl := strings.SplitN(sourceURL, "/", 2) + if len(surl) < 2 { + return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL)) + } + var u string + if len(id) == 1 { + u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0]) + } else if len(id) == 0 { + u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1])) + } else { + return nil, nil, errors.New("wrong params") + } + + if opt == nil { + opt = &MultiCopyOptions{} + } + chunks, partNum, err := SplitCopyFileIntoChunks(totalBytes, opt.PartSize) + if err != nil { + return nil, nil, err + } + if partNum == 0 || totalBytes < singleUploadMaxLength { + if len(id) > 0 { + return s.Copy(ctx, name, sourceURL, opt.OptCopy, id[0]) + } else { + return s.Copy(ctx, name, sourceURL, opt.OptCopy) + } + } + optini := CopyOptionsToMulti(opt.OptCopy) + var uploadID string + res, _, err := s.InitiateMultipartUpload(ctx, name, optini) + if err != nil { + return nil, nil, err + } + uploadID = res.UploadID + + var poolSize int + if opt.ThreadPoolSize > 0 { + poolSize = opt.ThreadPoolSize + } else { + poolSize = 1 + } + + chjobs := make(chan *CopyJobs, 100) + chresults := make(chan *CopyResults, 10000) + optcom := &CompleteMultipartUploadOptions{} + + for w := 1; w <= poolSize; w++ { + go copyworker(s, chjobs, chresults) + } + + go func() { + for _, chunk := range chunks { + partOpt := &ObjectCopyPartOptions{ + XCosCopySource: u, + } + job := &CopyJobs{ + Name: name, + RetryTimes: 3, + UploadId: uploadID, + Chunk: chunk, + Opt: partOpt, + } + chjobs <- job + } + close(chjobs) + }() + + err = nil + for i := 0; i < partNum; i++ { + res := <-chresults + if res.res == nil || res.err != nil { + err = fmt.Errorf("UploadID %s, part %d failed to get resp content. 
error: %s", uploadID, res.PartNumber, res.err.Error()) + break + } + etag := res.res.ETag + optcom.Parts = append(optcom.Parts, Object{ + PartNumber: res.PartNumber, ETag: etag}, + ) + } + close(chresults) + if err != nil { + return nil, nil, err + } + sort.Sort(ObjectList(optcom.Parts)) + + v, resp, err := s.CompleteMultipartUpload(ctx, name, uploadID, optcom) + if err != nil { + s.AbortMultipartUpload(ctx, name, uploadID) + } + cpres := &ObjectCopyResult{ + ETag: v.ETag, + CRC64: resp.Header.Get("x-cos-hash-crc64ecma"), + VersionId: resp.Header.Get("x-cos-version-id"), + } + return cpres, resp, err +}