Browse Source

Merge pull request #113 from agin719/cos-v4-dev

latest 0.7.23 stable
master
agin719 4 years ago
committed by GitHub
parent
commit
95ac8f1e3b
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      ci.go
  2. 14
      cos.go
  3. 31
      costesting/ci_test.go
  4. 42
      helper.go
  5. 75
      object.go
  6. 30
      object_part.go
  7. 5
      object_part_test.go
  8. 196
      object_test.go
  9. 10
      progress.go

26
ci.go

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"encoding/xml"
"hash/crc64"
"io"
"net/http"
"os"
@ -205,16 +206,27 @@ func (s *CIService) GetVideoAuditingJob(ctx context.Context, jobid string) (*Get
}
// ci put https://cloud.tencent.com/document/product/460/18147
func (s *CIService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*ImageProcessResult, *Response, error) {
func (s *CIService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*ImageProcessResult, *Response, error) {
if err := CheckReaderLen(r); err != nil {
return nil, nil, err
}
if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r)
if err != nil {
return nil, nil, err
opt := cloneObjectPutOptions(uopt)
totalBytes, err := GetReaderLen(r)
if err != nil && opt != nil && opt.Listener != nil {
return nil, nil, err
}
if err == nil {
// 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传
if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
opt.ContentLength = totalBytes
}
r = TeeReader(r, nil, totalBytes, opt.Listener)
}
reader := TeeReader(r, nil, totalBytes, nil)
if s.client.Conf.EnableCRC {
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
}
if opt != nil && opt.Listener != nil {
reader.listener = opt.Listener
}
var res ImageProcessResult
@ -222,7 +234,7 @@ func (s *CIService) Put(ctx context.Context, name string, r io.Reader, opt *Obje
baseURL: s.client.BaseURL.BucketURL,
uri: "/" + encodeURIComponent(name),
method: http.MethodPut,
body: r,
body: reader,
optHeader: opt,
result: &res,
}

14
cos.go

@ -22,7 +22,7 @@ import (
const (
// Version current go sdk version
Version = "0.7.22"
Version = "0.7.23"
userAgent = "cos-go-sdk-v5/" + Version
contentTypeXML = "application/xml"
defaultServiceBaseURL = "http://service.cos.myqcloud.com"
@ -217,6 +217,18 @@ func (c *Client) doAPI(ctx context.Context, req *http.Request, result interface{
return response, err
}
// need CRC64 verification
if reader, ok := req.Body.(*teeReader); ok {
if c.Conf.EnableCRC && reader.writer != nil {
localcrc := reader.Crc64()
scoscrc := response.Header.Get("x-cos-hash-crc64ecma")
icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
if icoscrc != localcrc {
return response, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
}
}
}
if result != nil {
if w, ok := result.(io.Writer); ok {
io.Copy(w, resp.Body)

31
costesting/ci_test.go

@ -476,6 +476,37 @@ func (s *CosTestSuite) TestPutGetDeleteObjectByFile_10MB() {
assert.Nil(s.T(), err, "remove local file Failed")
}
func (s *CosTestSuite) TestPutGetDeleteObjectByUpload_10MB() {
// Create tmp file
filePath := "tmpfile" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
assert.Nil(s.T(), err, "create tmp file Failed")
defer newfile.Close()
name := "test/objectUpload" + time.Now().Format(time.RFC3339)
b := make([]byte, 1024*1024*10)
_, err = rand.Read(b)
newfile.Write(b)
opt := &cos.MultiUploadOptions{
PartSize: 1,
ThreadPoolSize: 3,
}
_, _, err = s.Client.Object.Upload(context.Background(), name, filePath, opt)
assert.Nil(s.T(), err, "PutObject Failed")
// Over write tmp file
_, err = s.Client.Object.GetToFile(context.Background(), name, filePath, nil)
assert.Nil(s.T(), err, "HeadObject Failed")
_, err = s.Client.Object.Delete(context.Background(), name)
assert.Nil(s.T(), err, "DeleteObject Failed")
// remove the local tmp file
err = os.Remove(filePath)
assert.Nil(s.T(), err, "remove local file Failed")
}
func (s *CosTestSuite) TestPutGetDeleteObjectSpecialName() {
f := strings.NewReader("test")
name := s.SepFileName + time.Now().Format(time.RFC3339)

42
helper.go

@ -149,6 +149,8 @@ func GetReaderLen(reader io.Reader) (length int64, err error) {
}
case *io.LimitedReader:
length = int64(v.N)
case *LimitedReadCloser:
length = int64(v.N)
case FixedLengthReader:
length = v.Size()
default:
@ -157,6 +159,20 @@ func GetReaderLen(reader io.Reader) (length int64, err error) {
return
}
func IsLenReader(reader io.Reader) bool {
switch reader.(type) {
case *bytes.Buffer:
return true
case *bytes.Reader:
return true
case *strings.Reader:
return true
default:
return false
}
return false
}
func CheckReaderLen(reader io.Reader) error {
nlen, err := GetReaderLen(reader)
if err != nil || nlen < singleUploadMaxLength {
@ -194,3 +210,29 @@ func CopyOptionsToMulti(opt *ObjectCopyOptions) *InitiateMultipartUploadOptions
}
return optini
}
// 浅拷贝ObjectPutOptions
func cloneObjectPutOptions(opt *ObjectPutOptions) *ObjectPutOptions {
res := &ObjectPutOptions{
&ACLHeaderOptions{},
&ObjectPutHeaderOptions{},
}
if opt != nil {
if opt.ACLHeaderOptions != nil {
*res.ACLHeaderOptions = *opt.ACLHeaderOptions
}
if opt.ObjectPutHeaderOptions != nil {
*res.ObjectPutHeaderOptions = *opt.ObjectPutHeaderOptions
}
}
return res
}
// 浅拷贝ObjectUploadPartOptions
func cloneObjectUploadPartOptions(opt *ObjectUploadPartOptions) *ObjectUploadPartOptions {
var res ObjectUploadPartOptions
if opt != nil {
res = *opt
}
return &res
}

75
object.go

@ -6,6 +6,7 @@ import (
"encoding/xml"
"errors"
"fmt"
"hash/crc64"
"io"
"io/ioutil"
"net/http"
@ -147,7 +148,7 @@ type ObjectPutHeaderOptions struct {
ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
ContentType string `header:"Content-Type,omitempty" url:"-"`
ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
ContentLength int `header:"Content-Length,omitempty" url:"-"`
ContentLength int64 `header:"Content-Length,omitempty" url:"-"`
ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
Expect string `header:"Expect,omitempty" url:"-"`
Expires string `header:"Expires,omitempty" url:"-"`
@ -179,26 +180,34 @@ type ObjectPutOptions struct {
// Put Object请求可以将一个文件(Oject)上传至指定Bucket。
//
// 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
//
// https://www.qcloud.com/document/product/436/7749
func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*Response, error) {
if err := CheckReaderLen(r); err != nil {
return nil, err
}
if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r)
if err != nil {
return nil, err
opt := cloneObjectPutOptions(uopt)
totalBytes, err := GetReaderLen(r)
if err != nil && opt != nil && opt.Listener != nil {
return nil, err
}
if err == nil {
// 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传
if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
opt.ContentLength = totalBytes
}
r = TeeReader(r, nil, totalBytes, opt.Listener)
}
reader := TeeReader(r, nil, totalBytes, nil)
if s.client.Conf.EnableCRC {
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
}
if opt != nil && opt.Listener != nil {
reader.listener = opt.Listener
}
sendOpt := sendOptions{
baseURL: s.client.BaseURL.BucketURL,
uri: "/" + encodeURIComponent(name),
method: http.MethodPut,
body: r,
body: reader,
optHeader: opt,
}
resp, err := s.client.send(ctx, &sendOpt)
@ -556,38 +565,54 @@ type Results struct {
err error
}
func LimitReadCloser(r io.Reader, n int64) io.Reader {
var lc LimitedReadCloser
lc.R = r
lc.N = n
return &lc
}
type LimitedReadCloser struct {
io.LimitedReader
}
func (lc *LimitedReadCloser) Close() error {
if r, ok := lc.R.(io.ReadCloser); ok {
return r.Close()
}
return nil
}
func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
for j := range jobs {
fd, err := os.Open(j.FilePath)
var res Results
if err != nil {
res.err = err
res.PartNumber = j.Chunk.Number
res.Resp = nil
results <- &res
}
// UploadPart do not support the chunk trsf, so need to add the content-length
j.Opt.ContentLength = int(j.Chunk.Size)
j.Opt.ContentLength = j.Chunk.Size
rt := j.RetryTimes
for {
// http.Request.Body can be Closed in request
fd, err := os.Open(j.FilePath)
var res Results
if err != nil {
res.err = err
res.PartNumber = j.Chunk.Number
res.Resp = nil
results <- &res
break
}
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
&io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
LimitReadCloser(fd, j.Chunk.Size), j.Opt)
res.PartNumber = j.Chunk.Number
res.Resp = resp
res.err = err
if err != nil {
rt--
if rt == 0 {
fd.Close()
results <- &res
break
}
continue
}
fd.Close()
results <- &res
break
}

30
object_part.go

@ -5,6 +5,7 @@ import (
"encoding/xml"
"errors"
"fmt"
"hash/crc64"
"io"
"net/http"
"net/url"
@ -47,7 +48,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string
type ObjectUploadPartOptions struct {
Expect string `header:"Expect,omitempty" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
ContentLength int `header:"Content-Length,omitempty" url:"-"`
ContentLength int64 `header:"Content-Length,omitempty" url:"-"`
ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
@ -68,16 +69,29 @@ type ObjectUploadPartOptions struct {
// 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ContentLength
//
// https://www.qcloud.com/document/product/436/7750
func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) {
func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, uopt *ObjectUploadPartOptions) (*Response, error) {
if err := CheckReaderLen(r); err != nil {
return nil, err
}
if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r)
if err != nil {
return nil, err
// opt 不为 nil
opt := cloneObjectUploadPartOptions(uopt)
totalBytes, err := GetReaderLen(r)
if err != nil && opt.Listener != nil {
return nil, err
}
// 分块上传不支持 Chunk 上传
if err == nil {
// 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader需用户指定ContentLength
if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
opt.ContentLength = totalBytes
}
r = TeeReader(r, nil, totalBytes, opt.Listener)
}
reader := TeeReader(r, nil, totalBytes, nil)
if s.client.Conf.EnableCRC {
reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
}
if opt != nil && opt.Listener != nil {
reader.listener = opt.Listener
}
u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
sendOpt := sendOptions{
@ -85,7 +99,7 @@ func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, p
uri: u,
method: http.MethodPut,
optHeader: opt,
body: r,
body: reader,
}
resp, err := s.client.send(ctx, &sendOpt)
return resp, err

5
object_part_test.go

@ -5,9 +5,11 @@ import (
"context"
"encoding/xml"
"fmt"
"hash/crc64"
"io/ioutil"
"net/http"
"reflect"
"strconv"
"testing"
)
@ -102,11 +104,14 @@ func TestObjectService_UploadPart(t *testing.T) {
testFormValues(t, r, vs)
b, _ := ioutil.ReadAll(r.Body)
tb := crc64.MakeTable(crc64.ECMA)
crc := crc64.Update(0, tb, b)
v := string(b)
want := "hello"
if !reflect.DeepEqual(v, want) {
t.Errorf("Object.UploadPart request body: %#v, want %#v", v, want)
}
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10))
})
r := bytes.NewReader([]byte("hello"))

196
object_test.go

@ -3,12 +3,19 @@ package cos
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/xml"
"fmt"
"hash/crc64"
"io/ioutil"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"testing"
"time"
)
func TestObjectService_Get(t *testing.T) {
@ -59,25 +66,110 @@ func TestObjectService_Put(t *testing.T) {
}
name := "test/hello.txt"
retry := 0
final := 10
mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut)
testHeader(t, r, "x-cos-acl", "private")
testHeader(t, r, "Content-Type", "text/html")
b, _ := ioutil.ReadAll(r.Body)
v := string(b)
want := "hello"
if !reflect.DeepEqual(v, want) {
t.Errorf("Object.Put request body: %#v, want %#v", v, want)
if retry%2 == 0 {
b, _ := ioutil.ReadAll(r.Body)
tb := crc64.MakeTable(crc64.ECMA)
crc := crc64.Update(0, tb, b)
v := string(b)
want := "hello"
if !reflect.DeepEqual(v, want) {
t.Errorf("Object.Put request body: %#v, want %#v", v, want)
}
realcrc := crc64.Update(0, tb, []byte("hello"))
if !reflect.DeepEqual(crc, realcrc) {
t.Errorf("Object.Put crc: %v, want: %v", crc, realcrc)
}
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10))
if retry != final {
w.WriteHeader(http.StatusGatewayTimeout)
}
} else {
w.Header().Add("x-cos-hash-crc64ecma", "123456789")
}
})
r := bytes.NewReader([]byte("hello"))
_, err := client.Object.Put(context.Background(), name, r, opt)
for retry <= final {
r := bytes.NewReader([]byte("hello"))
_, err := client.Object.Put(context.Background(), name, r, opt)
if retry < final && err == nil {
t.Fatalf("Error must not nil when retry < final")
}
if retry == final && err != nil {
t.Fatalf("Put Error: %v", err)
}
retry++
}
}
func TestObjectService_PutFromFile(t *testing.T) {
setup()
defer teardown()
filePath := "tmpfile" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
if err != nil {
t.Fatalf("Object.Put returned error: %v", err)
t.Fatalf("create tmp file failed")
}
defer os.Remove(filePath)
// 源文件内容
b := make([]byte, 1024*1024*3)
_, err = rand.Read(b)
newfile.Write(b)
newfile.Close()
tb := crc64.MakeTable(crc64.ECMA)
realcrc := crc64.Update(0, tb, b)
opt := &ObjectPutOptions{
ObjectPutHeaderOptions: &ObjectPutHeaderOptions{
ContentType: "text/html",
},
ACLHeaderOptions: &ACLHeaderOptions{
XCosACL: "private",
},
}
name := "test/hello.txt"
retry := 0
final := 4
mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut)
testHeader(t, r, "x-cos-acl", "private")
testHeader(t, r, "Content-Type", "text/html")
if retry%2 == 0 {
bs, _ := ioutil.ReadAll(r.Body)
crc := crc64.Update(0, tb, bs)
if !reflect.DeepEqual(bs, b) {
t.Errorf("Object.Put request body Error")
}
if !reflect.DeepEqual(crc, realcrc) {
t.Errorf("Object.Put crc: %v, want: %v", crc, realcrc)
}
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10))
if retry != final {
w.WriteHeader(http.StatusGatewayTimeout)
}
} else {
w.Header().Add("x-cos-hash-crc64ecma", "123456789")
}
})
for retry <= final {
_, err := client.Object.PutFromFile(context.Background(), name, filePath, opt)
if retry < final && err == nil {
t.Fatalf("Error must not nil when retry < final")
}
if retry == final && err != nil {
t.Fatalf("Put Error: %v", err)
}
retry++
}
}
func TestObjectService_Delete(t *testing.T) {
@ -114,7 +206,6 @@ func TestObjectService_Head(t *testing.T) {
if err != nil {
t.Fatalf("Object.Head returned error: %v", err)
}
}
func TestObjectService_Options(t *testing.T) {
@ -272,3 +363,90 @@ func TestObjectService_Copy(t *testing.T) {
t.Errorf("Object.Copy returned %+v, want %+v", ref, want)
}
}
func TestObjectService_Upload(t *testing.T) {
setup()
defer teardown()
filePath := "tmpfile" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
if err != nil {
t.Fatalf("create tmp file failed")
}
defer os.Remove(filePath)
// 源文件内容
b := make([]byte, 1024*1024*10)
_, err = rand.Read(b)
newfile.Write(b)
newfile.Close()
// 已上传内容, 10个分块
rb := make([][]byte, 10)
uploadid := "test-cos-multiupload-uploadid"
partmap := make(map[int64]int)
mux.HandleFunc("/test.go.upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPut { // 分块上传
r.ParseForm()
part, _ := strconv.ParseInt(r.Form.Get("partNumber"), 10, 64)
if partmap[part] == 0 {
// 重试检验1
partmap[part]++
ioutil.ReadAll(r.Body)
w.WriteHeader(http.StatusGatewayTimeout)
} else if partmap[part] == 1 {
// 重试校验2
partmap[part]++
w.Header().Add("x-cos-hash-crc64ecma", "123456789")
} else { // 正确上传
bs, _ := ioutil.ReadAll(r.Body)
rb[part-1] = bs
md := hex.EncodeToString(calMD5Digest(bs))
crc := crc64.Update(0, crc64.MakeTable(crc64.ECMA), bs)
w.Header().Add("ETag", md)
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10))
}
} else {
testMethod(t, r, http.MethodPost)
initreq := url.Values{}
initreq.Set("uploads", "")
compreq := url.Values{}
compreq.Set("uploadId", uploadid)
r.ParseForm()
if reflect.DeepEqual(r.Form, initreq) {
// 初始化分块上传
fmt.Fprintf(w, `<InitiateMultipartUploadResult>
<Bucket></Bucket>
<Key>%v</Key>
<UploadId>%v</UploadId>
</InitiateMultipartUploadResult>`, "test.go.upload", uploadid)
} else if reflect.DeepEqual(r.Form, compreq) {
// 完成分块上传
tb := crc64.MakeTable(crc64.ECMA)
crc := uint64(0)
ccv := make([]uint64, 10)
for i, v := range rb {
ccv[i] = crc64.Update(0, crc64.MakeTable(crc64.ECMA), v)
crc = crc64.Update(crc, tb, v)
}
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(crc, 10))
fmt.Fprintf(w, `<CompleteMultipartUploadResult>
<Location>/test.go.upload</Location>
<Bucket></Bucket>
<Key>test.go.upload</Key>
<ETag>&quot;%v&quot;</ETag>
</CompleteMultipartUploadResult>`, hex.EncodeToString(calMD5Digest(b)))
} else {
t.Errorf("TestObjectService_Upload Unknown Request")
}
}
})
opt := &MultiUploadOptions{
ThreadPoolSize: 3,
PartSize: 1,
}
_, _, err = client.Object.Upload(context.Background(), "test.go.upload", filePath, opt)
if err != nil {
t.Fatalf("Object.Upload returned error: %v", err)
}
}

10
progress.go

@ -2,6 +2,7 @@ package cos
import (
"fmt"
"hash"
"io"
)
@ -101,6 +102,15 @@ func (r *teeReader) Size() int64 {
return r.totalBytes
}
func (r *teeReader) Crc64() uint64 {
if r.writer != nil {
if th, ok := r.writer.(hash.Hash64); ok {
return th.Sum64()
}
}
return 0
}
func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader {
return &teeReader{
reader: reader,

Loading…
Cancel
Save