Browse Source

add append && add retry

master
jojoliang 3 years ago
parent
commit
6d06f3452d
  1. 4
      bucket_accelerate.go
  2. 5
      bucket_accelerate_test.go
  3. 4
      bucket_acl.go
  4. 6
      bucket_cors.go
  5. 4
      bucket_domain.go
  6. 18
      bucket_domain_test.go
  7. 6
      bucket_encryption.go
  8. 4
      bucket_intelligenttiering.go
  9. 8
      bucket_inventory.go
  10. 4
      bucket_logging.go
  11. 6
      bucket_origin.go
  12. 4
      bucket_policy.go
  13. 4
      bucket_referer.go
  14. 6
      bucket_replication.go
  15. 6
      bucket_tagging.go
  16. 4
      bucket_version.go
  17. 6
      bucket_website.go
  18. 11
      cos.go
  19. 16
      costesting/ci_test.go
  20. 74
      example/object/append.go
  21. 91
      object.go
  22. 4
      object_acl.go
  23. 10
      object_part.go
  24. 188
      object_test.go
  25. 11
      progress.go

4
bucket_accelerate.go

@ -20,7 +20,7 @@ func (s *BucketService) PutAccelerate(ctx context.Context, opt *BucketPutAcceler
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -32,6 +32,6 @@ func (s *BucketService) GetAccelerate(ctx context.Context) (*BucketGetAccelerate
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }

5
bucket_accelerate_test.go

@ -51,6 +51,7 @@ func TestBucketService_PutAccelerate(t *testing.T) {
Type: "COS", Type: "COS",
} }
rt := 0
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, "PUT") testMethod(t, r, "PUT")
vs := values{ vs := values{
@ -65,6 +66,10 @@ func TestBucketService_PutAccelerate(t *testing.T) {
if !reflect.DeepEqual(body, want) { if !reflect.DeepEqual(body, want) {
t.Errorf("Bucket.PutAccelerate request\n body: %+v\n, want %+v\n", body, want) t.Errorf("Bucket.PutAccelerate request\n body: %+v\n, want %+v\n", body, want)
} }
rt++
if rt < 3 {
w.WriteHeader(http.StatusBadGateway)
}
}) })
_, err := client.Bucket.PutAccelerate(context.Background(), opt) _, err := client.Bucket.PutAccelerate(context.Background(), opt)

4
bucket_acl.go

@ -19,7 +19,7 @@ func (s *BucketService) GetACL(ctx context.Context) (*BucketGetACLResult, *Respo
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
if err == nil { if err == nil {
decodeACL(resp, &res) decodeACL(resp, &res)
} }
@ -60,6 +60,6 @@ func (s *BucketService) PutACL(ctx context.Context, opt *BucketPutACLOptions) (*
body: body, body: body,
optHeader: header, optHeader: header,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }

6
bucket_cors.go

@ -33,7 +33,7 @@ func (s *BucketService) GetCORS(ctx context.Context) (*BucketGetCORSResult, *Res
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -53,7 +53,7 @@ func (s *BucketService) PutCORS(ctx context.Context, opt *BucketPutCORSOptions)
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -66,6 +66,6 @@ func (s *BucketService) DeleteCORS(ctx context.Context) (*Response, error) {
uri: "/?cors", uri: "/?cors",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }

4
bucket_domain.go

@ -22,7 +22,7 @@ func (s *BucketService) PutDomain(ctx context.Context, opt *BucketPutDomainOptio
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -34,6 +34,6 @@ func (s *BucketService) GetDomain(ctx context.Context) (*BucketGetDomainResult,
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }

18
bucket_domain_test.go

@ -13,12 +13,18 @@ func TestBucketService_GetDomain(t *testing.T) {
setup() setup()
defer teardown() defer teardown()
rt := 0
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, "GET") testMethod(t, r, "GET")
vs := values{ vs := values{
"domain": "", "domain": "",
} }
testFormValues(t, r, vs) testFormValues(t, r, vs)
rt++
if rt < 3 {
w.WriteHeader(http.StatusGatewayTimeout)
}
fmt.Fprint(w, `<DomainConfiguration> fmt.Fprint(w, `<DomainConfiguration>
<DomainRule> <DomainRule>
<Status>ENABLED</Status> <Status>ENABLED</Status>
@ -59,13 +65,17 @@ func TestBucketService_PutDomain(t *testing.T) {
ForcedReplacement: "CNAME", ForcedReplacement: "CNAME",
} }
rt := 0
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, "PUT") testMethod(t, r, "PUT")
vs := values{ vs := values{
"domain": "", "domain": "",
} }
testFormValues(t, r, vs) testFormValues(t, r, vs)
rt++
if rt < 3 {
w.WriteHeader(http.StatusGatewayTimeout)
}
body := new(BucketPutDomainOptions) body := new(BucketPutDomainOptions)
xml.NewDecoder(r.Body).Decode(body) xml.NewDecoder(r.Body).Decode(body)
want := opt want := opt
@ -87,6 +97,7 @@ func TestBucketService_DeleteDomain(t *testing.T) {
opt := &BucketPutDomainOptions{} opt := &BucketPutDomainOptions{}
rt := 0
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPut) testMethod(t, r, http.MethodPut)
vs := values{ vs := values{
@ -94,6 +105,11 @@ func TestBucketService_DeleteDomain(t *testing.T) {
} }
testFormValues(t, r, vs) testFormValues(t, r, vs)
rt++
if rt < 3 {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
body := new(BucketPutDomainOptions) body := new(BucketPutDomainOptions)
xml.NewDecoder(r.Body).Decode(body) xml.NewDecoder(r.Body).Decode(body)
want := opt want := opt

6
bucket_encryption.go

@ -24,7 +24,7 @@ func (s *BucketService) PutEncryption(ctx context.Context, opt *BucketPutEncrypt
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -36,7 +36,7 @@ func (s *BucketService) GetEncryption(ctx context.Context) (*BucketGetEncryption
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -46,6 +46,6 @@ func (s *BucketService) DeleteEncryption(ctx context.Context) (*Response, error)
uri: "/?encryption", uri: "/?encryption",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }

4
bucket_intelligenttiering.go

@ -29,7 +29,7 @@ func (s *BucketService) PutIntelligentTiering(ctx context.Context, opt *BucketPu
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -41,7 +41,7 @@ func (s *BucketService) GetIntelligentTiering(ctx context.Context) (*BucketGetIn
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }

8
bucket_inventory.go

@ -74,7 +74,7 @@ func (s *BucketService) PutInventory(ctx context.Context, id string, opt *Bucket
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -89,7 +89,7 @@ func (s *BucketService) GetInventory(ctx context.Context, id string) (*BucketGet
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -101,7 +101,7 @@ func (s *BucketService) DeleteInventory(ctx context.Context, id string) (*Respon
uri: u, uri: u,
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -120,7 +120,7 @@ func (s *BucketService) ListInventoryConfigurations(ctx context.Context, token s
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }

4
bucket_logging.go

@ -31,7 +31,7 @@ func (s *BucketService) PutLogging(ctx context.Context, opt *BucketPutLoggingOpt
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -44,7 +44,7 @@ func (s *BucketService) GetLogging(ctx context.Context) (*BucketGetLoggingResult
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }

6
bucket_origin.go

@ -64,7 +64,7 @@ func (s *BucketService) PutOrigin(ctx context.Context, opt *BucketPutOriginOptio
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -76,7 +76,7 @@ func (s *BucketService) GetOrigin(ctx context.Context) (*BucketGetOriginResult,
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -86,6 +86,6 @@ func (s *BucketService) DeleteOrigin(ctx context.Context) (*Response, error) {
uri: "/?origin", uri: "/?origin",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }

4
bucket_policy.go

@ -53,7 +53,7 @@ func (s *BucketService) GetPolicy(ctx context.Context) (*BucketGetPolicyResult,
method: http.MethodGet, method: http.MethodGet,
result: &bs, result: &bs,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
if err == nil { if err == nil {
err = json.Unmarshal(bs.Bytes(), &res) err = json.Unmarshal(bs.Bytes(), &res)
} }
@ -66,6 +66,6 @@ func (s *BucketService) DeletePolicy(ctx context.Context) (*Response, error) {
uri: "/?policy", uri: "/?policy",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }

4
bucket_referer.go

@ -23,7 +23,7 @@ func (s *BucketService) PutReferer(ctx context.Context, opt *BucketPutRefererOpt
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -35,6 +35,6 @@ func (s *BucketService) GetReferer(ctx context.Context) (*BucketGetRefererResult
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }

6
bucket_replication.go

@ -38,7 +38,7 @@ func (s *BucketService) PutBucketReplication(ctx context.Context, opt *PutBucket
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -52,7 +52,7 @@ func (s *BucketService) GetBucketReplication(ctx context.Context) (*GetBucketRep
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -64,6 +64,6 @@ func (s *BucketService) DeleteBucketReplication(ctx context.Context) (*Response,
uri: "/?replication", uri: "/?replication",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }

6
bucket_tagging.go

@ -29,7 +29,7 @@ func (s *BucketService) GetTagging(ctx context.Context) (*BucketGetTaggingResult
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -51,7 +51,7 @@ func (s *BucketService) PutTagging(ctx context.Context, opt *BucketPutTaggingOpt
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -64,6 +64,6 @@ func (s *BucketService) DeleteTagging(ctx context.Context) (*Response, error) {
uri: "/?tagging", uri: "/?tagging",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }

4
bucket_version.go

@ -24,7 +24,7 @@ func (s *BucketService) PutVersioning(ctx context.Context, opt *BucketPutVersion
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -37,6 +37,6 @@ func (s *BucketService) GetVersioning(ctx context.Context) (*BucketGetVersionRes
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }

6
bucket_website.go

@ -44,7 +44,7 @@ func (s *BucketService) PutWebsite(ctx context.Context, opt *BucketPutWebsiteOpt
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -56,7 +56,7 @@ func (s *BucketService) GetWebsite(ctx context.Context) (*BucketGetWebsiteResult
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -66,6 +66,6 @@ func (s *BucketService) DeleteWebsite(ctx context.Context) (*Response, error) {
uri: "/?website", uri: "/?website",
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }

11
cos.go

@ -22,7 +22,7 @@ import (
const ( const (
// Version current go sdk version // Version current go sdk version
Version = "0.7.29"
Version = "0.7.30"
userAgent = "cos-go-sdk-v5/" + Version userAgent = "cos-go-sdk-v5/" + Version
contentTypeXML = "application/xml" contentTypeXML = "application/xml"
defaultServiceBaseURL = "http://service.cos.myqcloud.com" defaultServiceBaseURL = "http://service.cos.myqcloud.com"
@ -244,6 +244,10 @@ func (c *Client) doAPI(ctx context.Context, req *http.Request, result interface{
err = checkResponse(resp) err = checkResponse(resp)
if err != nil { if err != nil {
// StatusCode != 2xx when Get Object
if !closeBody {
resp.Body.Close()
}
// even though there was an error, we still return the response // even though there was an error, we still return the response
// in case the caller wants to inspect it further // in case the caller wants to inspect it further
return response, err return response, err
@ -251,7 +255,7 @@ func (c *Client) doAPI(ctx context.Context, req *http.Request, result interface{
// need CRC64 verification // need CRC64 verification
if reader, ok := req.Body.(*teeReader); ok { if reader, ok := req.Body.(*teeReader); ok {
if c.Conf.EnableCRC && reader.writer != nil {
if c.Conf.EnableCRC && reader.writer != nil && !reader.disableCheckSum {
localcrc := reader.Crc64() localcrc := reader.Crc64()
scoscrc := response.Header.Get("x-cos-hash-crc64ecma") scoscrc := response.Header.Get("x-cos-hash-crc64ecma")
icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64) icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
@ -298,7 +302,8 @@ type sendOptions struct {
func (c *Client) doRetry(ctx context.Context, opt *sendOptions) (resp *Response, err error) { func (c *Client) doRetry(ctx context.Context, opt *sendOptions) (resp *Response, err error) {
if opt.body != nil { if opt.body != nil {
if _, ok := opt.body.(io.Reader); ok { if _, ok := opt.body.(io.Reader); ok {
return c.send(ctx, opt)
resp, err = c.send(ctx, opt)
return
} }
} }
nr := 0 nr := 0

16
costesting/ci_test.go

@ -2,6 +2,7 @@ package cos
// Basic imports // Basic imports
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -812,6 +813,21 @@ func (s *CosTestSuite) TestMultiUpload() {
assert.Nil(s.T(), err, "remove tmp file failed") assert.Nil(s.T(), err, "remove tmp file failed")
} }
func (s *CosTestSuite) TestAppend() {
name := "append" + time.Now().Format(time.RFC3339)
b1 := make([]byte, 1024*1024*10)
_, err := rand.Read(b1)
pos, _, err := s.Client.Object.Append(context.Background(), name, 0, bytes.NewReader(b1), nil)
assert.Nil(s.T(), err, "append object failed")
assert.Equal(s.T(), len(b1), pos, "append object pos error")
b2 := make([]byte, 12345)
rand.Read(b2)
pos, _, err = s.Client.Object.Append(context.Background(), name, pos, bytes.NewReader(b2), nil)
assert.Nil(s.T(), err, "append object failed")
assert.Equal(s.T(), len(b1)+len(b2), pos, "append object pos error")
}
/* /*
func (s *CosTestSuite) TestBatch() { func (s *CosTestSuite) TestBatch() {
client := cos.NewClient(s.Client.BaseURL, &http.Client{ client := cos.NewClient(s.Client.BaseURL, &http.Client{

74
example/object/append.go

@ -0,0 +1,74 @@
package main
import (
"context"
"fmt"
"net/url"
"os"
"strings"
"time"
"net/http"
"github.com/agin719/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}
func main() {
u, _ := url.Parse("http://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: true,
// Notice when put a large file and set need the request body, might happend out of memory error.
RequestBody: false,
ResponseHeader: true,
ResponseBody: false,
},
},
})
opt := &cos.ObjectPutOptions{
ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{
ContentType: "text/html",
Listener: &cos.DefaultProgressListener{},
},
ACLHeaderOptions: &cos.ACLHeaderOptions{
XCosACL: "private",
},
}
name := "append" + time.Now().Format(time.RFC3339)
str1 := "test append object 1"
pos, _, err := c.Object.Append(context.Background(), name, 0, strings.NewReader(str1), opt)
log_status(err)
fmt.Printf("pos: %d\n", pos)
str2 := "test append object 2"
pos, _, err = c.Object.Append(context.Background(), name, pos, strings.NewReader(str2), opt)
log_status(err)
fmt.Printf("pos: %d\n", pos)
}

91
object.go

@ -3,6 +3,7 @@ package cos
import ( import (
"context" "context"
"crypto/md5" "crypto/md5"
"encoding/hex"
"encoding/json" "encoding/json"
"encoding/xml" "encoding/xml"
"errors" "errors"
@ -72,7 +73,7 @@ func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOpti
optHeader: opt, optHeader: opt,
disableCloseBody: true, disableCloseBody: true,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
if opt != nil && opt.Listener != nil { if opt != nil && opt.Listener != nil {
if err == nil && resp != nil { if err == nil && resp != nil {
@ -349,7 +350,7 @@ func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *O
optHeader: copyOpt, optHeader: copyOpt,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
// If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
if err == nil && resp.StatusCode == 200 { if err == nil && resp.StatusCode == 200 {
if res.ETag == "" { if res.ETag == "" {
@ -389,7 +390,7 @@ func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectD
optHeader: optHeader, optHeader: optHeader,
optQuery: optHeader, optQuery: optHeader,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -422,7 +423,7 @@ func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOp
method: http.MethodHead, method: http.MethodHead,
optHeader: opt, optHeader: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" { if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0]) resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
} }
@ -478,13 +479,11 @@ func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *Objec
body: opt, body: opt,
optHeader: opt, optHeader: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
// TODO Append 接口在优化未开放使用
//
// Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。 // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
// 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。 // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
// //
@ -498,21 +497,59 @@ func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *Objec
// 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
// //
// https://www.qcloud.com/document/product/436/7741 // https://www.qcloud.com/document/product/436/7741
// func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
// u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
// if position != 0{
// opt = nil
// }
// sendOpt := sendOptions{
// baseURL: s.client.BaseURL.BucketURL,
// uri: u,
// method: http.MethodPost,
// optHeader: opt,
// body: r,
// }
// resp, err := s.client.send(ctx, &sendOpt)
// return resp, err
// }
func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (int, *Response, error) {
res := position
if r == nil {
return res, nil, fmt.Errorf("reader is nil")
}
if err := CheckReaderLen(r); err != nil {
return res, nil, err
}
opt = CloneObjectPutOptions(opt)
totalBytes, err := GetReaderLen(r)
if err != nil && opt != nil && opt.Listener != nil {
if opt.ContentLength == 0 {
return res, nil, err
}
totalBytes = opt.ContentLength
}
if err == nil {
// 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader需用户指定ContentLength
if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
opt.ContentLength = totalBytes
}
}
reader := TeeReader(r, nil, totalBytes, nil)
if s.client.Conf.EnableCRC {
reader.writer = md5.New() // MD5校验
reader.disableCheckSum = true
}
if opt != nil && opt.Listener != nil {
reader.listener = opt.Listener
}
u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
sendOpt := sendOptions{
baseURL: s.client.BaseURL.BucketURL,
uri: u,
method: http.MethodPost,
optHeader: opt,
body: reader,
}
resp, err := s.client.send(ctx, &sendOpt)
if err == nil {
// 数据校验
if s.client.Conf.EnableCRC && reader.writer != nil {
wanted := hex.EncodeToString(reader.Sum())
if wanted != resp.Header.Get("x-cos-content-sha1") {
return res, resp, fmt.Errorf("append verification failed, want:%v, return:%v", wanted, resp.Header.Get("x-cos-content-sha1"))
}
}
np, err := strconv.ParseInt(resp.Header.Get("x-cos-next-append-position"), 10, 64)
return int(np), resp, err
}
return res, resp, err
}
// ObjectDeleteMultiOptions is the option of DeleteMulti // ObjectDeleteMultiOptions is the option of DeleteMulti
type ObjectDeleteMultiOptions struct { type ObjectDeleteMultiOptions struct {
@ -547,7 +584,7 @@ func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiO
body: opt, body: opt,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -1256,7 +1293,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
} }
job := &Jobs{ job := &Jobs{
Name: name, Name: name,
RetryTimes: 3,
RetryTimes: 1,
FilePath: filepath, FilePath: filepath,
Chunk: chunk, Chunk: chunk,
DownOpt: &downOpt, DownOpt: &downOpt,
@ -1340,7 +1377,7 @@ func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *Object
method: http.MethodPut, method: http.MethodPut,
body: opt, body: opt,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }
@ -1361,7 +1398,7 @@ func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...strin
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -1380,6 +1417,6 @@ func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...st
uri: u, uri: u,
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err return resp, err
} }

4
object_acl.go

@ -19,7 +19,7 @@ func (s *ObjectService) GetACL(ctx context.Context, name string) (*ObjectGetACLR
method: http.MethodGet, method: http.MethodGet,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
if err == nil { if err == nil {
decodeACL(resp, &res) decodeACL(resp, &res)
} }
@ -61,6 +61,6 @@ func (s *ObjectService) PutACL(ctx context.Context, name string, opt *ObjectPutA
optHeader: header, optHeader: header,
body: body, body: body,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }

10
object_part.go

@ -40,7 +40,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string
optHeader: opt, optHeader: opt,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -148,7 +148,7 @@ func (s *ObjectService) ListParts(ctx context.Context, name, uploadID string, op
result: &res, result: &res,
optQuery: opt, optQuery: opt,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return &res, resp, err return &res, resp, err
} }
@ -209,7 +209,7 @@ func (s *ObjectService) CompleteMultipartUpload(ctx context.Context, name, uploa
body: opt, body: opt,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
// If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
if err == nil && resp.StatusCode == 200 { if err == nil && resp.StatusCode == 200 {
if res.ETag == "" { if res.ETag == "" {
@ -232,7 +232,7 @@ func (s *ObjectService) AbortMultipartUpload(ctx context.Context, name, uploadID
uri: u, uri: u,
method: http.MethodDelete, method: http.MethodDelete,
} }
resp, err := s.client.send(ctx, &sendOpt)
resp, err := s.client.doRetry(ctx, &sendOpt)
return resp, err return resp, err
} }
@ -328,7 +328,7 @@ func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsO
optQuery: opt, optQuery: opt,
result: &res, result: &res,
} }
resp, err := s.client.send(ctx, sendOpt)
resp, err := s.client.doRetry(ctx, sendOpt)
return &res, resp, err return &res, resp, err
} }

188
object_test.go

@ -12,6 +12,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
math_rand "math/rand" math_rand "math/rand"
"net"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@ -101,6 +102,68 @@ func TestObjectService_GetToFile(t *testing.T) {
} }
} }
func TestObjectService_GetRetry(t *testing.T) {
setup()
defer teardown()
u, _ := url.Parse(server.URL)
client := NewClient(&BaseURL{u, u, u, u}, &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: 1 * time.Second,
},
})
name := "test/hello.txt"
contentLength := 1024 * 1024 * 10
data := make([]byte, contentLength)
index := 0
mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, "GET")
vs := values{
"response-content-type": "text/html",
}
index++
if index%3 != 0 {
time.Sleep(time.Second * 2)
}
testFormValues(t, r, vs)
strRange := r.Header.Get("Range")
slice1 := strings.Split(strRange, "=")
slice2 := strings.Split(slice1[1], "-")
start, _ := strconv.ParseInt(slice2[0], 10, 64)
end, _ := strconv.ParseInt(slice2[1], 10, 64)
io.Copy(w, bytes.NewBuffer(data[start:end+1]))
})
for i := 0; i < 3; i++ {
math_rand.Seed(time.Now().UnixNano())
rangeStart := math_rand.Intn(contentLength)
rangeEnd := rangeStart + math_rand.Intn(contentLength-rangeStart)
if rangeEnd == rangeStart || rangeStart >= contentLength-1 {
continue
}
opt := &ObjectGetOptions{
ResponseContentType: "text/html",
Range: fmt.Sprintf("bytes=%v-%v", rangeStart, rangeEnd),
}
resp, err := client.Object.Get(context.Background(), name, opt)
if err != nil {
t.Fatalf("Object.Get returned error: %v", err)
}
b, _ := ioutil.ReadAll(resp.Body)
if bytes.Compare(b, data[rangeStart:rangeEnd+1]) != 0 {
t.Errorf("Object.Get Failed")
}
}
}
func TestObjectService_GetPresignedURL(t *testing.T) { func TestObjectService_GetPresignedURL(t *testing.T) {
setup() setup()
defer teardown() defer teardown()
@ -347,46 +410,52 @@ func TestObjectService_PostRestore(t *testing.T) {
} }
// func TestObjectService_Append(t *testing.T) {
// setup()
// defer teardown()
func TestObjectService_Append_Simple(t *testing.T) {
setup()
defer teardown()
// opt := &ObjectPutOptions{
// ObjectPutHeaderOptions: &ObjectPutHeaderOptions{
// ContentType: "text/html",
// },
// ACLHeaderOptions: &ACLHeaderOptions{
// XCosACL: "private",
// },
// }
// name := "test/hello.txt"
// position := 0
opt := &ObjectPutOptions{
ObjectPutHeaderOptions: &ObjectPutHeaderOptions{
ContentType: "text/html",
},
ACLHeaderOptions: &ACLHeaderOptions{
XCosACL: "private",
},
}
name := "test/hello.txt"
position := 0
// mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) {
// vs := values{
// "append": "",
// "position": "0",
// }
// testFormValues(t, r, vs)
mux.HandleFunc("/test/hello.txt", func(w http.ResponseWriter, r *http.Request) {
vs := values{
"append": "",
"position": "0",
}
testFormValues(t, r, vs)
// testMethod(t, r, http.MethodPost)
// testHeader(t, r, "x-cos-acl", "private")
// testHeader(t, r, "Content-Type", "text/html")
testMethod(t, r, http.MethodPost)
testHeader(t, r, "x-cos-acl", "private")
testHeader(t, r, "Content-Type", "text/html")
// b, _ := ioutil.ReadAll(r.Body)
// v := string(b)
// want := "hello"
// if !reflect.DeepEqual(v, want) {
// t.Errorf("Object.Append request body: %#v, want %#v", v, want)
// }
// })
// r := bytes.NewReader([]byte("hello"))
// _, err := client.Object.Append(context.Background(), name, position, r, opt)
// if err != nil {
// t.Fatalf("Object.Append returned error: %v", err)
// }
// }
b, _ := ioutil.ReadAll(r.Body)
v := string(b)
want := "hello"
if !reflect.DeepEqual(v, want) {
t.Errorf("Object.Append request body: %#v, want %#v", v, want)
}
w.Header().Add("x-cos-content-sha1", hex.EncodeToString(calMD5Digest(b)))
w.Header().Add("x-cos-next-append-position", strconv.FormatInt(int64(len(b)), 10))
})
r := bytes.NewReader([]byte("hello"))
p, _, err := client.Object.Append(context.Background(), name, position, r, opt)
if err != nil {
t.Fatalf("Object.Append returned error: %v", err)
}
if p != len("hello") {
t.Fatalf("Object.Append position error, want: %v, return: %v", len("hello"), p)
}
}
func TestObjectService_DeleteMulti(t *testing.T) { func TestObjectService_DeleteMulti(t *testing.T) {
setup() setup()
@ -519,6 +588,52 @@ func TestObjectService_Copy(t *testing.T) {
} }
} }
func TestObjectService_Append(t *testing.T) {
setup()
defer teardown()
size := 1111 * 1111 * 63
b := make([]byte, size)
p := int(math_rand.Int31n(int32(size)))
var buf bytes.Buffer
mux.HandleFunc("/test.append", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, "POST")
bs, _ := ioutil.ReadAll(r.Body)
buf.Write(bs)
w.Header().Add("x-cos-content-sha1", hex.EncodeToString(calMD5Digest(bs)))
w.Header().Add("x-cos-next-append-position", strconv.FormatInt(int64(buf.Len()), 10))
})
pos, _, err := client.Object.Append(context.Background(), "test.append", 0, bytes.NewReader(b[:p]), nil)
if err != nil {
t.Fatalf("Object.Append return error %v", err)
}
if pos != p {
t.Fatalf("Object.Append pos error, returned:%v, wanted:%v", pos, p)
}
opt := &ObjectPutOptions{
ObjectPutHeaderOptions: &ObjectPutHeaderOptions{
ContentType: "text/html",
Listener: &DefaultProgressListener{},
},
ACLHeaderOptions: &ACLHeaderOptions{
XCosACL: "private",
},
}
pos, _, err = client.Object.Append(context.Background(), "test.append", pos, bytes.NewReader(b[p:]), opt)
if err != nil {
t.Fatalf("Object.Append return error %v", err)
}
if pos != size {
t.Fatalf("Object.Append pos error, returned:%v, wanted:%v", pos, size)
}
if bytes.Compare(b, buf.Bytes()) != 0 {
t.Fatalf("Object.Append Compare failed")
}
}
func TestObjectService_Upload(t *testing.T) { func TestObjectService_Upload(t *testing.T) {
setup() setup()
defer teardown() defer teardown()
@ -833,6 +948,7 @@ func TestObjectService_DownloadWithCheckPoint(t *testing.T) {
t.Fatalf("Object.Download failed, odd:%v, even:%v", oddcount, evencount) t.Fatalf("Object.Download failed, odd:%v, even:%v", oddcount, evencount)
} }
} }
func TestObjectService_GetTagging(t *testing.T) { func TestObjectService_GetTagging(t *testing.T) {
setup() setup()
defer teardown() defer teardown()

11
progress.go

@ -57,6 +57,7 @@ type teeReader struct {
consumedBytes int64 consumedBytes int64
totalBytes int64 totalBytes int64
listener ProgressListener listener ProgressListener
disableCheckSum bool
} }
func (r *teeReader) Read(p []byte) (int, error) { func (r *teeReader) Read(p []byte) (int, error) {
@ -111,6 +112,15 @@ func (r *teeReader) Crc64() uint64 {
return 0 return 0
} }
func (r *teeReader) Sum() []byte {
if r.writer != nil {
if th, ok := r.writer.(hash.Hash); ok {
return th.Sum(nil)
}
}
return []byte{}
}
func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader { func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader {
return &teeReader{ return &teeReader{
reader: reader, reader: reader,
@ -118,6 +128,7 @@ func TeeReader(reader io.Reader, writer io.Writer, total int64, listener Progres
consumedBytes: 0, consumedBytes: 0,
totalBytes: total, totalBytes: total,
listener: listener, listener: listener,
disableCheckSum: false,
} }
} }

Loading…
Cancel
Save