Browse Source

add fetch task api && add cvm role

master
jojoliang 3 years ago
parent
commit
d2e9d919bd
  1. 134
      auth.go
  2. 3
      cos.go
  3. 3
      cos_test.go
  4. 21
      error.go
  5. 64
      example/object/fetch_task.go
  6. 52
      example/object/get_with_cvm_role.go
  7. 86
      object.go
  8. 113
      object_test.go

134
auth.go

@ -3,8 +3,10 @@ package cos
import (
"crypto/hmac"
"crypto/sha1"
"encoding/json"
"fmt"
"hash"
"io/ioutil"
"net/http"
"net/url"
"sort"
@ -13,9 +15,18 @@ import (
"time"
)
const sha1SignAlgorithm = "sha1"
const privateHeaderPrefix = "x-cos-"
const defaultAuthExpire = time.Hour
const (
sha1SignAlgorithm = "sha1"
privateHeaderPrefix = "x-cos-"
defaultAuthExpire = time.Hour
)
var (
defaultCVMAuthExpire = int64(600)
defaultCVMSchema = "http"
defaultCVMMetaHost = "metadata.tencentyun.com"
defaultCVMCredURI = "latest/meta-data/cam/security-credentials"
)
// 需要校验的 Headers 列表
var needSignHeaders = map[string]bool{
@ -318,3 +329,120 @@ func (t *AuthorizationTransport) transport() http.RoundTripper {
}
return http.DefaultTransport
}
type CVMSecurityCredentials struct {
TmpSecretId string `json:",omitempty"`
TmpSecretKey string `json:",omitempty"`
ExpiredTime int64 `json:",omitempty"`
Expiration string `json:",omitempty"`
Token string `json:",omitempty"`
Code string `json:",omitempty"`
}
type CVMCredentialsTransport struct {
RoleName string
Transport http.RoundTripper
secretID string
secretKey string
sessionToken string
expiredTime int64
rwLocker sync.RWMutex
}
func (t *CVMCredentialsTransport) GetRoles() ([]string, error) {
urlname := fmt.Sprintf("%s://%s/%s", defaultCVMSchema, defaultCVMMetaHost, defaultCVMCredURI)
resp, err := http.Get(urlname)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
bs, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("get cvm security-credentials role failed, StatusCode: %v, Body: %v", resp.StatusCode, string(bs))
}
bs, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
roles := strings.Split(strings.TrimSpace(string(bs)), "\n")
if len(roles) == 0 {
return nil, fmt.Errorf("get cvm security-credentials role failed, No valid cam role was found")
}
return roles, nil
}
// https://cloud.tencent.com/document/product/213/4934
func (t *CVMCredentialsTransport) UpdateCredential(now int64) (string, string, string, error) {
t.rwLocker.Lock()
defer t.rwLocker.Unlock()
if t.expiredTime > now+defaultCVMAuthExpire {
return t.secretID, t.secretKey, t.sessionToken, nil
}
roleName := t.RoleName
if roleName == "" {
roles, err := t.GetRoles()
if err != nil {
return t.secretID, t.secretKey, t.sessionToken, err
}
roleName = roles[0]
}
urlname := fmt.Sprintf("%s://%s/%s/%s", defaultCVMSchema, defaultCVMMetaHost, defaultCVMCredURI, roleName)
resp, err := http.Get(urlname)
if err != nil {
return t.secretID, t.secretKey, t.sessionToken, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
bs, _ := ioutil.ReadAll(resp.Body)
return t.secretID, t.secretKey, t.sessionToken, fmt.Errorf("call cvm security-credentials failed, StatusCode: %v, Body: %v", resp.StatusCode, string(bs))
}
var cred CVMSecurityCredentials
err = json.NewDecoder(resp.Body).Decode(&cred)
if err != nil {
return t.secretID, t.secretKey, t.sessionToken, err
}
if cred.Code != "Success" {
return t.secretID, t.secretKey, t.sessionToken, fmt.Errorf("call cvm security-credentials failed, Code:%v", cred.Code)
}
t.secretID, t.secretKey, t.sessionToken, t.expiredTime = cred.TmpSecretId, cred.TmpSecretKey, cred.Token, cred.ExpiredTime
return t.secretID, t.secretKey, t.sessionToken, nil
}
func (t *CVMCredentialsTransport) GetCredential() (string, string, string, error) {
now := time.Now().Unix()
t.rwLocker.RLock()
// 提前 defaultCVMAuthExpire 获取重新获取临时密钥
if t.expiredTime <= now+defaultCVMAuthExpire {
expiredTime := t.expiredTime
t.rwLocker.RUnlock()
secretID, secretKey, secretToken, err := t.UpdateCredential(now)
// 获取临时密钥失败但密钥未过期
if err != nil && now < expiredTime {
err = nil
}
return secretID, secretKey, secretToken, err
}
defer t.rwLocker.RUnlock()
return t.secretID, t.secretKey, t.sessionToken, nil
}
func (t *CVMCredentialsTransport) RoundTrip(req *http.Request) (*http.Response, error) {
ak, sk, token, err := t.GetCredential()
if err != nil {
return nil, err
}
req = cloneRequest(req)
// 增加 Authorization header
authTime := NewAuthTime(defaultAuthExpire)
AddAuthorizationHeader(ak, sk, token, req, authTime)
resp, err := t.transport().RoundTrip(req)
return resp, err
}
func (t *CVMCredentialsTransport) transport() http.RoundTripper {
if t.Transport != nil {
return t.Transport
}
return http.DefaultTransport
}

3
cos.go

@ -44,6 +44,8 @@ type BaseURL struct {
BatchURL *url.URL
// 访问 CI 的基础 URL
CIURL *url.URL
// 访问 Fetch Task 的基础 URL
FetchURL *url.URL
}
// NewBucketURL 生成 BaseURL 所需的 BucketURL
@ -110,6 +112,7 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client {
baseURL.ServiceURL = uri.ServiceURL
baseURL.BatchURL = uri.BatchURL
baseURL.CIURL = uri.CIURL
baseURL.FetchURL = uri.FetchURL
}
if baseURL.ServiceURL == nil {
baseURL.ServiceURL, _ = url.Parse(defaultServiceBaseURL)

3
cos_test.go

@ -30,9 +30,8 @@ func setup() {
// test server
mux = http.NewServeMux()
server = httptest.NewServer(mux)
u, _ := url.Parse(server.URL)
client = NewClient(&BaseURL{u, u, u, u}, nil)
client = NewClient(&BaseURL{u, u, u, u, u}, nil)
}
// teardown closes the test HTTP server.

21
error.go

@ -1,10 +1,13 @@
package cos
import (
"encoding/json"
"encoding/xml"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
)
// ErrorResponse 包含 API 返回的错误信息
@ -39,6 +42,12 @@ func (r *ErrorResponse) Error() string {
r.Response.StatusCode, r.Code, r.Message, RequestID, TraceID)
}
type jsonError struct {
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
RequestID string `json:"request_id,omitempty"`
}
// 检查 response 是否是出错时的返回的 response
func checkResponse(r *http.Response) error {
if c := r.StatusCode; 200 <= c && c <= 299 {
@ -49,6 +58,18 @@ func checkResponse(r *http.Response) error {
if err == nil && data != nil {
xml.Unmarshal(data, errorResponse)
}
// 是否为 json 格式
if errorResponse.Code == "" {
ctype := strings.TrimLeft(r.Header.Get("Content-Type"), " ")
if strings.HasPrefix(ctype, "application/json") {
var jerror jsonError
json.Unmarshal(data, &jerror)
errorResponse.Code = strconv.Itoa(jerror.Code)
errorResponse.Message = jerror.Message
errorResponse.RequestID = jerror.RequestID
}
}
return errorResponse
}

64
example/object/fetch_task.go

@ -0,0 +1,64 @@
package main
import (
"context"
"fmt"
"github.com/agin719/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
"net/http"
"net/url"
"os"
"time"
)
func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}
func main() {
u, e := url.Parse("http://ap-guangzhou.migration.myqcloud.com")
log_status(e)
b := &cos.BaseURL{BucketURL: u, FetchURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: true,
RequestBody: true,
ResponseHeader: true,
ResponseBody: true,
},
},
})
bucket := "test-1259654469"
opt := &cos.PutFetchTaskOptions{
Url: "http://" + bucket + ".cos.ap-guangzhou.myqcloud.com/exampleobject",
Key: "exampleobject",
}
res, _, err := c.Object.PutFetchTask(context.Background(), bucket, opt)
log_status(err)
fmt.Printf("res: %+v\n", res)
time.Sleep(time.Second * 3)
rs, _, err := c.Object.GetFetchTask(context.Background(), bucket, res.Data.TaskId)
log_status(err)
fmt.Printf("res: %+v\n", rs)
}

52
example/object/get_with_cvm_role.go

@ -0,0 +1,52 @@
package main
import (
"context"
"fmt"
"net/url"
"net/http"
"github.com/agin719/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}
func main() {
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
// 使用 CVMCredentialsTransport
Transport: &cos.CVMCredentialsTransport{
Transport: &debug.DebugRequestTransport{
RequestHeader: true,
// Notice when put a large file and set need the request body, might happend out of memory error.
RequestBody: false,
ResponseHeader: true,
ResponseBody: false,
},
},
})
name := "exampleobject"
_, err := c.Object.Get(context.Background(), name, nil)
log_status(err)
}

86
object.go

@ -1,6 +1,7 @@
package cos
import (
"bytes"
"context"
"crypto/md5"
"encoding/hex"
@ -1428,3 +1429,88 @@ func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...st
resp, err := s.client.doRetry(ctx, sendOpt)
return resp, err
}
type PutFetchTaskOptions struct {
Url string `json:"Url,omitempty" header:"-" xml:"-"`
Key string `json:"Key,omitempty" header:"-" xml:"-"`
MD5 string `json:"MD5,omitempty" header:"-" xml:"-"`
OnKeyExist string `json:"OnKeyExist,omitempty" header:"-" xml:"-"`
IgnoreSameKey bool `json:"IgnoreSameKey,omitempty" header:"-" xml:"-"`
SuccessCallbackUrl string `json:"SuccessCallbackUrl,omitempty" header:"-" xml:"-"`
FailureCallbackUrl string `json:"FailureCallbackUrl,omitempty" header:"-" xml:"-"`
XOptionHeader *http.Header `json:"-", xml:"-" header:"-,omitempty"`
}
type PutFetchTaskResult struct {
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
RequestId string `json:"request_id,omitempty"`
Data struct {
TaskId string `json:"taskId,omitempty"`
} `json:"Data,omitempty"`
}
type GetFetchTaskResult struct {
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
RequestId string `json:"request_id,omitempty"`
Data struct {
Code string `json:"code,omitempty"`
Message string `json:"msg,omitempty"`
Percent int `json:"percent,omitempty"`
Status string `json:"status,omitempty"`
} `json:"data,omitempty"`
}
type innerFetchTaskHeader struct {
XOptionHeader *http.Header `json:"-", xml:"-" header:"-,omitempty"`
}
func (s *ObjectService) PutFetchTask(ctx context.Context, bucket string, opt *PutFetchTaskOptions) (*PutFetchTaskResult, *Response, error) {
var buf bytes.Buffer
var res PutFetchTaskResult
if opt == nil {
opt = &PutFetchTaskOptions{}
}
header := innerFetchTaskHeader{
XOptionHeader: &http.Header{},
}
if opt.XOptionHeader != nil {
header.XOptionHeader = cloneHeader(opt.XOptionHeader)
}
header.XOptionHeader.Set("Content-Type", "application/json")
bs, err := json.Marshal(opt)
if err != nil {
return nil, nil, err
}
reader := bytes.NewBuffer(bs)
sendOpt := &sendOptions{
baseURL: s.client.BaseURL.FetchURL,
uri: fmt.Sprintf("/%s/", bucket),
method: http.MethodPost,
optHeader: &header,
body: reader,
result: &buf,
}
resp, err := s.client.send(ctx, sendOpt)
if buf.Len() > 0 {
err = json.Unmarshal(buf.Bytes(), &res)
}
return &res, resp, err
}
func (s *ObjectService) GetFetchTask(ctx context.Context, bucket string, taskid string) (*GetFetchTaskResult, *Response, error) {
var buf bytes.Buffer
var res GetFetchTaskResult
sendOpt := &sendOptions{
baseURL: s.client.BaseURL.FetchURL,
uri: fmt.Sprintf("/%s/%s", bucket, encodeURIComponent(taskid)),
method: http.MethodGet,
result: &buf,
}
resp, err := s.client.send(ctx, sendOpt)
if buf.Len() > 0 {
err = json.Unmarshal(buf.Bytes(), &res)
}
return &res, resp, err
}

113
object_test.go

@ -5,6 +5,7 @@ import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
@ -106,7 +107,7 @@ func TestObjectService_GetRetry(t *testing.T) {
setup()
defer teardown()
u, _ := url.Parse(server.URL)
client := NewClient(&BaseURL{u, u, u, u}, &http.Client{
client := NewClient(&BaseURL{u, u, u, u, u}, &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
@ -1051,3 +1052,113 @@ func TestObjectService_DeleteTagging(t *testing.T) {
}
}
func TestObjectService_PutFetchTask(t *testing.T) {
setup()
defer teardown()
opt := &PutFetchTaskOptions{
Url: "http://examplebucket-1250000000.cos.ap-guangzhou.myqcloud.com/exampleobject",
Key: "exampleobject",
MD5: "MD5",
OnKeyExist: "OnKeyExist",
IgnoreSameKey: true,
SuccessCallbackUrl: "SuccessCallbackUrl",
FailureCallbackUrl: "FailureCallbackUrl",
XOptionHeader: &http.Header{},
}
opt.XOptionHeader.Add("Content-Type", "application/json")
opt.XOptionHeader.Add("Content-Type", "application/xml")
opt.XOptionHeader.Add("Cache-Control", "max-age=10")
opt.XOptionHeader.Add("Cache-Control", "max-stale=10")
res := &PutFetchTaskResult{
Code: 0,
Message: "SUCCESS",
RequestId: "NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=",
Data: struct {
TaskId string `json:"taskId,omitempty"`
}{
TaskId: "NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=",
},
}
mux.HandleFunc("/examplebucket-1250000000/", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodPost)
opt.XOptionHeader.Set("Content-Type", "application/json")
for k, v := range *opt.XOptionHeader {
if k != "Content-Type" {
if !reflect.DeepEqual(r.Header[k], v) {
t.Errorf("Object.PutFetchTask request header: %+v, want %+v", r.Header[k], v)
}
continue
}
if r.Header.Get(k) != "application/json" || len(r.Header[k]) != 1 {
t.Errorf("Object.PutFetchTask request header: %+v, want %+v", r.Header[k], v)
}
}
v := new(PutFetchTaskOptions)
json.NewDecoder(r.Body).Decode(v)
want := opt
v.XOptionHeader = opt.XOptionHeader
if !reflect.DeepEqual(v, want) {
t.Errorf("Object.PutFetchTask request body: %+v, want %+v", v, want)
}
fmt.Fprint(w, `{
"code":0,
"message":"SUCCESS",
"request_id":"NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=",
"data":{"taskid":"NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE="}
}`)
})
r, _, err := client.Object.PutFetchTask(context.Background(), "examplebucket-1250000000", opt)
if err != nil {
t.Fatalf("Object.PutFetchTask returned error: %v", err)
}
if !reflect.DeepEqual(r, res) {
t.Errorf("object.PutFetchTask res: %+v, want: %+v", r, res)
}
}
func TestObjectService_GetFetchTask(t *testing.T) {
setup()
defer teardown()
res := &GetFetchTaskResult{
Code: 0,
Message: "SUCCESS",
RequestId: "NjE0ZGNiMDVfMmZjMjNiMGFfNWY2N18yOTRjYWM=",
Data: struct {
Code string `json:"code,omitempty"`
Message string `json:"msg,omitempty"`
Percent int `json:"percent,omitempty"`
Status string `json:"status,omitempty"`
}{
Code: "Forbidden",
Message: "The specified download can not be allowed.",
Percent: 0,
Status: "TASK_FAILED",
},
}
mux.HandleFunc("/examplebucket-1250000000/NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=", func(w http.ResponseWriter, r *http.Request) {
testMethod(t, r, http.MethodGet)
fmt.Fprint(w, `{
"code":0,
"message":"SUCCESS",
"request_id":"NjE0ZGNiMDVfMmZjMjNiMGFfNWY2N18yOTRjYWM=",
"data": {
"code":"Forbidden",
"msg":"The specified download can not be allowed.",
"percent":0,
"status":"TASK_FAILED"
}
}`)
})
r, _, err := client.Object.GetFetchTask(context.Background(), "examplebucket-1250000000", "NjE0ZGMxMDhfMmZjMjNiMGFfNWY2N18yOTRjYWE=")
if err != nil {
t.Fatalf("Object.GetFetchTask returned error: %v", err)
}
if !reflect.DeepEqual(r, res) {
t.Errorf("object.GetFetchTask res: %+v, want: %+v", r, res)
}
}
Loading…
Cancel
Save