From b0532f56c62b132897e0a3b333cbc43f1f5393da Mon Sep 17 00:00:00 2001 From: jojoliang Date: Wed, 29 Sep 2021 17:32:17 +0800 Subject: [PATCH] add retry options --- auth.go | 1 + auth_test.go | 71 ++++++++++++++++++++++++++++++++++++++++ cos.go | 32 ++++++++++++++++-- example/object/get_with_retry.go | 59 +++++++++++++++++++++++++++++++++ object_test.go | 8 +++++ 5 files changed, 169 insertions(+), 2 deletions(-) create mode 100644 example/object/get_with_retry.go diff --git a/auth.go b/auth.go index 6e06781..0a3e58f 100644 --- a/auth.go +++ b/auth.go @@ -65,6 +65,7 @@ var ciParameters = map[string]bool{ "imageview2/": true, } +// 非线程安全,只能在进程初始化(而不是Client初始化)时做设置 func SetNeedSignHeaders(key string, val bool) { NeedSignHeaders[key] = val } diff --git a/auth_test.go b/auth_test.go index e735e5b..ea4d79e 100644 --- a/auth_test.go +++ b/auth_test.go @@ -2,7 +2,11 @@ package cos import ( "context" + "fmt" "net/http" + "net/http/httptest" + "strconv" + "strings" "testing" "time" ) @@ -52,3 +56,70 @@ func TestAuthorizationTransport(t *testing.T) { req, _ := http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil) client.doAPI(context.Background(), req, nil, true) } + +func TestCVMCredentialsTransport(t *testing.T) { + setup() + defer teardown() + uri := client.BaseURL.BucketURL.String() + ak := "test_ak" + sk := "test_sk" + token := "test_token" + + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("x-cos-security-token") != token { + t.Errorf("CVMCredentialsTransport x-cos-security-token error, want:%v, return:%v\n", token, r.Header.Get("x-cos-security-token")) + } + auth := r.Header.Get("Authorization") + if auth == "" { + t.Error("CVMCredentialsTransport didn't add Authorization header") + } + field := strings.Split(auth, "&") + if len(field) != 7 { + t.Errorf("CVMCredentialsTransport Authorization header format error: %v\n", auth) + } + st_et := strings.Split(strings.Split(field[2], "=")[1], ";") + st, _ := strconv.ParseInt(st_et[0], 10, 64) + et, _ := strconv.ParseInt(st_et[1], 10, 64) + authTime := &AuthTime{ + SignStartTime: time.Unix(st, 0), + SignEndTime: time.Unix(et, 0), + KeyStartTime: time.Unix(st, 0), + KeyEndTime: time.Unix(et, 0), + } + host := strings.TrimLeft(uri, "http://") + req, _ := http.NewRequest("GET", uri, nil) + req.Header.Add("Host", host) + expect := newAuthorization(ak, sk, req, authTime) + if expect != auth { + t.Errorf("CVMCredentialsTransport Authorization error, want:%v, return:%v\n", expect, auth) + } + }) + + // CVM http server + cvm_mux := http.NewServeMux() + cvm_server := httptest.NewServer(cvm_mux) + defer cvm_server.Close() + // 将默认 CVM Host 修改成测试IP:PORT + defaultCVMMetaHost = strings.TrimLeft(cvm_server.URL, "http://") + + cvm_mux.HandleFunc("/"+defaultCVMCredURI, func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, "cvm_read_cos_only") + }) + cvm_mux.HandleFunc("/"+defaultCVMCredURI+"/cvm_read_cos_only", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprint(w, fmt.Sprintf(`{ + "TmpSecretId": "%s", + "TmpSecretKey": "%s", + "ExpiredTime": %v, + "Expiration": "now", + "Token": "%s", + "Code": "Success" + }`, ak, sk, time.Now().Unix()+3600, token)) + }) + + client.client.Transport = &CVMCredentialsTransport{} + req, _ := http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil) + client.doAPI(context.Background(), req, nil, true) + + req, _ = http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil) + client.doAPI(context.Background(), req, nil, true) +} diff --git a/cos.go b/cos.go index 1e411d5..c10fcef 100644 --- a/cos.go +++ b/cos.go @@ -13,6 +13,7 @@ import ( "reflect" "strings" "text/template" + "time" "strconv" @@ -72,9 +73,15 @@ func NewBucketURL(bucketName, region string, secure bool) *url.URL { return u } +type RetryOptions struct { + Count int + Interval time.Duration + StatusCode []int +} type Config struct { EnableCRC bool RequestBodyClose bool + RetryOpt RetryOptions } // Client is a client manages communication with the COS API. @@ -125,6 +132,10 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client { Conf: &Config{ EnableCRC: true, RequestBodyClose: false, + RetryOpt: RetryOptions{ + Count: 3, + Interval: time.Duration(0), + }, }, } c.common.client = c @@ -309,14 +320,31 @@ func (c *Client) doRetry(ctx context.Context, opt *sendOptions) (resp *Response, return } } + count := 1 + if count < c.Conf.RetryOpt.Count { + count = c.Conf.RetryOpt.Count + } nr := 0 - for nr < 3 { + interval := c.Conf.RetryOpt.Interval + for nr < count { resp, err = c.send(ctx, opt) if err != nil { if resp != nil && resp.StatusCode <= 499 { - break + dobreak := true + for _, v := range c.Conf.RetryOpt.StatusCode { + if resp.StatusCode == v { + dobreak = false + break + } + } + if dobreak { + break + } } nr++ + if interval > 0 && nr < count { + time.Sleep(interval) + } continue } break diff --git a/example/object/get_with_retry.go b/example/object/get_with_retry.go new file mode 100644 index 0000000..f9c7cc0 --- /dev/null +++ b/example/object/get_with_retry.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "fmt" + "net/url" + + "net/http" + "os" + "time" + + "github.com/agin719/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: false, + }, + }, + }) + // Get 请求配置重试 + c.Conf.RetryOpt.Count = 3 // 错误重试次数,默认重试3次 + c.Conf.RetryOpt.Interval = time.Millisecond // 错误重试间隔时间,默认0 + c.Conf.RetryOpt.StatusCode = []int{200} // 默认5xx都会重试,该参数配置其余需要重试的响应码 + + name := "exampleobject" + _, err := c.Object.Get(context.Background(), name, nil) + log_status(err) +} diff --git a/object_test.go b/object_test.go index bfc7c4a..02d6299 100644 --- a/object_test.go +++ b/object_test.go @@ -121,6 +121,7 @@ func TestObjectService_GetRetry(t *testing.T) { ResponseHeaderTimeout: 1 * time.Second, }, }) + client.Conf.RetryOpt.StatusCode = []int{499} name := "test/hello.txt" contentLength := 1024 * 1024 * 10 data := make([]byte, contentLength) @@ -132,6 +133,10 @@ func TestObjectService_GetRetry(t *testing.T) { } index++ if index%3 != 0 { + if index > 6 { + w.WriteHeader(499) + return + } time.Sleep(time.Second * 2) } testFormValues(t, r, vs) @@ -163,6 +168,9 @@ func TestObjectService_GetRetry(t *testing.T) { t.Errorf("Object.Get Failed") } } + if index != 9 { + t.Errorf("retry time error, retry count: %v\n", index) + } } func TestObjectService_GetPresignedURL(t *testing.T) {