diff --git a/ci.go b/ci.go index 2cf1713..60fcff0 100644 --- a/ci.go +++ b/ci.go @@ -1,14 +1,18 @@ package cos import ( + "context" "encoding/json" + "encoding/xml" + "net/http" ) +type CIService service + type PicOperations struct { IsPicInfo int `json:"is_pic_info,omitempty"` Rules []PicOperationsRules `json:"rules,omitemtpy"` } - type PicOperationsRules struct { Bucket string `json:"bucket,omitempty"` FileId string `json:"fileid"` @@ -16,9 +20,180 @@ type PicOperationsRules struct { } func EncodePicOperations(pic *PicOperations) string { + if pic == nil { + return "" + } bs, err := json.Marshal(pic) if err != nil { return "" } return string(bs) } + +type ImageProcessResult struct { + XMLName xml.Name `xml:"UploadResult"` + OriginalInfo *PicOriginalInfo `xml:"OriginalInfo,omitempty"` + ProcessObject *PicProcessObject `xml:"ProcessResults>Object,omitempty"` +} +type PicOriginalInfo struct { + Key string `xml:"Key,omitempty"` + Location string `xml:"Location,omitempty"` + ImageInfo *PicImageInfo `xml:"ImageInfo,omitempty"` +} +type PicImageInfo struct { + Format string `xml:"Format,omitempty"` + Width int `xml:"Width,omitempty"` + Height int `xml:"Height,omitempty"` + Size int `xml:"Size,omitempty"` + Quality int `xml:"Quality,omitempty"` +} +type PicProcessObject struct { + Key string `xml:"Key,omitempty"` + Location string `xml:"Location,omitempty"` + Format string `xml:"Format,omitempty"` + Width int `xml:"Width,omitempty"` + Height int `xml:"Height,omitempty"` + Size int `xml:"Size,omitempty"` + Quality int `xml:"Quality,omitempty"` +} + +type picOperationsHeader struct { + PicOperations string `header:"Pic-Operations" xml:"-" url:"-"` +} + +type ImageProcessOptions = PicOperations + +// 云上数据处理 https://cloud.tencent.com/document/product/460/18147 +func (s *CIService) ImageProcess(ctx context.Context, name string, opt *ImageProcessOptions) (*ImageProcessResult, *Response, error) { + header := &picOperationsHeader{ + PicOperations: EncodePicOperations(opt), + } + var res ImageProcessResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/" + encodeURIComponent(name) + "?image_process", + method: http.MethodPost, + optHeader: header, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} + +type ImageRecognitionOptions struct { + CIProcess string `url:"ci-process,omitempty"` + DetectType string `url:"detect-type,omitempty"` +} + +type ImageRecognitionResult struct { + XMLName xml.Name `xml:"RecognitionResult"` + PornInfo *RecognitionInfo `xml:"PornInfo,omitempty"` + TerroristInfo *RecognitionInfo `xml:"TerroristInfo,omitempty"` + PoliticsInfo *RecognitionInfo `xml:"PoliticsInfo,omitempty"` + AdsInfo *RecognitionInfo `xml:"AdsInfo,omitempty"` +} +type RecognitionInfo struct { + Code int `xml:"Code,omitempty"` + Msg string `xml:"Msg,omitempty"` + HitFlag int `xml:"HitFlag,omitempty"` + Score int `xml:"Score,omitempty"` + Label string `xml:"Label,omitempty"` + Count int `xml:"Count,omitempty"` +} + +// 图片审核 https://cloud.tencent.com/document/product/460/37318 +func (s *CIService) ImageRecognition(ctx context.Context, name string, opt *ImageRecognitionOptions) (*ImageRecognitionResult, *Response, error) { + if opt != nil && opt.CIProcess == "" { + opt.CIProcess = "sensitive-content-recognition" + } + var res ImageRecognitionResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/" + encodeURIComponent(name), + method: http.MethodGet, + optQuery: opt, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} + +type PutVideoAuditingJobOptions struct { + XMLName xml.Name `xml:"Request"` + InputObject string `xml:"Input>Object"` + Conf *VideoAuditingJobConf `xml:"Conf"` +} +type VideoAuditingJobConf struct { + DetectType string `xml:",omitempty"` + Snapshot *PutVideoAuditingJobSnapshot `xml:",omitempty"` + Callback string `xml:",omitempty"` +} +type PutVideoAuditingJobSnapshot struct { + Mode string `xml:",omitempty"` + Count int `xml:",omitempty"` + TimeInterval float32 `xml:",omitempty"` + Start float32 `xml:",omitempty"` +} + +type PutVideoAuditingJobResult struct { + XMLName xml.Name `xml:"Response"` + JobsDetail struct { + JobId string `xml:"JobId,omitempty"` + State string `xml:"State,omitempty"` + CreationTime string `xml:"CreationTime,omitempty"` + Object string `xml:"Object,omitempty"` + } `xml:"JobsDetail,omitempty"` +} + +func (s *CIService) PutVideoAuditingJob(ctx context.Context, opt *PutVideoAuditingJobOptions) (*PutVideoAuditingJobResult, *Response, error) { + var res PutVideoAuditingJobResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.CIURL, + uri: "/video/auditing", + method: http.MethodPost, + body: opt, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} + +type GetVideoAuditingJobResult struct { + XMLName xml.Name `xml:"Response"` + JobsDetail *VideoAuditingJobDetail `xml:",omitempty"` + NonExistJobIds string `xml:",omitempty"` +} +type VideoAuditingJobDetail struct { + Code string `xml:",omitempty"` + Message string `xml:",omitempty"` + JobId string `xml:",omitempty"` + State string `xml:",omitempty"` + CreationTime string `xml:",omitempty"` + Object string `xml:",omitempty"` + SnapshotCount string `xml:",omitempty"` + result int `xml:",omitempty"` + PornInfo *RecognitionInfo `xml:",omitempty"` + TerrorismInfo *RecognitionInfo `xml:",omitempty"` + PoliticsInfo *RecognitionInfo `xml:",omitempty"` + AdsInfo *RecognitionInfo `xml:",omitempty"` + Snapshot *GetVideoAuditingJobSnapshot `xml:",omitempty"` +} +type GetVideoAuditingJobSnapshot struct { + Url string `xml:",omitempty"` + PornInfo *RecognitionInfo `xml:",omitempty"` + TerrorismInfo *RecognitionInfo `xml:",omitempty"` + PoliticsInfo *RecognitionInfo `xml:",omitempty"` + AdsInfo *RecognitionInfo `xml:",omitempty"` +} + +func (s *CIService) GetVideoAuditingJob(ctx context.Context, jobid string) (*GetVideoAuditingJobResult, *Response, error) { + var res GetVideoAuditingJobResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.CIURL, + uri: "/video/auditing/" + jobid, + method: http.MethodGet, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} diff --git a/cos.go b/cos.go index 665084f..b3a7d6c 100644 --- a/cos.go +++ b/cos.go @@ -22,7 +22,7 @@ import ( const ( // Version current go sdk version - Version = "0.7.10" + Version = "0.7.11" userAgent = "cos-go-sdk-v5/" + Version contentTypeXML = "application/xml" defaultServiceBaseURL = "http://service.cos.myqcloud.com" @@ -42,6 +42,8 @@ type BaseURL struct { ServiceURL *url.URL // 访问 job API 的基础 URL (不包含 path 部分): http://example.com BatchURL *url.URL + // 访问 CI 的基础 URL + CIURL *url.URL } // NewBucketURL 生成 BaseURL 所需的 BucketURL @@ -82,6 +84,7 @@ type Client struct { Bucket *BucketService Object *ObjectService Batch *BatchService + CI *CIService } type service struct { @@ -99,6 +102,7 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client { baseURL.BucketURL = uri.BucketURL baseURL.ServiceURL = uri.ServiceURL baseURL.BatchURL = uri.BatchURL + baseURL.CIURL = uri.CIURL } if baseURL.ServiceURL == nil { baseURL.ServiceURL, _ = url.Parse(defaultServiceBaseURL) @@ -114,6 +118,7 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client { c.Bucket = (*BucketService)(&c.common) c.Object = (*ObjectService)(&c.common) c.Batch = (*BatchService)(&c.common) + c.CI = (*CIService)(&c.common) return c } @@ -244,9 +249,6 @@ func (c *Client) send(ctx context.Context, opt *sendOptions) (resp *Response, er } resp, err = c.doAPI(ctx, req, opt.result, !opt.disableCloseBody) - if err != nil { - return - } return } diff --git a/cos_test.go b/cos_test.go index f312779..ecdcebd 100644 --- a/cos_test.go +++ b/cos_test.go @@ -32,7 +32,7 @@ func setup() { server = httptest.NewServer(mux) u, _ := url.Parse(server.URL) - client = NewClient(&BaseURL{u, u, u}, nil) + client = NewClient(&BaseURL{u, u, u, u}, nil) } // teardown closes the test HTTP server. diff --git a/example/object/ci_image_process.go b/example/object/ci_image_process.go new file mode 100644 index 0000000..6fc1584 --- /dev/null +++ b/example/object/ci_image_process.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + opt := &cos.ImageProcessOptions{ + IsPicInfo: 1, + Rules: []cos.PicOperationsRules{ + { + FileId: "format.jpg", + Rule: "imageView2/format/png", + }, + }, + } + name := "test.jpg" + res, _, err := c.CI.ImageProcess(context.Background(), name, opt) + log_status(err) + fmt.Printf("%+v\n", res) +} diff --git a/example/object/ci_image_recognition.go b/example/object/ci_image_recognition.go new file mode 100644 index 0000000..a17fd18 --- /dev/null +++ b/example/object/ci_image_recognition.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + opt := &cos.ImageRecognitionOptions{ + DetectType: "porn,terrorist,politics", + } + + name := "test.jpg" + res, _, err := c.CI.ImageRecognition(context.Background(), name, opt) + log_status(err) + fmt.Printf("%+v\n", res) +} diff --git a/example/object/ci_video_auditing_job.go b/example/object/ci_video_auditing_job.go new file mode 100644 index 0000000..9a5cbb7 --- /dev/null +++ b/example/object/ci_video_auditing_job.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + "time" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + bu, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + cu, _ := url.Parse("https://test-1259654469.ci.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: bu, CIURL: cu} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + opt := &cos.PutVideoAuditingJobOptions{ + InputObject: "demo.mp4", + Conf: &cos.VideoAuditingJobConf{ + DetectType: "Porn,Terrorism,Politics,Ads", + Snapshot: &cos.PutVideoAuditingJobSnapshot{ + Mode: "Interval", + Start: 0.5, + TimeInterval: 50.5, + Count: 100, + }, + }, + } + + res, _, err := c.CI.PutVideoAuditingJob(context.Background(), opt) + log_status(err) + fmt.Printf("%+v\n", res) + + time.Sleep(3 * time.Second) + res2, _, err := c.CI.GetVideoAuditingJob(context.Background(), res.JobsDetail.JobId) + log_status(err) + fmt.Printf("%+v\n", res2) +} diff --git a/example/object/list_uploads.go b/example/object/list_uploads.go new file mode 100644 index 0000000..86d74c4 --- /dev/null +++ b/example/object/list_uploads.go @@ -0,0 +1,98 @@ +package main + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "os" + "strings" + + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func initUpload(c *cos.Client, name string) *cos.InitiateMultipartUploadResult { + v, _, err := c.Object.InitiateMultipartUpload(context.Background(), name, nil) + log_status(err) + fmt.Printf("%#v\n", v) + return v +} + +func uploadPart(c *cos.Client, name string, uploadID string, blockSize, n int) string { + + b := make([]byte, blockSize) + if _, err := rand.Read(b); err != nil { + log_status(err) + } + s := fmt.Sprintf("%X", b) + f := strings.NewReader(s) + + resp, err := c.Object.UploadPart( + context.Background(), name, uploadID, n, f, nil, + ) + log_status(err) + fmt.Printf("%s\n", resp.Status) + return resp.Header.Get("Etag") +} + +func main() { + u, _ := url.Parse("http://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: false, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + name := "test/test_list_parts.go" + up := initUpload(c, name) + uploadID := up.UploadID + blockSize := 1024 * 1024 * 3 + + for i := 1; i < 5; i++ { + uploadPart(c, name, uploadID, blockSize, i) + } + opt := &cos.ObjectListUploadsOptions{ + Prefix: cos.EncodeURIComponent("test/test_list_parts"), + MaxUploads: 100, + } + v, _, err := c.Object.ListUploads(context.Background(), opt) + if err != nil { + log_status(err) + return + } + fmt.Printf("%+v\n", v) + for _, p := range v.Upload { + fmt.Printf("%+v\n", p) + fmt.Printf("%v, %v, %v\n", p.Key, p.UploadID, p.Initiated) + } +} diff --git a/example/object/select.go b/example/object/select.go new file mode 100644 index 0000000..a07a4fa --- /dev/null +++ b/example/object/select.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "os" + + "io/ioutil" + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: false, + }, + }, + }) + + opt := &cos.ObjectSelectOptions{ + Expression: "Select * from COSObject", + ExpressionType: "SQL", + InputSerialization: &cos.SelectInputSerialization{ + JSON: &cos.JSONInputSerialization{ + Type: "DOCUMENT", + }, + }, + OutputSerialization: &cos.SelectOutputSerialization{ + JSON: &cos.JSONOutputSerialization{ + RecordDelimiter: "\n", + }, + }, + RequestProgress: "TRUE", + } + res, err := c.Object.Select(context.Background(), "test.json", opt) + if err != nil { + panic(err) + } + defer res.Close() + data, err := ioutil.ReadAll(res) + if err != nil { + panic(err) + } + fmt.Printf("data: %v\n", string(data)) + resp, _ := res.(*cos.ObjectSelectResponse) + fmt.Printf("data: %+v\n", resp.Frame) + + // Select to File + _, err = c.Object.SelectToFile(context.Background(), "test.json", "./test.json", opt) + if err != nil { + panic(err) + } +} diff --git a/example/object/select_csv.go b/example/object/select_csv.go new file mode 100644 index 0000000..19c1262 --- /dev/null +++ b/example/object/select_csv.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "os" + + "io/ioutil" + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: false, + }, + }, + }) + + opt := &cos.ObjectSelectOptions{ + Expression: "Select * from COSObject", + ExpressionType: "SQL", + InputSerialization: &cos.SelectInputSerialization{ + CSV: &cos.CSVInputSerialization{ + FileHeaderInfo: "IGNORE", + }, + }, + OutputSerialization: &cos.SelectOutputSerialization{ + CSV: &cos.CSVOutputSerialization{ + RecordDelimiter: "\n", + }, + }, + RequestProgress: "TRUE", + } + res, err := c.Object.Select(context.Background(), "test.csv", opt) + if err != nil { + panic(err) + } + defer res.Close() + data, err := ioutil.ReadAll(res) + if err != nil { + panic(err) + } + fmt.Printf("data: %v\n", string(data)) + resp, _ := res.(*cos.ObjectSelectResponse) + fmt.Printf("data: %+v\n", resp.Frame) + + // Select To File + _, err = c.Object.SelectToFile(context.Background(), "test.csv", "./test.csv", opt) + if err != nil { + panic(err) + } +} diff --git a/helper.go b/helper.go index 08c5a35..f0ed8f3 100644 --- a/helper.go +++ b/helper.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "fmt" "net/http" + "net/url" ) // 计算 md5 或 sha1 时的分块大小 @@ -83,3 +84,19 @@ func encodeURIComponent(s string) string { b.WriteString(s[written:]) return b.String() } + +func decodeURIComponent(s string) (string, error) { + decodeStr, err := url.QueryUnescape(s) + if err != nil { + return s, err + } + return decodeStr, err +} + +func DecodeURIComponent(s string) (string, error) { + return DecodeURIComponent(s) +} + +func EncodeURIComponent(s string) string { + return encodeURIComponent(s) +} diff --git a/object.go b/object.go index d44e351..9b963ca 100644 --- a/object.go +++ b/object.go @@ -2,10 +2,12 @@ package cos import ( "context" + "crypto/md5" "encoding/xml" "errors" "fmt" "io" + "io/ioutil" "net/http" "net/url" "os" @@ -31,6 +33,8 @@ type ObjectGetOptions struct { XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` + + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url @@ -146,7 +150,8 @@ type ObjectPutHeaderOptions struct { XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` //兼容其他自定义头部 - XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` + XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // ObjectPutOptions the options of put object @@ -484,12 +489,15 @@ type MultiUploadOptions struct { OptIni *InitiateMultipartUploadOptions PartSize int64 ThreadPoolSize int + CheckPoint bool } type Chunk struct { Number int OffSet int64 Size int64 + Done bool + ETag string } // jobs @@ -506,6 +514,7 @@ type Jobs struct { type Results struct { PartNumber int Resp *Response + err error } func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { @@ -513,6 +522,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { fd, err := os.Open(j.FilePath) var res Results if err != nil { + res.err = err res.PartNumber = j.Chunk.Number res.Resp = nil results <- &res @@ -528,6 +538,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt) res.PartNumber = j.Chunk.Number res.Resp = resp + res.err = err if err != nil { rt-- if rt == 0 { @@ -601,6 +612,80 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) } +func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) { + opt := &ObjectListUploadsOptions{ + Prefix: name, + EncodingType: "url", + } + res, _, err := s.ListUploads(ctx, opt) + if err != nil { + return "", err + } + if len(res.Upload) == 0 { + return "", nil + } + last := len(res.Upload) - 1 + for last >= 0 { + decodeKey, _ := decodeURIComponent(res.Upload[last].Key) + if decodeKey == name { + return decodeURIComponent(res.Upload[last].UploadID) + } + last = last - 1 + } + return "", nil +} + +func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error { + var uploadedParts []Object + isTruncated := true + opt := &ObjectListPartsOptions{ + EncodingType: "url", + } + for isTruncated { + res, _, err := s.ListParts(ctx, name, UploadID, opt) + if err != nil { + return err + } + if len(res.Parts) > 0 { + uploadedParts = append(uploadedParts, res.Parts...) + } + isTruncated = res.IsTruncated + opt.PartNumberMarker = res.NextPartNumberMarker + } + fd, err := os.Open(filepath) + if err != nil { + return err + } + defer fd.Close() + // 某个分块出错, 重置chunks + ret := func(e error) error { + for i, _ := range chunks { + chunks[i].Done = false + chunks[i].ETag = "" + } + return e + } + for _, part := range uploadedParts { + partNumber := part.PartNumber + if partNumber > partNum { + return ret(errors.New("Part Number is not consistent")) + } + partNumber = partNumber - 1 + fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET) + bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size)) + if err != nil { + return ret(err) + } + localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs)) + if localMD5 != part.ETag { + return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber))) + } + chunks[partNumber].Done = true + chunks[partNumber].ETag = part.ETag + } + return nil +} + // MultiUpload/Upload 为高级upload接口,并发分块上传 // 注意该接口目前只供参考 // @@ -620,6 +705,7 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string if err != nil { return nil, nil, err } + // filesize=0 , use simple upload if partNum == 0 { var opt0 *ObjectPutOptions if opt.OptIni != nil { @@ -639,13 +725,26 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string return result, rsp, nil } + var uploadID string + resumableFlag := false + if opt.CheckPoint { + var err error + uploadID, err = s.getResumableUploadID(ctx, name) + if err == nil && uploadID != "" { + err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum) + resumableFlag = (err == nil) + } + } + // 2.Init optini := opt.OptIni - res, _, err := s.InitiateMultipartUpload(ctx, name, optini) - if err != nil { - return nil, nil, err + if !resumableFlag { + res, _, err := s.InitiateMultipartUpload(ctx, name, optini) + if err != nil { + return nil, nil, err + } + uploadID = res.UploadID } - uploadID := res.UploadID var poolSize int if opt.ThreadPoolSize > 0 { poolSize = opt.ThreadPoolSize @@ -665,11 +764,15 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string // 4.Push jobs for _, chunk := range chunks { + if chunk.Done { + continue + } partOpt := &ObjectUploadPartOptions{} if optini != nil && optini.ObjectPutHeaderOptions != nil { partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5 + partOpt.XCosTrafficLimit = optini.XCosTrafficLimit } job := &Jobs{ Name: name, @@ -684,12 +787,18 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string close(chjobs) // 5.Recv the resp etag to complete - for i := 1; i <= partNum; i++ { + for i := 0; i < partNum; i++ { + if chunks[i].Done { + optcom.Parts = append(optcom.Parts, Object{ + PartNumber: chunks[i].Number, ETag: chunks[i].ETag}, + ) + continue + } res := <-chresults // Notice one part fail can not get the etag according. - if res.Resp == nil { + if res.Resp == nil || res.err != nil { // Some part already fail, can not to get the header inside. - return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber) + return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error()) } // Notice one part fail can not get the etag according. etag := res.Resp.Header.Get("ETag") diff --git a/object_part.go b/object_part.go index 897afca..ee181b8 100644 --- a/object_part.go +++ b/object_part.go @@ -42,12 +42,14 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string // ObjectUploadPartOptions is the options of upload-part type ObjectUploadPartOptions struct { Expect string `header:"Expect,omitempty" url:"-"` - XCosContentSHA1 string `header:"x-cos-content-sha1" url:"-"` + XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"` ContentLength int `header:"Content-Length,omitempty" url:"-"` XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` + + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。 @@ -244,3 +246,50 @@ func (s *ObjectService) CopyPart(ctx context.Context, name, uploadID string, par } return &res, resp, err } + +type ObjectListUploadsOptions struct { + Delimiter string `url:"Delimiter,omitempty"` + EncodingType string `url:"EncodingType,omitempty"` + Prefix string `url:"Prefix"` + MaxUploads int `url:"MaxUploads"` + KeyMarker string `url:"KeyMarker"` + UploadIdMarker string `url:"UploadIDMarker"` +} + +type ObjectListUploadsResult struct { + XMLName xml.Name `xml:"ListMultipartUploadsResult"` + Bucket string `xml:"Bucket,omitempty"` + EncodingType string `xml:"Encoding-Type,omitempty"` + KeyMarker string `xml:"KeyMarker,omitempty"` + UploadIdMarker string `xml:"UploadIdMarker,omitempty"` + NextKeyMarker string `xml:"NextKeyMarker,omitempty"` + NextUploadIdMarker string `xml:"NextUploadIdMarker,omitempty"` + MaxUploads string `xml:"MaxUploads,omitempty"` + IsTruncated bool `xml:"IsTruncated,omitempty"` + Prefix string `xml:"Prefix,omitempty"` + Delimiter string `xml:"Delimiter,omitempty"` + Upload []ListUploadsResultUpload `xml:"Upload,omitempty"` + CommonPrefixes []string `xml:"CommonPrefixes>Prefix,omitempty"` +} + +type ListUploadsResultUpload struct { + Key string `xml:"Key,omitempty"` + UploadID string `xml:"UploadId,omitempty"` + StorageClass string `xml:"StorageClass,omitempty"` + Initiator *Initiator `xml:"Initiator,omitempty"` + Owner *Owner `xml:"Owner,omitempty"` + Initiated string `xml:"Initiated,omitempty"` +} + +func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsOptions) (*ObjectListUploadsResult, *Response, error) { + var res ObjectListUploadsResult + sendOpt := &sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/?uploads", + method: http.MethodGet, + optQuery: opt, + result: &res, + } + resp, err := s.client.send(ctx, sendOpt) + return &res, resp, err +} diff --git a/object_select.go b/object_select.go new file mode 100644 index 0000000..4269b5a --- /dev/null +++ b/object_select.go @@ -0,0 +1,444 @@ +package cos + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/xml" + "fmt" + "hash/crc32" + "io" + "io/ioutil" + "net/http" + "os" + "time" +) + +type JSONInputSerialization struct { + Type string `xml:"Type"` +} + +type CSVInputSerialization struct { + RecordDelimiter string `xml:"RecordDelimiter,omitempty"` + FieldDelimiter string `xml:"FieldDelimiter,omitempty"` + QuoteCharacter string `xml:"QuoteCharacter,omitempty"` + QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"` + AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"` + FileHeaderInfo string `xml:"FileHeaderInfo,omitempty"` + Comments string `xml:"Comments,omitempty"` +} + +type SelectInputSerialization struct { + CompressionType string `xml:"CompressionType,omitempty"` + CSV *CSVInputSerialization `xml:"CSV,omitempty"` + JSON *JSONInputSerialization `xml:"JSON,omitempty"` +} + +type JSONOutputSerialization struct { + RecordDelimiter string `xml:"RecordDelimiter,omitempty"` +} + +type CSVOutputSerialization struct { + QuoteFileds string `xml:"QuoteFileds,omitempty"` + RecordDelimiter string `xml:"RecordDelimiter,omitempty"` + FieldDelimiter string `xml:"FieldDelimiter,omitempty"` + QuoteCharacter string `xml:"QuoteCharacter,omitempty"` + QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"` +} + +type SelectOutputSerialization struct { + CSV *CSVOutputSerialization `xml:"CSV,omitempty"` + JSON *JSONOutputSerialization `xml:"JSON,omitempty"` +} + +type ObjectSelectOptions struct { + XMLName xml.Name `xml:"SelectRequest"` + Expression string `xml:"Expression"` + ExpressionType string `xml:"ExpressionType"` + InputSerialization *SelectInputSerialization `xml:"InputSerialization"` + OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"` + RequestProgress string `xml:"RequestProgress>Enabled,omitempty"` +} + +func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) { + u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name)) + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: u, + method: http.MethodPost, + body: opt, + disableCloseBody: true, + } + resp, err := s.client.send(ctx, &sendOpt) + if err != nil { + return nil, err + } + result := &ObjectSelectResponse{ + Headers: resp.Header, + Body: resp.Body, + StatusCode: resp.StatusCode, + Frame: &ObjectSelectResult{ + NextFrame: true, + Payload: []byte{}, + }, + Finish: false, + } + + return result, nil +} + +func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) { + resp, err := s.Select(ctx, name, opt) + if err != nil { + return nil, err + } + res, _ := resp.(*ObjectSelectResponse) + defer func() { + io.Copy(ioutil.Discard, resp) + resp.Close() + }() + + fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664)) + if err != nil { + return res, err + } + + _, err = io.Copy(fd, resp) + fd.Close() + res.Finish = true + return res, err +} + +const ( + kReadTimeout = 3 + kMessageType = ":message-type" + kEventType = ":event-type" + kContentType = ":content-type" + + kRecordsFrameType = iota + kContinuationFrameType + kProgressFrameType + kStatsFrameType + kEndFrameType + kErrorFrameType +) + +type ProgressFrame struct { + XMLName xml.Name `xml:"Progress"` + BytesScanned int `xml:"BytesScanned"` + BytesProcessed int `xml:"BytesProcessed"` + BytesReturned int `xml:"BytesReturned"` +} + +type StatsFrame struct { + XMLName xml.Name `xml:"Stats"` + BytesScanned int `xml:"BytesScanned"` + BytesProcessed int `xml:"BytesProcessed"` + BytesReturned int `xml:"BytesReturned"` +} + +type DataFrame struct { + ContentType string + ConsumedBytesLength int32 + LeftBytesLength int32 +} + +type ErrorFrame struct { + Code string + Message string +} + +func (e *ErrorFrame) Error() string { + return fmt.Sprintf("Error Code: %s, Error Message: %s", e.Code, e.Message) +} + +type ObjectSelectResult struct { + TotalFrameLength int32 + TotalHeaderLength int32 + NextFrame bool + FrameType int + Payload []byte + DataFrame DataFrame + ProgressFrame ProgressFrame + StatsFrame StatsFrame + ErrorFrame *ErrorFrame +} + +type ObjectSelectResponse struct { + StatusCode int + Headers http.Header + Body io.ReadCloser + Frame *ObjectSelectResult + Finish bool +} + +func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) { + n, err = osr.readFrames(p) + return +} +func (osr *ObjectSelectResponse) Close() error { + return osr.Body.Close() +} + +func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) { + if osr.Finish { + return 0, io.EOF + } + if osr.Frame.ErrorFrame != nil { + return 0, osr.Frame.ErrorFrame + } + + var err error + var nlen int + dlen := len(p) + + for nlen < dlen { + if osr.Frame.NextFrame == true { + osr.Frame.NextFrame = false + err := osr.analysisPrelude() + if err != nil { + return nlen, err + } + err = osr.analysisHeader() + if err != nil { + return nlen, err + } + } + switch osr.Frame.FrameType { + case kRecordsFrameType: + n, err := osr.analysisRecords(p[nlen:]) + if err != nil { + return nlen, err + } + nlen += n + case kContinuationFrameType: + err = osr.payloadChecksum("ContinuationFrame") + if err != nil { + return nlen, err + } + case kProgressFrameType: + err := osr.analysisXml(&osr.Frame.ProgressFrame) + if err != nil { + return nlen, err + } + case kStatsFrameType: + err := osr.analysisXml(&osr.Frame.StatsFrame) + if err != nil { + return nlen, err + } + case kEndFrameType: + err = osr.payloadChecksum("EndFrame") + if err != nil { + return nlen, err + } + osr.Finish = true + return nlen, io.EOF + case kErrorFrameType: + return nlen, osr.Frame.ErrorFrame + } + } + return nlen, err +} + +func (osr *ObjectSelectResponse) analysisPrelude() error { + frame := make([]byte, 12) + _, err := osr.fixedLengthRead(frame, kReadTimeout) + if err != nil { + return err + } + + var preludeCRC uint32 + bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength) + bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength) + bytesToInt(frame[8:12], &preludeCRC) + osr.Frame.Payload = append(osr.Frame.Payload, frame...) + + return checksum(frame[0:8], preludeCRC, "Prelude") +} + +func (osr *ObjectSelectResponse) analysisHeader() error { + var nlen int32 + headers := make(map[string]string) + for nlen < osr.Frame.TotalHeaderLength { + var headerNameLen int8 + var headerValueLen int16 + bHeaderNameLen := make([]byte, 1) + _, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout) + if err != nil { + return err + } + nlen += 1 + bytesToInt(bHeaderNameLen, &headerNameLen) + osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...) + + bHeaderName := make([]byte, headerNameLen) + _, err = osr.fixedLengthRead(bHeaderName, kReadTimeout) + if err != nil { + return err + } + nlen += int32(headerNameLen) + headerName := string(bHeaderName) + osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...) + + bValueTypeLen := make([]byte, 3) + _, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout) + if err != nil { + return err + } + nlen += 3 + bytesToInt(bValueTypeLen[1:], &headerValueLen) + osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...) + + bHeaderValue := make([]byte, headerValueLen) + _, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout) + if err != nil { + return err + } + nlen += int32(headerValueLen) + headers[headerName] = string(bHeaderValue) + osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...) + } + htype, ok := headers[kMessageType] + if !ok { + return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers) + } + switch { + case htype == "error": + osr.Frame.FrameType = kErrorFrameType + osr.Frame.ErrorFrame = &ErrorFrame{} + osr.Frame.ErrorFrame.Code, _ = headers[":error-code"] + osr.Frame.ErrorFrame.Message, _ = headers[":error-message"] + case htype == "event": + hevent, ok := headers[kEventType] + if !ok { + return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers) + } + switch { + case hevent == "Records": + hContentType, ok := headers[kContentType] + if ok { + osr.Frame.DataFrame.ContentType = hContentType + } + osr.Frame.FrameType = kRecordsFrameType + case hevent == "Cont": + osr.Frame.FrameType = kContinuationFrameType + case hevent == "Progress": + osr.Frame.FrameType = kProgressFrameType + case hevent == "Stats": + osr.Frame.FrameType = kStatsFrameType + case hevent == "End": + osr.Frame.FrameType = kEndFrameType + default: + return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers) + } + default: + return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers) + } + return nil +} + +func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) { + var needReadLength int32 + dlen := int32(len(data)) + restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength + if dlen <= restLen { + needReadLength = dlen + } else { + needReadLength = restLen + } + n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout) + if err != nil { + return n, fmt.Errorf("read data frame error: %s", err.Error()) + } + osr.Frame.DataFrame.ConsumedBytesLength += int32(n) + osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...) + // 读完了一帧数据并填充到data中了 + if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength { + osr.Frame.DataFrame.ConsumedBytesLength = 0 + err = osr.payloadChecksum("RecordFrame") + } + return n, err +} + +func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error { + payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength + bFrame := make([]byte, payloadLength) + _, err := osr.fixedLengthRead(bFrame, kReadTimeout) + if err != nil { + return err + } + err = xml.Unmarshal(bFrame, frame) + if err != nil { + return err + } + osr.Frame.Payload = append(osr.Frame.Payload, bFrame...) + return osr.payloadChecksum("XmlFrame") +} + +// 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容 +func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error { + bcrc := make([]byte, 4) + _, err := osr.fixedLengthRead(bcrc, kReadTimeout) + if err != nil { + return err + } + var res uint32 + bytesToInt(bcrc, &res) + err = checksum(osr.Frame.Payload, res, ftype) + + osr.Frame.NextFrame = true + osr.Frame.Payload = []byte{} + + return err +} + +type chanReadIO struct { + readLen int + err error +} + +func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) { + timeout := time.Duration(read_timeout) + r := osr.Body + ch := make(chan chanReadIO, 1) + defer close(ch) + go func(p []byte) { + var needLen int + readChan := chanReadIO{} + needLen = len(p) + for { + n, err := r.Read(p[readChan.readLen:needLen]) + readChan.readLen += n + if err != nil { + readChan.err = err + ch <- readChan + return + } + + if readChan.readLen == needLen { + break + } + } + ch <- readChan + }(p) + + select { + case <-time.After(time.Second * timeout): + return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", "sr.Headers.Get(HTTPHeaderOssRequestID)", timeout, len(p)) + case result := <-ch: + return result.readLen, result.err + } +} + +func bytesToInt(b []byte, ret interface{}) { + binBuf := bytes.NewBuffer(b) + binary.Read(binBuf, binary.BigEndian, ret) +} + +func checksum(b []byte, rec uint32, ftype string) error { + c := crc32.ChecksumIEEE(b) + if c != rec { + return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, c, rec) + } + return nil +}