Browse Source

add retry options

master
jojoliang 3 years ago
parent
commit
b0532f56c6
  1. 1
      auth.go
  2. 71
      auth_test.go
  3. 30
      cos.go
  4. 59
      example/object/get_with_retry.go
  5. 8
      object_test.go

1
auth.go

@ -65,6 +65,7 @@ var ciParameters = map[string]bool{
"imageview2/": true, "imageview2/": true,
} }
// 非线程安全,只能在进程初始化(而不是Client初始化)时做设置
func SetNeedSignHeaders(key string, val bool) { func SetNeedSignHeaders(key string, val bool) {
NeedSignHeaders[key] = val NeedSignHeaders[key] = val
} }

71
auth_test.go

@ -2,7 +2,11 @@ package cos
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"net/http/httptest"
"strconv"
"strings"
"testing" "testing"
"time" "time"
) )
@ -52,3 +56,70 @@ func TestAuthorizationTransport(t *testing.T) {
req, _ := http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil) req, _ := http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil)
client.doAPI(context.Background(), req, nil, true) client.doAPI(context.Background(), req, nil, true)
} }
func TestCVMCredentialsTransport(t *testing.T) {
setup()
defer teardown()
uri := client.BaseURL.BucketURL.String()
ak := "test_ak"
sk := "test_sk"
token := "test_token"
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("x-cos-security-token") != token {
t.Errorf("CVMCredentialsTransport x-cos-security-token error, want:%v, return:%v\n", token, r.Header.Get("x-cos-security-token"))
}
auth := r.Header.Get("Authorization")
if auth == "" {
t.Error("CVMCredentialsTransport didn't add Authorization header")
}
field := strings.Split(auth, "&")
if len(field) != 7 {
t.Errorf("CVMCredentialsTransport Authorization header format error: %v\n", auth)
}
st_et := strings.Split(strings.Split(field[2], "=")[1], ";")
st, _ := strconv.ParseInt(st_et[0], 10, 64)
et, _ := strconv.ParseInt(st_et[1], 10, 64)
authTime := &AuthTime{
SignStartTime: time.Unix(st, 0),
SignEndTime: time.Unix(et, 0),
KeyStartTime: time.Unix(st, 0),
KeyEndTime: time.Unix(et, 0),
}
host := strings.TrimLeft(uri, "http://")
req, _ := http.NewRequest("GET", uri, nil)
req.Header.Add("Host", host)
expect := newAuthorization(ak, sk, req, authTime)
if expect != auth {
t.Errorf("CVMCredentialsTransport Authorization error, want:%v, return:%v\n", expect, auth)
}
})
// CVM http server
cvm_mux := http.NewServeMux()
cvm_server := httptest.NewServer(cvm_mux)
defer cvm_server.Close()
// 将默认 CVM Host 修改成测试IP:PORT
defaultCVMMetaHost = strings.TrimLeft(cvm_server.URL, "http://")
cvm_mux.HandleFunc("/"+defaultCVMCredURI, func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, "cvm_read_cos_only")
})
cvm_mux.HandleFunc("/"+defaultCVMCredURI+"/cvm_read_cos_only", func(w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, fmt.Sprintf(`{
"TmpSecretId": "%s",
"TmpSecretKey": "%s",
"ExpiredTime": %v,
"Expiration": "now",
"Token": "%s",
"Code": "Success"
}`, ak, sk, time.Now().Unix()+3600, token))
})
client.client.Transport = &CVMCredentialsTransport{}
req, _ := http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil)
client.doAPI(context.Background(), req, nil, true)
req, _ = http.NewRequest("GET", client.BaseURL.BucketURL.String(), nil)
client.doAPI(context.Background(), req, nil, true)
}

30
cos.go

@ -13,6 +13,7 @@ import (
"reflect" "reflect"
"strings" "strings"
"text/template" "text/template"
"time"
"strconv" "strconv"
@ -72,9 +73,15 @@ func NewBucketURL(bucketName, region string, secure bool) *url.URL {
return u return u
} }
type RetryOptions struct {
Count int
Interval time.Duration
StatusCode []int
}
type Config struct { type Config struct {
EnableCRC bool EnableCRC bool
RequestBodyClose bool RequestBodyClose bool
RetryOpt RetryOptions
} }
// Client is a client manages communication with the COS API. // Client is a client manages communication with the COS API.
@ -125,6 +132,10 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client {
Conf: &Config{ Conf: &Config{
EnableCRC: true, EnableCRC: true,
RequestBodyClose: false, RequestBodyClose: false,
RetryOpt: RetryOptions{
Count: 3,
Interval: time.Duration(0),
},
}, },
} }
c.common.client = c c.common.client = c
@ -309,14 +320,31 @@ func (c *Client) doRetry(ctx context.Context, opt *sendOptions) (resp *Response,
return return
} }
} }
count := 1
if count < c.Conf.RetryOpt.Count {
count = c.Conf.RetryOpt.Count
}
nr := 0 nr := 0
for nr < 3 {
interval := c.Conf.RetryOpt.Interval
for nr < count {
resp, err = c.send(ctx, opt) resp, err = c.send(ctx, opt)
if err != nil { if err != nil {
if resp != nil && resp.StatusCode <= 499 { if resp != nil && resp.StatusCode <= 499 {
dobreak := true
for _, v := range c.Conf.RetryOpt.StatusCode {
if resp.StatusCode == v {
dobreak = false
break
}
}
if dobreak {
break break
} }
}
nr++ nr++
if interval > 0 && nr < count {
time.Sleep(interval)
}
continue continue
} }
break break

59
example/object/get_with_retry.go

@ -0,0 +1,59 @@
package main
import (
"context"
"fmt"
"net/url"
"net/http"
"os"
"time"
"github.com/agin719/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}
func main() {
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: true,
// Notice when put a large file and set need the request body, might happend out of memory error.
RequestBody: false,
ResponseHeader: true,
ResponseBody: false,
},
},
})
// Get 请求配置重试
c.Conf.RetryOpt.Count = 3 // 错误重试次数,默认重试3次
c.Conf.RetryOpt.Interval = time.Millisecond // 错误重试间隔时间,默认0
c.Conf.RetryOpt.StatusCode = []int{200} // 默认5xx都会重试,该参数配置其余需要重试的响应码
name := "exampleobject"
_, err := c.Object.Get(context.Background(), name, nil)
log_status(err)
}

8
object_test.go

@ -121,6 +121,7 @@ func TestObjectService_GetRetry(t *testing.T) {
ResponseHeaderTimeout: 1 * time.Second, ResponseHeaderTimeout: 1 * time.Second,
}, },
}) })
client.Conf.RetryOpt.StatusCode = []int{499}
name := "test/hello.txt" name := "test/hello.txt"
contentLength := 1024 * 1024 * 10 contentLength := 1024 * 1024 * 10
data := make([]byte, contentLength) data := make([]byte, contentLength)
@ -132,6 +133,10 @@ func TestObjectService_GetRetry(t *testing.T) {
} }
index++ index++
if index%3 != 0 { if index%3 != 0 {
if index > 6 {
w.WriteHeader(499)
return
}
time.Sleep(time.Second * 2) time.Sleep(time.Second * 2)
} }
testFormValues(t, r, vs) testFormValues(t, r, vs)
@ -163,6 +168,9 @@ func TestObjectService_GetRetry(t *testing.T) {
t.Errorf("Object.Get Failed") t.Errorf("Object.Get Failed")
} }
} }
if index != 9 {
t.Errorf("retry time error, retry count: %v\n", index)
}
} }
func TestObjectService_GetPresignedURL(t *testing.T) { func TestObjectService_GetPresignedURL(t *testing.T) {

Loading…
Cancel
Save