From d2e9d919bdd6dba423ac39f0aa2d882ff85a4963 Mon Sep 17 00:00:00 2001 From: jojoliang Date: Sun, 26 Sep 2021 16:32:21 +0800 Subject: [PATCH] add fetch task api && add cvm role --- auth.go | 134 +++++++++++++++++++++++++++++++++++- cos.go | 3 + cos_test.go | 3 +- error.go | 21 ++++++ example/object/fetch_task.go | 64 +++++++++++++++++ example/object/get_with_cvm_role.go | 52 ++++++++++++++ object.go | 86 +++++++++++++++++++++++ object_test.go | 113 +++++++++++++++++++++++++++++- 8 files changed, 470 insertions(+), 6 deletions(-) create mode 100644 example/object/fetch_task.go create mode 100644 example/object/get_with_cvm_role.go diff --git a/auth.go b/auth.go index 61f2b7b..4d7a163 100644 --- a/auth.go +++ b/auth.go @@ -3,8 +3,10 @@ package cos import ( "crypto/hmac" "crypto/sha1" + "encoding/json" "fmt" "hash" + "io/ioutil" "net/http" "net/url" "sort" @@ -13,9 +15,18 @@ import ( "time" ) -const sha1SignAlgorithm = "sha1" -const privateHeaderPrefix = "x-cos-" -const defaultAuthExpire = time.Hour +const ( + sha1SignAlgorithm = "sha1" + privateHeaderPrefix = "x-cos-" + defaultAuthExpire = time.Hour +) + +var ( + defaultCVMAuthExpire = int64(600) + defaultCVMSchema = "http" + defaultCVMMetaHost = "metadata.tencentyun.com" + defaultCVMCredURI = "latest/meta-data/cam/security-credentials" +) // 需要校验的 Headers 列表 var needSignHeaders = map[string]bool{ @@ -318,3 +329,120 @@ func (t *AuthorizationTransport) transport() http.RoundTripper { } return http.DefaultTransport } + +type CVMSecurityCredentials struct { + TmpSecretId string `json:",omitempty"` + TmpSecretKey string `json:",omitempty"` + ExpiredTime int64 `json:",omitempty"` + Expiration string `json:",omitempty"` + Token string `json:",omitempty"` + Code string `json:",omitempty"` +} + +type CVMCredentialsTransport struct { + RoleName string + Transport http.RoundTripper + secretID string + secretKey string + sessionToken string + expiredTime int64 + rwLocker sync.RWMutex +} + +func (t *CVMCredentialsTransport) GetRoles() ([]string, error) { + urlname := fmt.Sprintf("%s://%s/%s", defaultCVMSchema, defaultCVMMetaHost, defaultCVMCredURI) + resp, err := http.Get(urlname) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode > 299 { + bs, _ := ioutil.ReadAll(resp.Body) + return nil, fmt.Errorf("get cvm security-credentials role failed, StatusCode: %v, Body: %v", resp.StatusCode, string(bs)) + } + bs, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + roles := strings.Split(strings.TrimSpace(string(bs)), "\n") + if len(roles) == 0 { + return nil, fmt.Errorf("get cvm security-credentials role failed, No valid cam role was found") + } + return roles, nil +} + +// https://cloud.tencent.com/document/product/213/4934 +func (t *CVMCredentialsTransport) UpdateCredential(now int64) (string, string, string, error) { + t.rwLocker.Lock() + defer t.rwLocker.Unlock() + if t.expiredTime > now+defaultCVMAuthExpire { + return t.secretID, t.secretKey, t.sessionToken, nil + } + roleName := t.RoleName + if roleName == "" { + roles, err := t.GetRoles() + if err != nil { + return t.secretID, t.secretKey, t.sessionToken, err + } + roleName = roles[0] + } + urlname := fmt.Sprintf("%s://%s/%s/%s", defaultCVMSchema, defaultCVMMetaHost, defaultCVMCredURI, roleName) + resp, err := http.Get(urlname) + if err != nil { + return t.secretID, t.secretKey, t.sessionToken, err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode > 299 { + bs, _ := ioutil.ReadAll(resp.Body) + return t.secretID, t.secretKey, t.sessionToken, fmt.Errorf("call cvm security-credentials failed, StatusCode: %v, Body: %v", resp.StatusCode, string(bs)) + } + var cred CVMSecurityCredentials + err = json.NewDecoder(resp.Body).Decode(&cred) + if err != nil { + return t.secretID, t.secretKey, t.sessionToken, err + } + if cred.Code != "Success" { + return t.secretID, t.secretKey, t.sessionToken, fmt.Errorf("call cvm security-credentials failed, Code:%v", cred.Code) + } + t.secretID, t.secretKey, t.sessionToken, t.expiredTime = cred.TmpSecretId, cred.TmpSecretKey, cred.Token, cred.ExpiredTime + return t.secretID, t.secretKey, t.sessionToken, nil +} + +func (t *CVMCredentialsTransport) GetCredential() (string, string, string, error) { + now := time.Now().Unix() + t.rwLocker.RLock() + // 提前 defaultCVMAuthExpire 获取重新获取临时密钥 + if t.expiredTime <= now+defaultCVMAuthExpire { + expiredTime := t.expiredTime + t.rwLocker.RUnlock() + secretID, secretKey, secretToken, err := t.UpdateCredential(now) + // 获取临时密钥失败但密钥未过期 + if err != nil && now < expiredTime { + err = nil + } + return secretID, secretKey, secretToken, err + } + defer t.rwLocker.RUnlock() + return t.secretID, t.secretKey, t.sessionToken, nil +} + +func (t *CVMCredentialsTransport) RoundTrip(req *http.Request) (*http.Response, error) { + ak, sk, token, err := t.GetCredential() + if err != nil { + return nil, err + } + req = cloneRequest(req) + // 增加 Authorization header + authTime := NewAuthTime(defaultAuthExpire) + AddAuthorizationHeader(ak, sk, token, req, authTime) + + resp, err := t.transport().RoundTrip(req) + return resp, err +} + +func (t *CVMCredentialsTransport) transport() http.RoundTripper { + if t.Transport != nil { + return t.Transport + } + return http.DefaultTransport +} diff --git a/cos.go b/cos.go index e42806c..1e411d5 100644 --- a/cos.go +++ b/cos.go @@ -44,6 +44,8 @@ type BaseURL struct { BatchURL *url.URL // 访问 CI 的基础 URL CIURL *url.URL + // 访问 Fetch Task 的基础 URL + FetchURL *url.URL } // NewBucketURL 生成 BaseURL 所需的 BucketURL @@ -110,6 +112,7 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client { baseURL.ServiceURL = uri.ServiceURL baseURL.BatchURL = uri.BatchURL baseURL.CIURL = uri.CIURL + baseURL.FetchURL = uri.FetchURL } if baseURL.ServiceURL == nil { baseURL.ServiceURL, _ = url.Parse(defaultServiceBaseURL) diff --git a/cos_test.go b/cos_test.go index ecdcebd..8fb6598 100644 --- a/cos_test.go +++ b/cos_test.go @@ -30,9 +30,8 @@ func setup() { // test server mux = http.NewServeMux() server = httptest.NewServer(mux) - u, _ := url.Parse(server.URL) - client = NewClient(&BaseURL{u, u, u, u}, nil) + client = NewClient(&BaseURL{u, u, u, u, u}, nil) } // teardown closes the test HTTP server. diff --git a/error.go b/error.go index c101339..8f0a7c8 100644 --- a/error.go +++ b/error.go @@ -1,10 +1,13 @@ package cos import ( + "encoding/json" "encoding/xml" "fmt" "io/ioutil" "net/http" + "strconv" + "strings" ) // ErrorResponse 包含 API 返回的错误信息 @@ -39,6 +42,12 @@ func (r *ErrorResponse) Error() string { r.Response.StatusCode, r.Code, r.Message, RequestID, TraceID) } +type jsonError struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` + RequestID string `json:"request_id,omitempty"` +} + // 检查 response 是否是出错时的返回的 response func checkResponse(r *http.Response) error { if c := r.StatusCode; 200 <= c && c <= 299 { @@ -49,6 +58,18 @@ func checkResponse(r *http.Response) error { if err == nil && data != nil { xml.Unmarshal(data, errorResponse) } + // 是否为 json 格式 + if errorResponse.Code == "" { + ctype := strings.TrimLeft(r.Header.Get("Content-Type"), " ") + if strings.HasPrefix(ctype, "application/json") { + var jerror jsonError + json.Unmarshal(data, &jerror) + errorResponse.Code = strconv.Itoa(jerror.Code) + errorResponse.Message = jerror.Message + errorResponse.RequestID = jerror.RequestID + } + + } return errorResponse } diff --git a/example/object/fetch_task.go b/example/object/fetch_task.go new file mode 100644 index 0000000..70afbf2 --- /dev/null +++ b/example/object/fetch_task.go @@ -0,0 +1,64 @@ +package main + +import ( + "context" + "fmt" + "github.com/agin719/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" + "net/http" + "net/url" + "os" + "time" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, e := url.Parse("http://ap-guangzhou.migration.myqcloud.com") + log_status(e) + b := &cos.BaseURL{BucketURL: u, FetchURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + bucket := "test-1259654469" + opt := &cos.PutFetchTaskOptions{ + Url: "http://" + bucket + ".cos.ap-guangzhou.myqcloud.com/exampleobject", + Key: "exampleobject", + } + + res, _, err := c.Object.PutFetchTask(context.Background(), bucket, opt) + log_status(err) + fmt.Printf("res: %+v\n", res) + + time.Sleep(time.Second * 3) + + rs, _, err := c.Object.GetFetchTask(context.Background(), bucket, res.Data.TaskId) + log_status(err) + fmt.Printf("res: %+v\n", rs) +} diff --git a/example/object/get_with_cvm_role.go b/example/object/get_with_cvm_role.go new file mode 100644 index 0000000..a5a2272 --- /dev/null +++ b/example/object/get_with_cvm_role.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "fmt" + "net/url" + + "net/http" + + "github.com/agin719/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + // 使用 CVMCredentialsTransport + Transport: &cos.CVMCredentialsTransport{ + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: false, + }, + }, + }) + + name := "exampleobject" + _, err := c.Object.Get(context.Background(), name, nil) + log_status(err) +} diff --git a/object.go b/object.go index 0177b83..62be451 100644 --- a/object.go +++ b/object.go @@ -1,6 +1,7 @@ package cos import ( + "bytes" "context" "crypto/md5" "encoding/hex" @@ -1428,3 +1429,88 @@ func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...st resp, err := s.client.doRetry(ctx, sendOpt) return resp, err } + +type PutFetchTaskOptions struct { + Url string `json:"Url,omitempty" header:"-" xml:"-"` + Key string `json:"Key,omitempty" header:"-" xml:"-"` + MD5 string `json:"MD5,omitempty" header:"-" xml:"-"` + OnKeyExist string `json:"OnKeyExist,omitempty" header:"-" xml:"-"` + IgnoreSameKey bool `json:"IgnoreSameKey,omitempty" header:"-" xml:"-"` + SuccessCallbackUrl string `json:"SuccessCallbackUrl,omitempty" header:"-" xml:"-"` + FailureCallbackUrl string `json:"FailureCallbackUrl,omitempty" header:"-" xml:"-"` + XOptionHeader *http.Header `json:"-", xml:"-" header:"-,omitempty"` +} + +type PutFetchTaskResult struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` + RequestId string `json:"request_id,omitempty"` + Data struct { + TaskId string `json:"taskId,omitempty"` + } `json:"Data,omitempty"` +} + +type GetFetchTaskResult struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` + RequestId string `json:"request_id,omitempty"` + Data struct { + Code string `json:"code,omitempty"` + Message string `json:"msg,omitempty"` + Percent int `json:"percent,omitempty"` + Status string `json:"status,omitempty"` + } `json:"data,omitempty"` +} + +type innerFetchTaskHeader struct { + XOptionHeader *http.Header `json:"-", xml:"-" header:"-,omitempty"` +} + +func (s *ObjectService) PutFetchTask(ctx context.Context, bucket string, opt *PutFetchTaskOptions) (*PutFetchTaskResult, *Response, error) { + var buf bytes.Buffer + var res PutFetchTaskResult + if opt == nil { + opt = &PutFetchTaskOptions{} + } + header := innerFetchTaskHeader{ + XOptionHeader: &http.Header{}, + } + if opt.XOptionHeader != nil { + header.XOptionHeader = cloneHeader(opt.XOptionHeader) + } + header.XOptionHeader.Set("Content-Type", "application/json") + bs, err := json.Marshal(opt) + if err != nil { + return nil, nil, err + } + reader := bytes.NewBuffer(bs) + sendOpt := &sendOptions{ + baseURL: s.client.BaseURL.FetchURL, + uri: fmt.Sprintf("/%s/", bucket), + method: http.MethodPost, + optHeader: &header, + body: reader, + result: &buf, + } + resp, err := s.client.send(ctx, sendOpt) + if buf.Len() > 0 { + err = json.Unmarshal(buf.Bytes(), &res) + } + return &res, resp, err +} + +func (s *ObjectService) GetFetchTask(ctx context.Context, bucket string, taskid string) (*GetFetchTaskResult, *Response, error) { + var buf bytes.Buffer + var res GetFetchTaskResult + sendOpt := &sendOptions{ + baseURL: s.client.BaseURL.FetchURL, + uri: fmt.Sprintf("/%s/%s", bucket, encodeURIComponent(taskid)), + method: http.MethodGet, + result: &buf, + } + resp, err := s.client.send(ctx, sendOpt) + if buf.Len() > 0 { + err = json.Unmarshal(buf.Bytes(), &res) + } + return &res, resp, err +} diff --git a/object_test.go b/object_test.go index 924f439..bfc7c4a 100644 --- a/object_test.go +++ b/object_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "encoding/json" "encoding/xml" "errors" "fmt" @@ -106,7 +107,7 @@ func TestObjectService_GetRetry(t *testing.T) { setup() defer teardown() u, _ := url.Parse(server.URL) - client := NewClient(&BaseURL{u, u, u, u}, &http.Client{ + client := NewClient(&BaseURL{u, u, u, u, u}, &http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: (&net.Dialer{ @@ -1051,3 +1052,113 @@ func TestObjectService_DeleteTagging(t *testing.T) { } } + +func TestObjectService_PutFetchTask(t *testing.T) { + setup() + defer teardown() + + opt := &PutFetchTaskOptions{ + Url: "http://examplebucket-1250000000.cos.ap-guangzhou.myqcloud.com/exampleobject", + Key: "exampleobject", + MD5: "MD5", + OnKeyExist: "OnKeyExist", + IgnoreSameKey: true, + SuccessCallbackUrl: "SuccessCallbackUrl", + FailureCallbackUrl: "FailureCallbackUrl", + XOptionHeader: &http.Header{}, + } + opt.XOptionHeader.Add("Content-Type", "application/json") + opt.XOptionHeader.Add("Content-Type", "application/xml") + opt.XOptionHeader.Add("Cache-Control", "max-age=10") + opt.XOptionHeader.Add("Cache-Control", "max-stale=10") + res := &PutFetchTaskResult{ + Code: 0, + Message: "SUCCESS", + RequestId: "NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=", + Data: struct { + TaskId string `json:"taskId,omitempty"` + }{ + TaskId: "NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=", + }, + } + mux.HandleFunc("/examplebucket-1250000000/", func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodPost) + opt.XOptionHeader.Set("Content-Type", "application/json") + for k, v := range *opt.XOptionHeader { + if k != "Content-Type" { + if !reflect.DeepEqual(r.Header[k], v) { + t.Errorf("Object.PutFetchTask request header: %+v, want %+v", r.Header[k], v) + } + continue + } + if r.Header.Get(k) != "application/json" || len(r.Header[k]) != 1 { + t.Errorf("Object.PutFetchTask request header: %+v, want %+v", r.Header[k], v) + } + } + v := new(PutFetchTaskOptions) + json.NewDecoder(r.Body).Decode(v) + want := opt + v.XOptionHeader = opt.XOptionHeader + if !reflect.DeepEqual(v, want) { + t.Errorf("Object.PutFetchTask request body: %+v, want %+v", v, want) + } + fmt.Fprint(w, `{ + "code":0, + "message":"SUCCESS", + "request_id":"NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=", + "data":{"taskid":"NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE="} + }`) + }) + + r, _, err := client.Object.PutFetchTask(context.Background(), "examplebucket-1250000000", opt) + if err != nil { + t.Fatalf("Object.PutFetchTask returned error: %v", err) + } + if !reflect.DeepEqual(r, res) { + t.Errorf("object.PutFetchTask res: %+v, want: %+v", r, res) + } +} + +func TestObjectService_GetFetchTask(t *testing.T) { + setup() + defer teardown() + + res := &GetFetchTaskResult{ + Code: 0, + Message: "SUCCESS", + RequestId: "NjE0ZGNiMDVfMmZjMjNiMGFfNWY2N18yOTRjYWM=", + Data: struct { + Code string `json:"code,omitempty"` + Message string `json:"msg,omitempty"` + Percent int `json:"percent,omitempty"` + Status string `json:"status,omitempty"` + }{ + Code: "Forbidden", + Message: "The specified download can not be allowed.", + Percent: 0, + Status: "TASK_FAILED", + }, + } + mux.HandleFunc("/examplebucket-1250000000/NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=", func(w http.ResponseWriter, r *http.Request) { + testMethod(t, r, http.MethodGet) + fmt.Fprint(w, `{ + "code":0, + "message":"SUCCESS", + "request_id":"NjE0ZGNiMDVfMmZjMjNiMGFfNWY2N18yOTRjYWM=", + "data": { + "code":"Forbidden", + "msg":"The specified download can not be allowed.", + "percent":0, + "status":"TASK_FAILED" + } + }`) + }) + + r, _, err := client.Object.GetFetchTask(context.Background(), "examplebucket-1250000000", "NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=") + if err != nil { + t.Fatalf("Object.GetFetchTask returned error: %v", err) + } + if !reflect.DeepEqual(r, res) { + t.Errorf("object.GetFetchTask res: %+v, want: %+v", r, res) + } +}