diff --git a/ci.go b/ci.go index 75ffdf4..f2bc6b1 100644 --- a/ci.go +++ b/ci.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "encoding/xml" + "hash/crc64" "io" "net/http" "os" @@ -209,12 +210,16 @@ func (s *CIService) Put(ctx context.Context, name string, r io.Reader, opt *Obje if err := CheckReaderLen(r); err != nil { return nil, nil, err } + totalBytes, err := GetReaderLen(r) + if err != nil && opt != nil && opt.Listener != nil { + return nil, nil, err + } + reader := TeeReader(r, nil, totalBytes, nil) + if s.client.Conf.EnableCRC { + reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA)) + } if opt != nil && opt.Listener != nil { - totalBytes, err := GetReaderLen(r) - if err != nil { - return nil, nil, err - } - r = TeeReader(r, nil, totalBytes, opt.Listener) + reader.listener = opt.Listener } var res ImageProcessResult @@ -222,7 +227,7 @@ func (s *CIService) Put(ctx context.Context, name string, r io.Reader, opt *Obje baseURL: s.client.BaseURL.BucketURL, uri: "/" + encodeURIComponent(name), method: http.MethodPut, - body: r, + body: reader, optHeader: opt, result: &res, } diff --git a/cos.go b/cos.go index e455050..7969c05 100644 --- a/cos.go +++ b/cos.go @@ -22,7 +22,7 @@ import ( const ( // Version current go sdk version - Version = "0.7.22" + Version = "0.7.23" userAgent = "cos-go-sdk-v5/" + Version contentTypeXML = "application/xml" defaultServiceBaseURL = "http://service.cos.myqcloud.com" @@ -217,6 +217,18 @@ func (c *Client) doAPI(ctx context.Context, req *http.Request, result interface{ return response, err } + // need CRC64 verification + if reader, ok := req.Body.(*teeReader); ok { + if c.Conf.EnableCRC && reader.writer != nil { + localcrc := reader.Crc64() + scoscrc := response.Header.Get("x-cos-hash-crc64ecma") + icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64) + if icoscrc != localcrc { + return response, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc) + } + } + } + if result != nil { if w, ok := result.(io.Writer); ok { io.Copy(w, resp.Body) diff --git a/costesting/ci_test.go b/costesting/ci_test.go index 1793dc4..5672537 100644 --- a/costesting/ci_test.go +++ b/costesting/ci_test.go @@ -476,6 +476,37 @@ func (s *CosTestSuite) TestPutGetDeleteObjectByFile_10MB() { assert.Nil(s.T(), err, "remove local file Failed") } +func (s *CosTestSuite) TestPutGetDeleteObjectByUpload_10MB() { + // Create tmp file + filePath := "tmpfile" + time.Now().Format(time.RFC3339) + newfile, err := os.Create(filePath) + assert.Nil(s.T(), err, "create tmp file Failed") + defer newfile.Close() + + name := "test/objectUpload" + time.Now().Format(time.RFC3339) + b := make([]byte, 1024*1024*10) + _, err = rand.Read(b) + + newfile.Write(b) + opt := &cos.MultiUploadOptions{ + PartSize: 1, + ThreadPoolSize: 3, + } + _, _, err = s.Client.Object.Upload(context.Background(), name, filePath, opt) + assert.Nil(s.T(), err, "PutObject Failed") + + // Over write tmp file + _, err = s.Client.Object.GetToFile(context.Background(), name, filePath, nil) + assert.Nil(s.T(), err, "HeadObject Failed") + + _, err = s.Client.Object.Delete(context.Background(), name) + assert.Nil(s.T(), err, "DeleteObject Failed") + + // remove the local tmp file + err = os.Remove(filePath) + assert.Nil(s.T(), err, "remove local file Failed") +} + func (s *CosTestSuite) TestPutGetDeleteObjectSpecialName() { f := strings.NewReader("test") name := s.SepFileName + time.Now().Format(time.RFC3339) diff --git a/helper.go b/helper.go index bccacb8..521355d 100644 --- a/helper.go +++ b/helper.go @@ -149,6 +149,8 @@ func GetReaderLen(reader io.Reader) (length int64, err error) { } case *io.LimitedReader: length = int64(v.N) + case *LimitedReadCloser: + length = int64(v.N) case FixedLengthReader: length = v.Size() default: @@ -194,3 +196,29 @@ func CopyOptionsToMulti(opt *ObjectCopyOptions) *InitiateMultipartUploadOptions } return optini } + +// 浅拷贝ObjectPutOptions +func cloneObjectPutOptions(opt *ObjectPutOptions) *ObjectPutOptions { + res := &ObjectPutOptions{ + &ACLHeaderOptions{}, + &ObjectPutHeaderOptions{}, + } + if opt != nil { + if opt.ACLHeaderOptions != nil { + *res.ACLHeaderOptions = *opt.ACLHeaderOptions + } + if opt.ObjectPutHeaderOptions != nil { + *res.ObjectPutHeaderOptions = *opt.ObjectPutHeaderOptions + } + } + return res +} + +// 浅拷贝ObjectUploadPartOptions +func cloneObjectUploadPartOptions(opt *ObjectUploadPartOptions) *ObjectUploadPartOptions { + var res ObjectUploadPartOptions + if opt != nil { + res = *opt + } + return &res +} diff --git a/object.go b/object.go index 5f6251a..459e13b 100644 --- a/object.go +++ b/object.go @@ -6,6 +6,7 @@ import ( "encoding/xml" "errors" "fmt" + "hash/crc64" "io" "io/ioutil" "net/http" @@ -147,7 +148,7 @@ type ObjectPutHeaderOptions struct { ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"` ContentType string `header:"Content-Type,omitempty" url:"-"` ContentMD5 string `header:"Content-MD5,omitempty" url:"-"` - ContentLength int `header:"Content-Length,omitempty" url:"-"` + ContentLength int64 `header:"Content-Length,omitempty" url:"-"` ContentLanguage string `header:"Content-Language,omitempty" url:"-"` Expect string `header:"Expect,omitempty" url:"-"` Expires string `header:"Expires,omitempty" url:"-"` @@ -179,26 +180,27 @@ type ObjectPutOptions struct { // Put Object请求可以将一个文件(Oject)上传至指定Bucket。 // -// 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength -// // https://www.qcloud.com/document/product/436/7749 func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) { if err := CheckReaderLen(r); err != nil { return nil, err } + totalBytes, err := GetReaderLen(r) + if err != nil && opt != nil && opt.Listener != nil { + return nil, err + } + reader := TeeReader(r, nil, totalBytes, nil) + if s.client.Conf.EnableCRC { + reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA)) + } if opt != nil && opt.Listener != nil { - totalBytes, err := GetReaderLen(r) - if err != nil { - return nil, err - } - r = TeeReader(r, nil, totalBytes, opt.Listener) + reader.listener = opt.Listener } - sendOpt := sendOptions{ baseURL: s.client.BaseURL.BucketURL, uri: "/" + encodeURIComponent(name), method: http.MethodPut, - body: r, + body: reader, optHeader: opt, } resp, err := s.client.send(ctx, &sendOpt) @@ -556,38 +558,54 @@ type Results struct { err error } +func LimitReadCloser(r io.Reader, n int64) io.Reader { + var lc LimitedReadCloser + lc.R = r + lc.N = n + return &lc +} + +type LimitedReadCloser struct { + io.LimitedReader +} + +func (lc *LimitedReadCloser) Close() error { + if r, ok := lc.R.(io.ReadCloser); ok { + return r.Close() + } + return nil +} + func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { for j := range jobs { - fd, err := os.Open(j.FilePath) - var res Results - if err != nil { - res.err = err - res.PartNumber = j.Chunk.Number - res.Resp = nil - results <- &res - } - - // UploadPart do not support the chunk trsf, so need to add the content-length - j.Opt.ContentLength = int(j.Chunk.Size) + j.Opt.ContentLength = j.Chunk.Size rt := j.RetryTimes for { + // http.Request.Body can be Closed in request + fd, err := os.Open(j.FilePath) + var res Results + if err != nil { + res.err = err + res.PartNumber = j.Chunk.Number + res.Resp = nil + results <- &res + break + } fd.Seek(j.Chunk.OffSet, os.SEEK_SET) resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number, - &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt) + LimitReadCloser(fd, j.Chunk.Size), j.Opt) res.PartNumber = j.Chunk.Number res.Resp = resp res.err = err if err != nil { rt-- if rt == 0 { - fd.Close() results <- &res break } continue } - fd.Close() results <- &res break } diff --git a/object_part.go b/object_part.go index 77a0be7..63d9b50 100644 --- a/object_part.go +++ b/object_part.go @@ -5,6 +5,7 @@ import ( "encoding/xml" "errors" "fmt" + "hash/crc64" "io" "net/http" "net/url" @@ -47,7 +48,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string type ObjectUploadPartOptions struct { Expect string `header:"Expect,omitempty" url:"-"` XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"` - ContentLength int `header:"Content-Length,omitempty" url:"-"` + ContentLength int64 `header:"Content-Length,omitempty" url:"-"` ContentMD5 string `header:"Content-MD5,omitempty" url:"-"` XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` @@ -68,16 +69,28 @@ type ObjectUploadPartOptions struct { // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ContentLength // // https://www.qcloud.com/document/product/436/7750 -func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) { +func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, uopt *ObjectUploadPartOptions) (*Response, error) { if err := CheckReaderLen(r); err != nil { return nil, err } - if opt != nil && opt.Listener != nil { - totalBytes, err := GetReaderLen(r) - if err != nil { - return nil, err + // opt 不为 nil + opt := cloneObjectUploadPartOptions(uopt) + totalBytes, err := GetReaderLen(r) + if err != nil && opt.Listener != nil { + return nil, err + } + // 分块上传不支持 Chunk 上传 + if err == nil { + if opt != nil && opt.ContentLength == 0 { + opt.ContentLength = totalBytes } - r = TeeReader(r, nil, totalBytes, opt.Listener) + } + reader := TeeReader(r, nil, totalBytes, nil) + if s.client.Conf.EnableCRC { + reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA)) + } + if opt != nil && opt.Listener != nil { + reader.listener = opt.Listener } u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID) sendOpt := sendOptions{ @@ -85,7 +98,7 @@ func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, p uri: u, method: http.MethodPut, optHeader: opt, - body: r, + body: reader, } resp, err := s.client.send(ctx, &sendOpt) return resp, err diff --git a/object_part_test.go b/object_part_test.go index 59ee607..03b1225 100644 --- a/object_part_test.go +++ b/object_part_test.go @@ -5,9 +5,11 @@ import ( "context" "encoding/xml" "fmt" + "hash/crc64" "io/ioutil" "net/http" "reflect" + "strconv" "testing" ) @@ -102,11 +104,14 @@ func TestObjectService_UploadPart(t *testing.T) { testFormValues(t, r, vs) b, _ := ioutil.ReadAll(r.Body) + tb := crc64.MakeTable(crc64.ECMA) + crc := crc64.Update(0, tb, b) v := string(b) want := "hello" if !reflect.DeepEqual(v, want) { t.Errorf("Object.UploadPart request body: %#v, want %#v", v, want) } + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10)) }) r := bytes.NewReader([]byte("hello")) diff --git a/object_test.go b/object_test.go index cce6677..5eee886 100644 --- a/object_test.go +++ b/object_test.go @@ -3,12 +3,19 @@ package cos import ( "bytes" "context" + "crypto/rand" + "encoding/hex" "encoding/xml" "fmt" + "hash/crc64" "io/ioutil" "net/http" + "net/url" + "os" "reflect" + "strconv" "testing" + "time" ) func TestObjectService_Get(t *testing.T) { @@ -59,25 +66,110 @@ func TestObjectService_Put(t *testing.T) { } name := "test/hello.txt" + retry := 0 + final := 10 mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) { testMethod(t, r, http.MethodPut) testHeader(t, r, "x-cos-acl", "private") testHeader(t, r, "Content-Type", "text/html") - b, _ := ioutil.ReadAll(r.Body) - v := string(b) - want := "hello" - if !reflect.DeepEqual(v, want) { - t.Errorf("Object.Put request body: %#v, want %#v", v, want) + if retry%2 == 0 { + b, _ := ioutil.ReadAll(r.Body) + tb := crc64.MakeTable(crc64.ECMA) + crc := crc64.Update(0, tb, b) + v := string(b) + want := "hello" + if !reflect.DeepEqual(v, want) { + t.Errorf("Object.Put request body: %#v, want %#v", v, want) + } + realcrc := crc64.Update(0, tb, []byte("hello")) + if !reflect.DeepEqual(crc, realcrc) { + t.Errorf("Object.Put crc: %v, want: %v", crc, realcrc) + } + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10)) + if retry != final { + w.WriteHeader(http.StatusGatewayTimeout) + } + } else { + w.Header().Add("x-cos-hash-crc64ecma", "123456789") } }) - r := bytes.NewReader([]byte("hello")) - _, err := client.Object.Put(context.Background(), name, r, opt) + for retry <= final { + r := bytes.NewReader([]byte("hello")) + _, err := client.Object.Put(context.Background(), name, r, opt) + if retry < final && err == nil { + t.Fatalf("Error must not nil when retry < final") + } + if retry == final && err != nil { + t.Fatalf("Put Error: %v", err) + } + retry++ + } +} + +func TestObjectService_PutFromFile(t *testing.T) { + setup() + defer teardown() + + filePath := "tmpfile" + time.Now().Format(time.RFC3339) + newfile, err := os.Create(filePath) if err != nil { - t.Fatalf("Object.Put returned error: %v", err) + t.Fatalf("create tmp file failed") } + defer os.Remove(filePath) + // 源文件内容 + b := make([]byte, 1024*1024*3) + _, err = rand.Read(b) + newfile.Write(b) + newfile.Close() + + tb := crc64.MakeTable(crc64.ECMA) + realcrc := crc64.Update(0, tb, b) + opt := &ObjectPutOptions{ + ObjectPutHeaderOptions: &ObjectPutHeaderOptions{ + ContentType: "text/html", + }, + ACLHeaderOptions: &ACLHeaderOptions{ + XCosACL: "private", + }, + } + name := "test/hello.txt" + retry := 0 + final := 4 + mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPut) + testHeader(t, r, "x-cos-acl", "private") + testHeader(t, r, "Content-Type", "text/html") + if retry%2 == 0 { + bs, _ := ioutil.ReadAll(r.Body) + crc := crc64.Update(0, tb, bs) + if !reflect.DeepEqual(bs, b) { + t.Errorf("Object.Put request body Error") + } + if !reflect.DeepEqual(crc, realcrc) { + t.Errorf("Object.Put crc: %v, want: %v", crc, realcrc) + } + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10)) + if retry != final { + w.WriteHeader(http.StatusGatewayTimeout) + } + } else { + w.Header().Add("x-cos-hash-crc64ecma", "123456789") + } + }) + + for retry <= final { + _, err := client.Object.PutFromFile(context.Background(), name, filePath, opt) + if retry < final && err == nil { + t.Fatalf("Error must not nil when retry < final") + } + if retry == final && err != nil { + t.Fatalf("Put Error: %v", err) + } + retry++ + } } func TestObjectService_Delete(t *testing.T) { @@ -114,7 +206,6 @@ func TestObjectService_Head(t *testing.T) { if err != nil { t.Fatalf("Object.Head returned error: %v", err) } - } func TestObjectService_Options(t *testing.T) { @@ -272,3 +363,90 @@ func TestObjectService_Copy(t *testing.T) { t.Errorf("Object.Copy returned %+v, want %+v", ref, want) } } + +func TestObjectService_Upload(t *testing.T) { + setup() + defer teardown() + + filePath := "tmpfile" + time.Now().Format(time.RFC3339) + newfile, err := os.Create(filePath) + if err != nil { + t.Fatalf("create tmp file failed") + } + defer os.Remove(filePath) + // 源文件内容 + b := make([]byte, 1024*1024*10) + _, err = rand.Read(b) + newfile.Write(b) + newfile.Close() + + // 已上传内容, 10个分块 + rb := make([][]byte, 10) + uploadid := "test-cos-multiupload-uploadid" + partmap := make(map[int64]int) + mux.HandleFunc("/test.go.upload", func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPut { // 分块上传 + r.ParseForm() + part, _ := strconv.ParseInt(r.Form.Get("partNumber"), 10, 64) + if partmap[part] == 0 { + // 重试检验1 + partmap[part]++ + ioutil.ReadAll(r.Body) + w.WriteHeader(http.StatusGatewayTimeout) + } else if partmap[part] == 1 { + // 重试校验2 + partmap[part]++ + w.Header().Add("x-cos-hash-crc64ecma", "123456789") + } else { // 正确上传 + bs, _ := ioutil.ReadAll(r.Body) + rb[part-1] = bs + md := hex.EncodeToString(calMD5Digest(bs)) + crc := crc64.Update(0, crc64.MakeTable(crc64.ECMA), bs) + w.Header().Add("ETag", md) + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10)) + } + } else { + testMethod(t, r, http.MethodPost) + initreq := url.Values{} + initreq.Set("uploads", "") + compreq := url.Values{} + compreq.Set("uploadId", uploadid) + r.ParseForm() + if reflect.DeepEqual(r.Form, initreq) { + // 初始化分块上传 + fmt.Fprintf(w, ` + + %v + %v + `, "test.go.upload", uploadid) + } else if reflect.DeepEqual(r.Form, compreq) { + // 完成分块上传 + tb := crc64.MakeTable(crc64.ECMA) + crc := uint64(0) + ccv := make([]uint64, 10) + for i, v := range rb { + ccv[i] = crc64.Update(0, crc64.MakeTable(crc64.ECMA), v) + crc = crc64.Update(crc, tb, v) + } + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10)) + fmt.Fprintf(w, ` + /test.go.upload + + test.go.upload + "%v" + `, hex.EncodeToString(calMD5Digest(b))) + } else { + t.Errorf("TestObjectService_Upload Unknown Request") + } + } + }) + + opt := &MultiUploadOptions{ + ThreadPoolSize: 3, + PartSize: 1, + } + _, _, err = client.Object.Upload(context.Background(), "test.go.upload", filePath, opt) + if err != nil { + t.Fatalf("Object.Upload returned error: %v", err) + } +} diff --git a/progress.go b/progress.go index 4c1ba4a..e60b95a 100644 --- a/progress.go +++ b/progress.go @@ -2,6 +2,7 @@ package cos import ( "fmt" + "hash" "io" ) @@ -101,6 +102,15 @@ func (r *teeReader) Size() int64 { return r.totalBytes } +func (r *teeReader) Crc64() uint64 { + if r.writer != nil { + if th, ok := r.writer.(hash.Hash64); ok { + return th.Sum64() + } + } + return 0 +} + func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader { return &teeReader{ reader: reader,