19 Commits

Author SHA1 Message Date
agin719
2c06d5299e Merge pull request #100 from agin719/cos-v4-dev
update upload
2021-01-11 07:37:06 -06:00
jojoliang
c403391509 update upload 2021-01-11 21:29:34 +08:00
gouki0123
5172396831 Updated CHANGELOG.md 2020-12-29 10:57:52 +00:00
agin719
957a6e1b9c Merge pull request #99 from agin719/cos-v4-dev
add MultiCopy
2020-12-29 04:56:01 -06:00
jojoliang
efead0ccc2 add MultiCopy 2020-12-29 17:43:52 +08:00
gouki0123
91fc87ba8a Updated CHANGELOG.md 2020-12-25 03:08:01 +00:00
agin719
6a2da87a06 Merge pull request #98 from agin719/cos-v4-dev
Cos v4 dev
2020-12-24 21:05:38 -06:00
jojoliang
c07e49771c Add Bucket Accelerate 2020-12-25 10:58:17 +08:00
jojoliang
6da3d4094c update version 2020-12-23 20:16:59 +08:00
jojoliang
72e7751604 update upload progress && single object length 2020-12-23 20:09:56 +08:00
gouki0123
5057561906 Updated CHANGELOG.md 2020-12-11 07:03:07 +00:00
agin719
3b45c0ae0e Merge pull request #96 from agin719/cos-v4-dev
update ci
2020-12-11 01:01:55 -06:00
jojoliang
ba2e64b62e update ci 2020-12-11 14:46:59 +08:00
gouki0123
b8afb3f850 Updated CHANGELOG.md 2020-12-09 13:18:08 +00:00
agin719
b44176ed7c Merge pull request #95 from agin719/cos-v4-dev
update version
2020-12-09 07:16:56 -06:00
jojoliang
757ad28a32 update version 2020-12-09 21:15:09 +08:00
agin719
f7881abec6 Merge pull request #94 from agin719/cos-v4-dev
fix bucket lifecycle
2020-12-09 07:00:38 -06:00
jojoliang
0c4356fe8e fix bucket lifecycle 2020-12-09 20:56:09 +08:00
gouki0123
e3a89ee58b Updated CHANGELOG.md 2020-12-09 02:54:44 +00:00
15 changed files with 659 additions and 46 deletions

View File

@@ -7,6 +7,74 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Generated by [`auto-changelog`](https://github.com/CookPete/auto-changelog).
## [v0.7.17](https://github.com/tencentyun/cos-go-sdk-v5/compare/v0.7.16...v0.7.17) - 2020-12-29
add multicopy
### Merged
- add MultiCopy [`#99`](https://github.com/tencentyun/cos-go-sdk-v5/pull/99)
### Commits
- Updated CHANGELOG.md [`91fc87b`](https://github.com/tencentyun/cos-go-sdk-v5/commit/91fc87ba8af042adb0d0bf147f8fe6aa49057976)
## [v0.7.16](https://github.com/tencentyun/cos-go-sdk-v5/compare/v0.7.15...v0.7.16) - 2020-12-24
add bucket accelerate && update upload progress
### Merged
- Cos v4 dev [`#98`](https://github.com/tencentyun/cos-go-sdk-v5/pull/98)
### Commits
- Add Bucket Accelerate [`c07e497`](https://github.com/tencentyun/cos-go-sdk-v5/commit/c07e49771c809fab640ac9f2c31d776de9dea23d)
- update upload progress && single object length [`72e7751`](https://github.com/tencentyun/cos-go-sdk-v5/commit/72e77516044f833f60577906800a638ebb31dc83)
- Updated CHANGELOG.md [`5057561`](https://github.com/tencentyun/cos-go-sdk-v5/commit/50575619064fbd54e69745b1884756e6f6222a99)
- update version [`6da3d40`](https://github.com/tencentyun/cos-go-sdk-v5/commit/6da3d4094cd8ca8e6840dedcb8b540c14e1f4c93)
## [v0.7.15](https://github.com/tencentyun/cos-go-sdk-v5/compare/v0.7.14...v0.7.15) - 2020-12-11
update ci & ci document
### Merged
- update ci [`#96`](https://github.com/tencentyun/cos-go-sdk-v5/pull/96)
### Commits
- Updated CHANGELOG.md [`b8afb3f`](https://github.com/tencentyun/cos-go-sdk-v5/commit/b8afb3f85050cee4884a46c8ed49d26bf76d10a4)
## [v0.7.14](https://github.com/tencentyun/cos-go-sdk-v5/compare/v0.7.13...v0.7.14) - 2020-12-09
fix bucket lifecycle
### Merged
- update version [`#95`](https://github.com/tencentyun/cos-go-sdk-v5/pull/95)
- fix bucket lifecycle [`#94`](https://github.com/tencentyun/cos-go-sdk-v5/pull/94)
### Commits
- Updated CHANGELOG.md [`e3a89ee`](https://github.com/tencentyun/cos-go-sdk-v5/commit/e3a89ee58b4f524c7ad5f2b1f0bdc688a2e39c32)
## [v0.7.13](https://github.com/tencentyun/cos-go-sdk-v5/compare/v0.7.12...v0.7.13) - 2020-12-08
add ci document && add progress
### Merged
- Cos v4 dev [`#93`](https://github.com/tencentyun/cos-go-sdk-v5/pull/93)
### Commits
- add ci doc [`dad5b1f`](https://github.com/tencentyun/cos-go-sdk-v5/commit/dad5b1f3fbbf30958d3d8ae930b06abcaf8db5ac)
- add progress [`2afc5e1`](https://github.com/tencentyun/cos-go-sdk-v5/commit/2afc5e192cd9bc8d6630fa929e10028ec79bde8e)
- Updated CHANGELOG.md [`e39f3e3`](https://github.com/tencentyun/cos-go-sdk-v5/commit/e39f3e3585e0478abe50b33962e1e3981a2d432c)
- update test [`4899c22`](https://github.com/tencentyun/cos-go-sdk-v5/commit/4899c226f70ccc1bb4ac8489a2b8ff000003bc97)
- add auto changelog workflow [`ca3ea38`](https://github.com/tencentyun/cos-go-sdk-v5/commit/ca3ea38770afcd11e50570a835090c68a377157f)
## [v0.7.12](https://github.com/tencentyun/cos-go-sdk-v5/compare/v0.7.11...v0.7.12) - 2020-11-25
update presignedurl && copy

37
bucket_accelerate.go Normal file
View File

@@ -0,0 +1,37 @@
package cos
import (
"context"
"encoding/xml"
"net/http"
)
// BucketPutAccelerateOptions is the XML request body of PutAccelerate.
// Status toggles transfer acceleration for the bucket ("Enabled" or
// "Suspended"); Type is echoed by the service (observed value: "COS" —
// see the accompanying tests).
type BucketPutAccelerateOptions struct {
	XMLName xml.Name `xml:"AccelerateConfiguration"`
	Status  string   `xml:"Status,omitempty"`
	Type    string   `xml:"Type,omitempty"`
}

// BucketGetAccelerateResult is the XML response body of GetAccelerate;
// it has exactly the same shape as the PUT request body.
type BucketGetAccelerateResult BucketPutAccelerateOptions
// PutAccelerate sets the transfer-acceleration configuration of the bucket
// by issuing PUT /?accelerate with opt serialized as the XML request body.
func (s *BucketService) PutAccelerate(ctx context.Context, opt *BucketPutAccelerateOptions) (*Response, error) {
	return s.client.send(ctx, &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     "/?accelerate",
		method:  http.MethodPut,
		body:    opt,
	})
}
// GetAccelerate fetches the transfer-acceleration configuration of the
// bucket via GET /?accelerate and decodes the XML response into the result.
func (s *BucketService) GetAccelerate(ctx context.Context) (*BucketGetAccelerateResult, *Response, error) {
	res := &BucketGetAccelerateResult{}
	opt := &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     "/?accelerate",
		method:  http.MethodGet,
		result:  res,
	}
	resp, err := s.client.send(ctx, opt)
	return res, resp, err
}

74
bucket_accelerate_test.go Normal file
View File

@@ -0,0 +1,74 @@
package cos
import (
"context"
"encoding/xml"
"fmt"
"net/http"
"reflect"
"testing"
)
// TestBucketService_GetAccelerate verifies that GetAccelerate issues
// GET /?accelerate and decodes the canned XML body into a
// BucketGetAccelerateResult.
func TestBucketService_GetAccelerate(t *testing.T) {
	setup()
	defer teardown()

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		testMethod(t, r, "GET")
		vs := values{
			"accelerate": "",
		}
		testFormValues(t, r, vs)
		// Canned server response the client is expected to decode.
		fmt.Fprint(w, `<AccelerateConfiguration>
<Status>Enabled</Status>
<Type>COS</Type>
</AccelerateConfiguration>`)
	})

	res, _, err := client.Bucket.GetAccelerate(context.Background())
	if err != nil {
		t.Fatalf("Bucket.GetAccelerate returned error %v", err)
	}
	want := &BucketGetAccelerateResult{
		XMLName: xml.Name{Local: "AccelerateConfiguration"},
		Status:  "Enabled",
		Type:    "COS",
	}
	if !reflect.DeepEqual(res, want) {
		t.Errorf("Bucket.GetAccelerate returned %+v, want %+v", res, want)
	}
}
// TestBucketService_PutAccelerate verifies that PutAccelerate issues
// PUT /?accelerate and sends opt serialized as the XML request body.
func TestBucketService_PutAccelerate(t *testing.T) {
	setup()
	defer teardown()

	opt := &BucketPutAccelerateOptions{
		XMLName: xml.Name{Local: "AccelerateConfiguration"},
		Status:  "Enabled",
		Type:    "COS",
	}
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		testMethod(t, r, "PUT")
		vs := values{
			"accelerate": "",
		}
		testFormValues(t, r, vs)

		body := new(BucketPutAccelerateOptions)
		// Fail loudly on a malformed request body instead of silently
		// comparing a partially-decoded struct (error was ignored before).
		if err := xml.NewDecoder(r.Body).Decode(body); err != nil {
			t.Errorf("Bucket.PutAccelerate request body decode error: %v", err)
		}
		// Compare against a copy so the shared opt is not mutated by the
		// handler (`want := opt` used to alias the pointer).
		want := *opt
		want.XMLName = xml.Name{Local: "AccelerateConfiguration"}
		if !reflect.DeepEqual(body, &want) {
			t.Errorf("Bucket.PutAccelerate request\n body: %+v\n, want %+v\n", body, &want)
		}
	})

	_, err := client.Bucket.PutAccelerate(context.Background(), opt)
	if err != nil {
		t.Fatalf("Bucket.PutAccelerate returned error: %v", err)
	}
}

View File

@@ -26,7 +26,7 @@ type BucketLifecycleTransition struct {
// BucketLifecycleAbortIncompleteMultipartUpload is the param of BucketLifecycleRule // BucketLifecycleAbortIncompleteMultipartUpload is the param of BucketLifecycleRule
type BucketLifecycleAbortIncompleteMultipartUpload struct { type BucketLifecycleAbortIncompleteMultipartUpload struct {
DaysAfterInitiation string `xml:"DaysAfterInititation,omitempty"` DaysAfterInitiation int `xml:"DaysAfterInitiation,omitempty"`
} }
// BucketLifecycleRule is the rule of BucketLifecycle // BucketLifecycleRule is the rule of BucketLifecycle

19
ci.go
View File

@@ -31,21 +31,23 @@ func EncodePicOperations(pic *PicOperations) string {
} }
type ImageProcessResult struct { type ImageProcessResult struct {
XMLName xml.Name `xml:"UploadResult"` XMLName xml.Name `xml:"UploadResult"`
OriginalInfo *PicOriginalInfo `xml:"OriginalInfo,omitempty"` OriginalInfo *PicOriginalInfo `xml:"OriginalInfo,omitempty"`
ProcessObject *PicProcessObject `xml:"ProcessResults>Object,omitempty"` ProcessResults *PicProcessObject `xml:"ProcessResults>Object,omitempty"`
} }
type PicOriginalInfo struct { type PicOriginalInfo struct {
Key string `xml:"Key,omitempty"` Key string `xml:"Key,omitempty"`
Location string `xml:"Location,omitempty"` Location string `xml:"Location,omitempty"`
ImageInfo *PicImageInfo `xml:"ImageInfo,omitempty"` ImageInfo *PicImageInfo `xml:"ImageInfo,omitempty"`
ETag string `xml:"ETag,omitempty"`
} }
type PicImageInfo struct { type PicImageInfo struct {
Format string `xml:"Format,omitempty"` Format string `xml:"Format,omitempty"`
Width int `xml:"Width,omitempty"` Width int `xml:"Width,omitempty"`
Height int `xml:"Height,omitempty"` Height int `xml:"Height,omitempty"`
Size int `xml:"Size,omitempty"` Quality int `xml:"Quality,omitempty"`
Quality int `xml:"Quality,omitempty"` Ave string `xml:"Ave,omitempty"`
Orientation int `xml:"Orientation,omitempty"`
} }
type PicProcessObject struct { type PicProcessObject struct {
Key string `xml:"Key,omitempty"` Key string `xml:"Key,omitempty"`
@@ -55,6 +57,7 @@ type PicProcessObject struct {
Height int `xml:"Height,omitempty"` Height int `xml:"Height,omitempty"`
Size int `xml:"Size,omitempty"` Size int `xml:"Size,omitempty"`
Quality int `xml:"Quality,omitempty"` Quality int `xml:"Quality,omitempty"`
ETag string `xml:"ETag,omitempty"`
WatermarkStatus int `xml:"WatermarkStatus,omitempty"` WatermarkStatus int `xml:"WatermarkStatus,omitempty"`
} }

View File

@@ -261,10 +261,11 @@ type DocPreviewOptions struct {
func (s *CIService) DocPreview(ctx context.Context, name string, opt *DocPreviewOptions) (*Response, error) { func (s *CIService) DocPreview(ctx context.Context, name string, opt *DocPreviewOptions) (*Response, error) {
sendOpt := sendOptions{ sendOpt := sendOptions{
baseURL: s.client.BaseURL.BucketURL, baseURL: s.client.BaseURL.BucketURL,
uri: "/" + encodeURIComponent(name) + "?ci-process=doc-preview", uri: "/" + encodeURIComponent(name) + "?ci-process=doc-preview",
optQuery: opt, optQuery: opt,
method: http.MethodGet, method: http.MethodGet,
disableCloseBody: true,
} }
resp, err := s.client.send(ctx, &sendOpt) resp, err := s.client.send(ctx, &sendOpt)
return resp, err return resp, err

2
cos.go
View File

@@ -22,7 +22,7 @@ import (
const ( const (
// Version current go sdk version // Version current go sdk version
Version = "0.7.13" Version = "0.7.18"
userAgent = "cos-go-sdk-v5/" + Version userAgent = "cos-go-sdk-v5/" + Version
contentTypeXML = "application/xml" contentTypeXML = "application/xml"
defaultServiceBaseURL = "http://service.cos.myqcloud.com" defaultServiceBaseURL = "http://service.cos.myqcloud.com"

View File

@@ -882,6 +882,31 @@ func (s *CosTestSuite) TestReferer() {
assert.Equal(s.T(), opt.EmptyReferConfiguration, res.EmptyReferConfiguration, "GetReferer Failed") assert.Equal(s.T(), opt.EmptyReferConfiguration, res.EmptyReferConfiguration, "GetReferer Failed")
} }
func (s *CosTestSuite) TestAccelerate() {
opt := &cos.BucketPutAccelerateOptions{
Status: "Enabled",
Type: "COS",
}
_, err := s.Client.Bucket.PutAccelerate(context.Background(), opt)
assert.Nil(s.T(), err, "PutAccelerate Failed")
time.Sleep(time.Second)
res, _, err := s.Client.Bucket.GetAccelerate(context.Background())
assert.Nil(s.T(), err, "GetAccelerate Failed")
assert.Equal(s.T(), opt.Status, res.Status, "GetAccelerate Failed")
assert.Equal(s.T(), opt.Type, res.Type, "GetAccelerate Failed")
opt.Status = "Suspended"
_, err = s.Client.Bucket.PutAccelerate(context.Background(), opt)
assert.Nil(s.T(), err, "PutAccelerate Failed")
time.Sleep(time.Second)
res, _, err = s.Client.Bucket.GetAccelerate(context.Background())
assert.Nil(s.T(), err, "GetAccelerate Failed")
assert.Equal(s.T(), opt.Status, res.Status, "GetAccelerate Failed")
assert.Equal(s.T(), opt.Type, res.Type, "GetAccelerate Failed")
}
// End of api test // End of api test
// All methods that begin with "Test" are run as tests within a // All methods that begin with "Test" are run as tests within a

View File

@@ -0,0 +1,72 @@
package main
import (
"context"
"fmt"
"net/http"
"net/url"
"os"
"github.com/tencentyun/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
// log_status reports err on stdout: not-found errors are warnings, COS API
// errors are printed field by field, anything else is printed verbatim.
// A nil err is silently ignored.
func log_status(err error) {
	if err == nil {
		return
	}
	if cos.IsNotFoundError(err) {
		// WARN
		fmt.Println("Resource is not existed")
		return
	}
	e, ok := cos.IsCOSError(err)
	if !ok {
		// ERROR
		fmt.Println(err)
		return
	}
	// ERROR
	fmt.Printf("Code: %v\n", e.Code)
	fmt.Printf("Message: %v\n", e.Message)
	fmt.Printf("Resource: %v\n", e.Resource)
	fmt.Printf("RequestId: %v\n", e.RequestID)
}
// main demonstrates the bucket transfer-acceleration API: read the current
// configuration, enable acceleration, then suspend it, printing the
// configuration after each change.
func main() {
	u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
	b := &cos.BaseURL{
		BucketURL: u,
	}
	// Credentials come from the environment; the debug transport dumps the
	// full request/response exchange to stdout.
	c := cos.NewClient(b, &http.Client{
		Transport: &cos.AuthorizationTransport{
			SecretID:  os.Getenv("COS_SECRETID"),
			SecretKey: os.Getenv("COS_SECRETKEY"),
			Transport: &debug.DebugRequestTransport{
				RequestHeader:  true,
				RequestBody:    true,
				ResponseHeader: true,
				ResponseBody:   true,
			},
		},
	})

	// Current accelerate configuration.
	res, _, err := c.Bucket.GetAccelerate(context.Background())
	log_status(err)
	fmt.Printf("%+v\n", res)

	// Enable acceleration, then read it back.
	opt := &cos.BucketPutAccelerateOptions{
		Status: "Enabled",
		Type:   "COS",
	}
	_, err = c.Bucket.PutAccelerate(context.Background(), opt)
	log_status(err)
	res, _, err = c.Bucket.GetAccelerate(context.Background())
	log_status(err)
	fmt.Printf("%+v\n", res)

	// Suspend acceleration, then read it back.
	opt.Status = "Suspended"
	_, err = c.Bucket.PutAccelerate(context.Background(), opt)
	log_status(err)
	res, _, err = c.Bucket.GetAccelerate(context.Background())
	log_status(err)
	fmt.Printf("%+v\n", res)
}

View File

@@ -0,0 +1,66 @@
package main
import (
"context"
"net/url"
"os"
"net/http"
"fmt"
"github.com/tencentyun/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
// log_status prints err to stdout with an ERROR/WARN prefix; a nil err is
// silently ignored.
func log_status(err error) {
	if err == nil {
		return
	}
	if cos.IsNotFoundError(err) {
		// WARN
		fmt.Println("WARN: Resource is not existed")
		return
	}
	e, ok := cos.IsCOSError(err)
	if !ok {
		// ERROR
		fmt.Printf("ERROR: %v\n", err)
		return
	}
	// ERROR
	fmt.Printf("ERROR: Code: %v\n", e.Code)
	fmt.Printf("ERROR: Message: %v\n", e.Message)
	fmt.Printf("ERROR: Resource: %v\n", e.Resource)
	fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
}
// main demonstrates Object.MultiCopy: copy "exampleobject" within the same
// bucket to "destobject", storing the destination as Archive and using 10
// concurrent copy workers.
func main() {
	u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
	b := &cos.BaseURL{BucketURL: u}
	// Credentials come from the environment; the debug transport dumps the
	// full request/response exchange to stdout.
	c := cos.NewClient(b, &http.Client{
		Transport: &cos.AuthorizationTransport{
			SecretID:  os.Getenv("COS_SECRETID"),
			SecretKey: os.Getenv("COS_SECRETKEY"),
			Transport: &debug.DebugRequestTransport{
				RequestHeader:  true,
				RequestBody:    true,
				ResponseHeader: true,
				ResponseBody:   true,
			},
		},
	})

	opt := &cos.MultiCopyOptions{
		OptCopy: &cos.ObjectCopyOptions{
			&cos.ObjectCopyHeaderOptions{
				XCosStorageClass: "Archive",
			},
			nil,
		},
		ThreadPoolSize: 10,
	}
	source := "exampleobject"
	// The copy source is "<bucket-host>/<key>", as MultiCopy expects.
	// (Local was previously misspelled "soruceURL".)
	sourceURL := fmt.Sprintf("%s/%s", u.Host, source)
	// Plain literal: fmt.Sprintf with no verbs was pointless (staticcheck S1039).
	dest := "destobject"
	res, _, err := c.Object.MultiCopy(context.Background(), dest, sourceURL, opt)
	log_status(err)
	fmt.Printf("res:%+v\n", res)
}

View File

@@ -79,10 +79,10 @@ func main() {
resp, err := c.Object.UploadPart( resp, err := c.Object.UploadPart(
context.Background(), name, uploadID, 1, fd, opt, context.Background(), name, uploadID, 1, fd, opt,
) )
log_status(err)
optcom.Parts = append(optcom.Parts, cos.Object{ optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: 1, ETag: resp.Header.Get("ETag"), PartNumber: 1, ETag: resp.Header.Get("ETag"),
}) })
log_status(err)
f := strings.NewReader("test heoo") f := strings.NewReader("test heoo")
resp, err = c.Object.UploadPart( resp, err = c.Object.UploadPart(

View File

@@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"crypto/md5" "crypto/md5"
"crypto/sha1" "crypto/sha1"
"errors"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@@ -12,6 +13,9 @@ import (
"strings" "strings"
) )
// 单次上传文件最大为5GB
const singleUploadMaxLength = 5 * 1024 * 1024 * 1024
// 计算 md5 或 sha1 时的分块大小 // 计算 md5 或 sha1 时的分块大小
const calDigestBlockSize = 1024 * 1024 * 10 const calDigestBlockSize = 1024 * 1024 * 10
@@ -109,7 +113,7 @@ func decodeURIComponent(s string) (string, error) {
} }
func DecodeURIComponent(s string) (string, error) { func DecodeURIComponent(s string) (string, error) {
return DecodeURIComponent(s) return decodeURIComponent(s)
} }
func EncodeURIComponent(s string) string { func EncodeURIComponent(s string) string {
@@ -140,3 +144,41 @@ func GetReaderLen(reader io.Reader) (length int64, err error) {
} }
return return
} }
// CheckReaderLen rejects readers whose total length exceeds the 5GB limit
// of a single upload request. Readers whose length cannot be determined
// pass through unchecked (best effort, matching previous behavior) and the
// server enforces the limit instead.
func CheckReaderLen(reader io.Reader) error {
	nlen, err := GetReaderLen(reader)
	// Fix: use <= so an object of exactly 5GB is accepted — the limit (and
	// the error message) is "larger than 5GB", but the old `<` rejected
	// the boundary value.
	if err != nil || nlen <= singleUploadMaxLength {
		return nil
	}
	return errors.New("The single object size you upload can not be larger than 5GB")
}
// CopyOptionsToMulti converts single-request copy options into the
// equivalent InitiateMultipartUpload options, carrying over the ACL headers
// and every copy header that has a put-header counterpart. Returns nil for
// nil input; with no copy headers it returns empty put-header options.
func CopyOptionsToMulti(opt *ObjectCopyOptions) *InitiateMultipartUploadOptions {
	if opt == nil {
		return nil
	}
	optini := &InitiateMultipartUploadOptions{
		opt.ACLHeaderOptions,
		&ObjectPutHeaderOptions{},
	}
	if opt.ObjectCopyHeaderOptions == nil {
		return optini
	}
	// Field-by-field copy: ObjectCopyHeaderOptions and ObjectPutHeaderOptions
	// are distinct types that happen to share these fields.
	optini.ObjectPutHeaderOptions = &ObjectPutHeaderOptions{
		CacheControl:             opt.ObjectCopyHeaderOptions.CacheControl,
		ContentDisposition:       opt.ObjectCopyHeaderOptions.ContentDisposition,
		ContentEncoding:          opt.ObjectCopyHeaderOptions.ContentEncoding,
		ContentType:              opt.ObjectCopyHeaderOptions.ContentType,
		ContentLanguage:          opt.ObjectCopyHeaderOptions.ContentLanguage,
		Expect:                   opt.ObjectCopyHeaderOptions.Expect,
		Expires:                  opt.ObjectCopyHeaderOptions.Expires,
		XCosMetaXXX:              opt.ObjectCopyHeaderOptions.XCosMetaXXX,
		XCosStorageClass:         opt.ObjectCopyHeaderOptions.XCosStorageClass,
		XCosServerSideEncryption: opt.ObjectCopyHeaderOptions.XCosServerSideEncryption,
		XCosSSECustomerAglo:      opt.ObjectCopyHeaderOptions.XCosSSECustomerAglo,
		XCosSSECustomerKey:       opt.ObjectCopyHeaderOptions.XCosSSECustomerKey,
		XCosSSECustomerKeyMD5:    opt.ObjectCopyHeaderOptions.XCosSSECustomerKeyMD5,
		XOptionHeader:            opt.ObjectCopyHeaderOptions.XOptionHeader,
	}
	return optini
}

View File

@@ -127,7 +127,7 @@ func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, a
authTime = NewAuthTime(expired) authTime = NewAuthTime(expired)
} }
authorization := newAuthorization(ak, sk, req, authTime) authorization := newAuthorization(ak, sk, req, authTime)
sign := encodeURIComponent(authorization, []byte{'&','='}) sign := encodeURIComponent(authorization, []byte{'&', '='})
if req.URL.RawQuery == "" { if req.URL.RawQuery == "" {
req.URL.RawQuery = fmt.Sprintf("%s", sign) req.URL.RawQuery = fmt.Sprintf("%s", sign)
@@ -181,6 +181,9 @@ type ObjectPutOptions struct {
// //
// https://www.qcloud.com/document/product/436/7749 // https://www.qcloud.com/document/product/436/7749
func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) { func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
if err := CheckReaderLen(r); err != nil {
return nil, err
}
if opt != nil && opt.Listener != nil { if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r) totalBytes, err := GetReaderLen(r)
if err != nil { if err != nil {
@@ -255,6 +258,8 @@ type ObjectCopyResult struct {
XMLName xml.Name `xml:"CopyObjectResult"` XMLName xml.Name `xml:"CopyObjectResult"`
ETag string `xml:"ETag,omitempty"` ETag string `xml:"ETag,omitempty"`
LastModified string `xml:"LastModified,omitempty"` LastModified string `xml:"LastModified,omitempty"`
CRC64 string `xml:"CRC64,omitempty"`
VersionId string `xml:"VersionId,omitempty"`
} }
// Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G
@@ -558,12 +563,12 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
results <- &res results <- &res
} }
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
// UploadPart do not support the chunk trsf, so need to add the content-length // UploadPart do not support the chunk trsf, so need to add the content-length
j.Opt.ContentLength = int(j.Chunk.Size) j.Opt.ContentLength = int(j.Chunk.Size)
rt := j.RetryTimes rt := j.RetryTimes
for { for {
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number, resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
&io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt) &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
res.PartNumber = j.Chunk.Number res.PartNumber = j.Chunk.Number
@@ -585,8 +590,8 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
} }
} }
func DividePart(fileSize int64) (int64, int64) { func DividePart(fileSize int64, last int) (int64, int64) {
partSize := int64(1 * 1024 * 1024) partSize := int64(last * 1024 * 1024)
partNum := fileSize / partSize partNum := fileSize / partSize
for partNum >= 10000 { for partNum >= 10000 {
partSize = partSize * 2 partSize = partSize * 2
@@ -618,7 +623,7 @@ func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int,
return 0, nil, 0, errors.New("Too many parts, out of 10000") return 0, nil, 0, errors.New("Too many parts, out of 10000")
} }
} else { } else {
partNum, partSize = DividePart(stat.Size()) partNum, partSize = DividePart(stat.Size(), 1)
} }
var chunks []Chunk var chunks []Chunk
@@ -802,28 +807,30 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
progressCallback(listener, event) progressCallback(listener, event)
// 4.Push jobs // 4.Push jobs
for _, chunk := range chunks { go func() {
if chunk.Done { for _, chunk := range chunks {
continue if chunk.Done {
continue
}
partOpt := &ObjectUploadPartOptions{}
if optini != nil && optini.ObjectPutHeaderOptions != nil {
partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
}
job := &Jobs{
Name: name,
RetryTimes: 3,
FilePath: filepath,
UploadId: uploadID,
Chunk: chunk,
Opt: partOpt,
}
chjobs <- job
} }
partOpt := &ObjectUploadPartOptions{} close(chjobs)
if optini != nil && optini.ObjectPutHeaderOptions != nil { }()
partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
}
job := &Jobs{
Name: name,
RetryTimes: 3,
FilePath: filepath,
UploadId: uploadID,
Chunk: chunk,
Opt: partOpt,
}
chjobs <- job
}
close(chjobs)
// 5.Recv the resp etag to complete // 5.Recv the resp etag to complete
for i := 0; i < partNum; i++ { for i := 0; i < partNum; i++ {
@@ -854,12 +861,16 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes) event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
progressCallback(listener, event) progressCallback(listener, event)
} }
close(chresults)
sort.Sort(ObjectList(optcom.Parts)) sort.Sort(ObjectList(optcom.Parts))
event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes) event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
progressCallback(listener, event) progressCallback(listener, event)
v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
if err != nil {
s.AbortMultipartUpload(ctx, name, uploadID)
}
return v, resp, err return v, resp, err
} }

View File

@@ -7,6 +7,9 @@ import (
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"net/url"
"sort"
"strings"
) )
// InitiateMultipartUploadOptions is the option of InitateMultipartUpload // InitiateMultipartUploadOptions is the option of InitateMultipartUpload
@@ -41,16 +44,17 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string
// ObjectUploadPartOptions is the options of upload-part // ObjectUploadPartOptions is the options of upload-part
type ObjectUploadPartOptions struct { type ObjectUploadPartOptions struct {
Expect string `header:"Expect,omitempty" url:"-"` Expect string `header:"Expect,omitempty" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"` XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
ContentLength int `header:"Content-Length,omitempty" url:"-"` ContentLength int `header:"Content-Length,omitempty" url:"-"`
ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
// 上传进度, ProgressCompleteEvent不能表示对应API调用成功API是否调用成功的判断标准为返回err==nil // 上传进度, ProgressCompleteEvent不能表示对应API调用成功API是否调用成功的判断标准为返回err==nil
Listener ProgressListener `header:"-" url:"-" xml:"-"` Listener ProgressListener `header:"-" url:"-" xml:"-"`
} }
@@ -64,6 +68,9 @@ type ObjectUploadPartOptions struct {
// //
// https://www.qcloud.com/document/product/436/7750 // https://www.qcloud.com/document/product/436/7750
func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) { func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) {
if err := CheckReaderLen(r); err != nil {
return nil, err
}
if opt != nil && opt.Listener != nil { if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r) totalBytes, err := GetReaderLen(r)
if err != nil { if err != nil {
@@ -303,3 +310,206 @@ func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsO
resp, err := s.client.send(ctx, sendOpt) resp, err := s.client.send(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }
// MultiCopyOptions configures MultiCopy: OptCopy is forwarded to the
// underlying copy requests, PartSize is the per-part size in MB (0 lets
// the SDK derive one), ThreadPoolSize is the number of concurrent copy
// workers (values <= 0 mean 1).
type MultiCopyOptions struct {
	OptCopy        *ObjectCopyOptions
	PartSize       int64
	ThreadPoolSize int
}

// CopyJobs is one part-copy task handed to a copyworker over the jobs
// channel.
type CopyJobs struct {
	Name       string
	UploadId   string
	RetryTimes int
	Chunk      Chunk
	Opt        *ObjectCopyPartOptions
}

// CopyResults is the outcome of one part copy, reported back on the
// results channel; err/res are unexported because only MultiCopy reads
// them.
type CopyResults struct {
	PartNumber int
	Resp       *Response
	err        error
	res        *CopyPartResult
}
// copyworker consumes part-copy jobs, issues UploadPartCopy with the job's
// byte range, retries failed parts up to job.RetryTimes, and reports each
// part's final outcome (success or exhausted retries) on results.
func copyworker(s *ObjectService, jobs <-chan *CopyJobs, results chan<- *CopyResults) {
	for job := range jobs {
		// The copy range is inclusive on both ends.
		job.Opt.XCosCopySourceRange = fmt.Sprintf("bytes=%d-%d", job.Chunk.OffSet, job.Chunk.OffSet+job.Chunk.Size-1)
		retries := job.RetryTimes
		for {
			outcome := &CopyResults{PartNumber: job.Chunk.Number}
			res, resp, err := s.CopyPart(context.Background(), job.Name, job.UploadId, job.Chunk.Number, job.Opt.XCosCopySource, job.Opt)
			outcome.Resp = resp
			outcome.err = err
			outcome.res = res
			if err == nil {
				results <- outcome
				break
			}
			retries--
			if retries == 0 {
				// Out of retries: report the last failure.
				results <- outcome
				break
			}
		}
	}
}
// innerHead issues a HEAD request against the copy source named by
// sourceURL ("<host>/<key>"), using a temporary client pointed at the
// source bucket that reuses this client's transport (and thus its
// credentials). id, when non-empty, carries the source versionId.
func (s *ObjectService) innerHead(ctx context.Context, sourceURL string, opt *ObjectHeadOptions, id []string) (resp *Response, err error) {
	surl := strings.SplitN(sourceURL, "/", 2)
	if len(surl) < 2 {
		// fmt.Errorf replaces errors.New(fmt.Sprintf(...)).
		err = fmt.Errorf("sourceURL format error: %s", sourceURL)
		return
	}
	u, err := url.Parse(fmt.Sprintf("https://%s", surl[0]))
	if err != nil {
		return
	}
	b := &BaseURL{BucketURL: u}
	client := NewClient(b, &http.Client{
		Transport: s.client.client.Transport,
	})
	// Fix: opt was previously ignored and nil passed unconditionally.
	if len(id) > 0 {
		resp, err = client.Object.Head(ctx, surl[1], opt, id[0])
	} else {
		resp, err = client.Object.Head(ctx, surl[1], opt)
	}
	return
}
// SplitCopyFileIntoChunks partitions totalBytes into copy chunks and
// returns the chunks plus their count. partSize is in MB; when it is zero
// or negative, a part size is derived from the total via DividePart
// (starting from 64MB). An explicit part size yielding 10000 or more full
// parts is an error.
func SplitCopyFileIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
	var partNum int64
	if partSize > 0 {
		partSize *= 1024 * 1024
		partNum = totalBytes / partSize
		if partNum >= 10000 {
			return nil, 0, errors.New("Too many parts, out of 10000")
		}
	} else {
		partNum, partSize = DividePart(totalBytes, 64)
	}

	// Full-size chunks, numbered from 1.
	var chunks []Chunk
	for i := int64(0); i < partNum; i++ {
		chunks = append(chunks, Chunk{
			Number: int(i + 1),
			OffSet: i * partSize,
			Size:   partSize,
		})
	}
	// Trailing partial chunk, if the total is not an exact multiple.
	if rest := totalBytes % partSize; rest > 0 {
		chunks = append(chunks, Chunk{
			Number: len(chunks) + 1,
			OffSet: int64(len(chunks)) * partSize,
			Size:   rest,
		})
		partNum++
	}
	return chunks, int(partNum), nil
}
// MultiCopy copies the object at sourceURL ("<host>/<key>") to name.
// Objects below the single-request limit are copied with one Copy call;
// larger objects are copied part by part via UploadPartCopy using
// opt.ThreadPoolSize workers. id optionally carries the source versionId
// (at most one value is accepted).
func (s *ObjectService) MultiCopy(ctx context.Context, name string, sourceURL string, opt *MultiCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
	resp, err := s.innerHead(ctx, sourceURL, nil, id)
	if err != nil {
		return nil, nil, err
	}
	totalBytes := resp.ContentLength

	surl := strings.SplitN(sourceURL, "/", 2)
	if len(surl) < 2 {
		// fmt.Errorf replaces errors.New(fmt.Sprintf(...)).
		return nil, nil, fmt.Errorf("x-cos-copy-source format error: %s", sourceURL)
	}
	var u string
	if len(id) == 1 {
		u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
	} else if len(id) == 0 {
		u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
	} else {
		return nil, nil, errors.New("wrong params")
	}

	if opt == nil {
		opt = &MultiCopyOptions{}
	}
	chunks, partNum, err := SplitCopyFileIntoChunks(totalBytes, opt.PartSize)
	if err != nil {
		return nil, nil, err
	}
	// Small objects take the single-request copy path.
	if partNum == 0 || totalBytes < singleUploadMaxLength {
		if len(id) > 0 {
			return s.Copy(ctx, name, sourceURL, opt.OptCopy, id[0])
		}
		return s.Copy(ctx, name, sourceURL, opt.OptCopy)
	}

	optini := CopyOptionsToMulti(opt.OptCopy)
	res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
	if err != nil {
		return nil, nil, err
	}
	uploadID := res.UploadID

	poolSize := opt.ThreadPoolSize
	if poolSize <= 0 {
		poolSize = 1
	}

	chjobs := make(chan *CopyJobs, 100)
	// Buffer covers the 10000-part maximum so workers never block on send.
	chresults := make(chan *CopyResults, 10000)
	optcom := &CompleteMultipartUploadOptions{}

	for w := 1; w <= poolSize; w++ {
		go copyworker(s, chjobs, chresults)
	}
	// Feed jobs asynchronously so a full jobs channel cannot deadlock us
	// against the results loop below.
	go func() {
		for _, chunk := range chunks {
			chjobs <- &CopyJobs{
				Name:       name,
				RetryTimes: 3,
				UploadId:   uploadID,
				Chunk:      chunk,
				Opt:        &ObjectCopyPartOptions{XCosCopySource: u},
			}
		}
		close(chjobs)
	}()

	err = nil
	for i := 0; i < partNum; i++ {
		res := <-chresults
		if res.res == nil || res.err != nil {
			// Fix: res.err may be nil when only res.res is missing; format
			// with %v instead of calling Error() on a possibly-nil error.
			err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %v", uploadID, res.PartNumber, res.err)
			break
		}
		optcom.Parts = append(optcom.Parts, Object{
			PartNumber: res.PartNumber, ETag: res.res.ETag},
		)
	}
	close(chresults)
	if err != nil {
		return nil, nil, err
	}
	sort.Sort(ObjectList(optcom.Parts))

	v, resp, err := s.CompleteMultipartUpload(ctx, name, uploadID, optcom)
	if err != nil {
		// Fix: return before building the result — v is nil on error and
		// v.ETag would panic. Abort is best-effort cleanup.
		s.AbortMultipartUpload(ctx, name, uploadID)
		return nil, resp, err
	}
	cpres := &ObjectCopyResult{
		ETag:      v.ETag,
		CRC64:     resp.Header.Get("x-cos-hash-crc64ecma"),
		VersionId: resp.Header.Get("x-cos-version-id"),
	}
	return cpres, resp, nil
}

View File

@@ -97,6 +97,10 @@ func (r *teeReader) Close() error {
return nil return nil
} }
// Size returns the total number of bytes this reader is expected to
// produce, as recorded when the teeReader was constructed.
func (r *teeReader) Size() int64 {
	return r.totalBytes
}
func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader { func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader {
return &teeReader{ return &teeReader{
reader: reader, reader: reader,