From 287669a677af36153d645aabd4b296114aea75fe Mon Sep 17 00:00:00 2001 From: jojoliang Date: Tue, 21 Jan 2020 12:04:45 +0800 Subject: [PATCH 1/9] add checkpoint multi upload --- object.go | 137 +++++++++++++++++++++++++++++++++++++++++++++++++-------- object_part.go | 2 +- 2 files changed, 119 insertions(+), 20 deletions(-) diff --git a/object.go b/object.go index d44e351..514c5c2 100644 --- a/object.go +++ b/object.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/url" "os" @@ -481,15 +482,28 @@ type Object struct { // MultiUploadOptions is the option of the multiupload, // ThreadPoolSize default is one type MultiUploadOptions struct { - OptIni *InitiateMultipartUploadOptions - PartSize int64 - ThreadPoolSize int + OptIni *InitiateMultipartUploadOptions + PartSize int64 + ThreadPoolSize int + CheckPointFile string + EnableCheckpoint bool +} + +type CheckPointOptions struct { + cpfile *os.File + Key string `xml:"Key"` + FilePath string `xml:"FilePath"` + FileSize int64 `xml:"FileSize"` + PartSize int64 `xml:"PartSize"` + UploadID string `xml:"UploadID"` + Parts []Object `xml:"Parts>Part,omitempty"` } type Chunk struct { Number int OffSet int64 Size int64 + Done bool } // jobs @@ -506,6 +520,7 @@ type Jobs struct { type Results struct { PartNumber int Resp *Response + err error } func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { @@ -513,6 +528,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { fd, err := os.Open(j.FilePath) var res Results if err != nil { + res.err = err res.PartNumber = j.Chunk.Number res.Resp = nil results <- &res @@ -528,6 +544,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt) res.PartNumber = j.Chunk.Number res.Resp = resp + res.err = err if err != nil { rt-- if rt == 0 { @@ -554,34 +571,72 @@ func DividePart(fileSize int64) (int64, int64) { return partNum, partSize } -func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) { +func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) { if filePath == "" { - return nil, 0, errors.New("filePath invalid") + return nil, nil, 0, errors.New("filePath invalid") } file, err := os.Open(filePath) if err != nil { - return nil, 0, err + return nil, nil, 0, err } defer file.Close() stat, err := file.Stat() if err != nil { - return nil, 0, err + return nil, nil, 0, err } + + optcp := &CheckPointOptions{} + uploaded := false + if opt.EnableCheckpoint { + for { + optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644) + if err != nil { + return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error()) + } + defer optcp.cpfile.Close() + bs, err := ioutil.ReadAll(optcp.cpfile) + if err != nil { + break + } + err = xml.Unmarshal(bs, optcp) + if err != nil { + break + } + if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() { + break + } + uploaded = true + break + } + optcp.Key = name + optcp.FilePath = filePath + optcp.FileSize = stat.Size() + } + var partNum int64 + partSize := opt.PartSize + if uploaded { + partSize = optcp.PartSize + } if partSize > 0 { partSize = partSize * 1024 * 1024 partNum = stat.Size() / partSize if partNum >= 10000 { - return nil, 0, errors.New("Too many parts, out of 10000") + return nil, nil, 0, errors.New("Too many parts, out of 10000") } } else { partNum, partSize = DividePart(stat.Size()) } + if opt.EnableCheckpoint { + optcp.PartSize = partSize / 1024 / 1024 + } var chunks []Chunk - var chunk = Chunk{} + var chunk = Chunk{ + Done: false, + } for i := int64(0); i < partNum; i++ { chunk.Number = int(i + 1) chunk.OffSet = i * partSize @@ -597,8 +652,14 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) partNum++ } - return chunks, int(partNum), nil - + if uploaded { + for _, part := range optcp.Parts { + if part.PartNumber <= int(partNum) { + chunks[(part.PartNumber - 1)].Done = true + } + } + } + return optcp, chunks, int(partNum), nil } // MultiUpload/Upload 为高级upload接口,并发分块上传 @@ -615,8 +676,12 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string if opt == nil { opt = &MultiUploadOptions{} } + if opt.EnableCheckpoint && opt.CheckPointFile == "" { + opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath) + } + // 1.Get the file chunk - chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize) + optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt) if err != nil { return nil, nil, err } @@ -637,15 +702,26 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string ETag: rsp.Header.Get("ETag"), } return result, rsp, nil + } + if opt.EnableCheckpoint { + optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644) + if err != nil { + return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error()) + } + defer optcp.cpfile.Close() } - // 2.Init + uploadID := optcp.UploadID optini := opt.OptIni - res, _, err := s.InitiateMultipartUpload(ctx, name, optini) - if err != nil { - return nil, nil, err + if uploadID == "" { + // 2.Init + res, _, err := s.InitiateMultipartUpload(ctx, name, optini) + if err != nil { + return nil, nil, err + } + uploadID = res.UploadID + optcp.UploadID = uploadID } - uploadID := res.UploadID var poolSize int if opt.ThreadPoolSize > 0 { poolSize = opt.ThreadPoolSize @@ -657,6 +733,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string chjobs := make(chan *Jobs, 100) chresults := make(chan *Results, 10000) optcom := &CompleteMultipartUploadOptions{} + if len(optcp.Parts) > 0 { + optcom.Parts = append(optcom.Parts, optcp.Parts...) + partNum -= len(optcp.Parts) + } // 3.Start worker for w := 1; w <= poolSize; w++ { @@ -665,6 +745,9 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string // 4.Push jobs for _, chunk := range chunks { + if chunk.Done { + continue + } partOpt := &ObjectUploadPartOptions{} if optini != nil && optini.ObjectPutHeaderOptions != nil { partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo @@ -687,19 +770,35 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string for i := 1; i <= partNum; i++ { res := <-chresults // Notice one part fail can not get the etag according. - if res.Resp == nil { + if res.Resp == nil || res.err != nil { // Some part already fail, can not to get the header inside. - return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber) + return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error()) } // Notice one part fail can not get the etag according. etag := res.Resp.Header.Get("ETag") optcom.Parts = append(optcom.Parts, Object{ PartNumber: res.PartNumber, ETag: etag}, ) + if opt.EnableCheckpoint { + optcp.Parts = append(optcp.Parts, Object{ + PartNumber: res.PartNumber, ETag: etag}, + ) + err := optcp.cpfile.Truncate(0) + if err != nil { + continue + } + _, err = optcp.cpfile.Seek(0, os.SEEK_SET) + if err == nil { + xml.NewEncoder(optcp.cpfile).Encode(optcp) + } + } } sort.Sort(ObjectList(optcom.Parts)) v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) + if opt.EnableCheckpoint && err == nil { + os.Remove(opt.CheckPointFile) + } return v, resp, err } diff --git a/object_part.go b/object_part.go index 897afca..75bce19 100644 --- a/object_part.go +++ b/object_part.go @@ -42,7 +42,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string // ObjectUploadPartOptions is the options of upload-part type ObjectUploadPartOptions struct { Expect string `header:"Expect,omitempty" url:"-"` - XCosContentSHA1 string `header:"x-cos-content-sha1" url:"-"` + XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"` ContentLength int `header:"Content-Length,omitempty" url:"-"` XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` From 52c110b7ede92af97e3685d4afd73a128ddd443b Mon Sep 17 00:00:00 2001 From: jojoliang Date: Tue, 21 Jan 2020 15:39:18 +0800 Subject: [PATCH 2/9] add checkpoint multi upload: update --- object.go | 1 + 1 file changed, 1 insertion(+) diff --git a/object.go b/object.go index 514c5c2..03738cb 100644 --- a/object.go +++ b/object.go @@ -605,6 +605,7 @@ func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*Check break } if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() { + optcp = &CheckPointOptions{} break } uploaded = true From e6f823c7c1ee66fe68de4aafd45b009dee4b1ac2 Mon Sep 17 00:00:00 2001 From: jojoliang Date: Thu, 20 Feb 2020 14:19:40 +0800 Subject: [PATCH 3/9] add select, and x-cos-traffic-limit --- example/object/select.go | 66 +++++++ example/object/select_csv.go | 66 +++++++ object.go | 4 + object_part.go | 2 + object_select.go | 444 +++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 582 insertions(+) create mode 100644 example/object/select.go create mode 100644 example/object/select_csv.go create mode 100644 object_select.go diff --git a/example/object/select.go b/example/object/select.go new file mode 100644 index 0000000..a07a4fa --- /dev/null +++ b/example/object/select.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "os" + + "io/ioutil" + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: false, + }, + }, + }) + + opt := &cos.ObjectSelectOptions{ + Expression: "Select * from COSObject", + ExpressionType: "SQL", + InputSerialization: &cos.SelectInputSerialization{ + JSON: &cos.JSONInputSerialization{ + Type: "DOCUMENT", + }, + }, + OutputSerialization: &cos.SelectOutputSerialization{ + JSON: &cos.JSONOutputSerialization{ + RecordDelimiter: "\n", + }, + }, + RequestProgress: "TRUE", + } + res, err := c.Object.Select(context.Background(), "test.json", opt) + if err != nil { + panic(err) + } + defer res.Close() + data, err := ioutil.ReadAll(res) + if err != nil { + panic(err) + } + fmt.Printf("data: %v\n", string(data)) + resp, _ := res.(*cos.ObjectSelectResponse) + fmt.Printf("data: %+v\n", resp.Frame) + + // Select to File + _, err = c.Object.SelectToFile(context.Background(), "test.json", "./test.json", opt) + if err != nil { + panic(err) + } +} diff --git a/example/object/select_csv.go b/example/object/select_csv.go new file mode 100644 index 0000000..19c1262 --- /dev/null +++ b/example/object/select_csv.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "os" + + "io/ioutil" + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: false, + }, + }, + }) + + opt := &cos.ObjectSelectOptions{ + Expression: "Select * from COSObject", + ExpressionType: "SQL", + InputSerialization: &cos.SelectInputSerialization{ + CSV: &cos.CSVInputSerialization{ + FileHeaderInfo: "IGNORE", + }, + }, + OutputSerialization: &cos.SelectOutputSerialization{ + CSV: &cos.CSVOutputSerialization{ + RecordDelimiter: "\n", + }, + }, + RequestProgress: "TRUE", + } + res, err := c.Object.Select(context.Background(), "test.csv", opt) + if err != nil { + panic(err) + } + defer res.Close() + data, err := ioutil.ReadAll(res) + if err != nil { + panic(err) + } + fmt.Printf("data: %v\n", string(data)) + resp, _ := res.(*cos.ObjectSelectResponse) + fmt.Printf("data: %+v\n", resp.Frame) + + // Select To File + _, err = c.Object.SelectToFile(context.Background(), "test.csv", "./test.csv", opt) + if err != nil { + panic(err) + } +} diff --git a/object.go b/object.go index 03738cb..0c348fc 100644 --- a/object.go +++ b/object.go @@ -32,6 +32,7 @@ type ObjectGetOptions struct { XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url @@ -148,6 +149,8 @@ type ObjectPutHeaderOptions struct { XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` //兼容其他自定义头部 XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` + + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // ObjectPutOptions the options of put object @@ -754,6 +757,7 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5 + partOpt.XCosTrafficLimit = optini.XCosTrafficLimit } job := &Jobs{ Name: name, diff --git a/object_part.go b/object_part.go index 75bce19..d2d3bc4 100644 --- a/object_part.go +++ b/object_part.go @@ -48,6 +48,8 @@ type ObjectUploadPartOptions struct { XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` + + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。 diff --git a/object_select.go b/object_select.go new file mode 100644 index 0000000..4269b5a --- /dev/null +++ b/object_select.go @@ -0,0 +1,444 @@ +package cos + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/xml" + "fmt" + "hash/crc32" + "io" + "io/ioutil" + "net/http" + "os" + "time" +) + +type JSONInputSerialization struct { + Type string `xml:"Type"` +} + +type CSVInputSerialization struct { + RecordDelimiter string `xml:"RecordDelimiter,omitempty"` + FieldDelimiter string `xml:"FieldDelimiter,omitempty"` + QuoteCharacter string `xml:"QuoteCharacter,omitempty"` + QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"` + AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"` + FileHeaderInfo string `xml:"FileHeaderInfo,omitempty"` + Comments string `xml:"Comments,omitempty"` +} + +type SelectInputSerialization struct { + CompressionType string `xml:"CompressionType,omitempty"` + CSV *CSVInputSerialization `xml:"CSV,omitempty"` + JSON *JSONInputSerialization `xml:"JSON,omitempty"` +} + +type JSONOutputSerialization struct { + RecordDelimiter string `xml:"RecordDelimiter,omitempty"` +} + +type CSVOutputSerialization struct { + QuoteFileds string `xml:"QuoteFileds,omitempty"` + RecordDelimiter string `xml:"RecordDelimiter,omitempty"` + FieldDelimiter string `xml:"FieldDelimiter,omitempty"` + QuoteCharacter string `xml:"QuoteCharacter,omitempty"` + QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"` +} + +type SelectOutputSerialization struct { + CSV *CSVOutputSerialization `xml:"CSV,omitempty"` + JSON *JSONOutputSerialization `xml:"JSON,omitempty"` +} + +type ObjectSelectOptions struct { + XMLName xml.Name `xml:"SelectRequest"` + Expression string `xml:"Expression"` + ExpressionType string `xml:"ExpressionType"` + InputSerialization *SelectInputSerialization `xml:"InputSerialization"` + OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"` + RequestProgress string `xml:"RequestProgress>Enabled,omitempty"` +} + +func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) { + u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name)) + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: u, + method: http.MethodPost, + body: opt, + disableCloseBody: true, + } + resp, err := s.client.send(ctx, &sendOpt) + if err != nil { + return nil, err + } + result := &ObjectSelectResponse{ + Headers: resp.Header, + Body: resp.Body, + StatusCode: resp.StatusCode, + Frame: &ObjectSelectResult{ + NextFrame: true, + Payload: []byte{}, + }, + Finish: false, + } + + return result, nil +} + +func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) { + resp, err := s.Select(ctx, name, opt) + if err != nil { + return nil, err + } + res, _ := resp.(*ObjectSelectResponse) + defer func() { + io.Copy(ioutil.Discard, resp) + resp.Close() + }() + + fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664)) + if err != nil { + return res, err + } + + _, err = io.Copy(fd, resp) + fd.Close() + res.Finish = true + return res, err +} + +const ( + kReadTimeout = 3 + kMessageType = ":message-type" + kEventType = ":event-type" + kContentType = ":content-type" + + kRecordsFrameType = iota + kContinuationFrameType + kProgressFrameType + kStatsFrameType + kEndFrameType + kErrorFrameType +) + +type ProgressFrame struct { + XMLName xml.Name `xml:"Progress"` + BytesScanned int `xml:"BytesScanned"` + BytesProcessed int `xml:"BytesProcessed"` + BytesReturned int `xml:"BytesReturned"` +} + +type StatsFrame struct { + XMLName xml.Name `xml:"Stats"` + BytesScanned int `xml:"BytesScanned"` + BytesProcessed int `xml:"BytesProcessed"` + BytesReturned int `xml:"BytesReturned"` +} + +type DataFrame struct { + ContentType string + ConsumedBytesLength int32 + LeftBytesLength int32 +} + +type ErrorFrame struct { + Code string + Message string +} + +func (e *ErrorFrame) Error() string { + return fmt.Sprintf("Error Code: %s, Error Message: %s", e.Code, e.Message) +} + +type ObjectSelectResult struct { + TotalFrameLength int32 + TotalHeaderLength int32 + NextFrame bool + FrameType int + Payload []byte + DataFrame DataFrame + ProgressFrame ProgressFrame + StatsFrame StatsFrame + ErrorFrame *ErrorFrame +} + +type ObjectSelectResponse struct { + StatusCode int + Headers http.Header + Body io.ReadCloser + Frame *ObjectSelectResult + Finish bool +} + +func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) { + n, err = osr.readFrames(p) + return +} +func (osr *ObjectSelectResponse) Close() error { + return osr.Body.Close() +} + +func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) { + if osr.Finish { + return 0, io.EOF + } + if osr.Frame.ErrorFrame != nil { + return 0, osr.Frame.ErrorFrame + } + + var err error + var nlen int + dlen := len(p) + + for nlen < dlen { + if osr.Frame.NextFrame == true { + osr.Frame.NextFrame = false + err := osr.analysisPrelude() + if err != nil { + return nlen, err + } + err = osr.analysisHeader() + if err != nil { + return nlen, err + } + } + switch osr.Frame.FrameType { + case kRecordsFrameType: + n, err := osr.analysisRecords(p[nlen:]) + if err != nil { + return nlen, err + } + nlen += n + case kContinuationFrameType: + err = osr.payloadChecksum("ContinuationFrame") + if err != nil { + return nlen, err + } + case kProgressFrameType: + err := osr.analysisXml(&osr.Frame.ProgressFrame) + if err != nil { + return nlen, err + } + case kStatsFrameType: + err := osr.analysisXml(&osr.Frame.StatsFrame) + if err != nil { + return nlen, err + } + case kEndFrameType: + err = osr.payloadChecksum("EndFrame") + if err != nil { + return nlen, err + } + osr.Finish = true + return nlen, io.EOF + case kErrorFrameType: + return nlen, osr.Frame.ErrorFrame + } + } + return nlen, err +} + +func (osr *ObjectSelectResponse) analysisPrelude() error { + frame := make([]byte, 12) + _, err := osr.fixedLengthRead(frame, kReadTimeout) + if err != nil { + return err + } + + var preludeCRC uint32 + bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength) + bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength) + bytesToInt(frame[8:12], &preludeCRC) + osr.Frame.Payload = append(osr.Frame.Payload, frame...) + + return checksum(frame[0:8], preludeCRC, "Prelude") +} + +func (osr *ObjectSelectResponse) analysisHeader() error { + var nlen int32 + headers := make(map[string]string) + for nlen < osr.Frame.TotalHeaderLength { + var headerNameLen int8 + var headerValueLen int16 + bHeaderNameLen := make([]byte, 1) + _, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout) + if err != nil { + return err + } + nlen += 1 + bytesToInt(bHeaderNameLen, &headerNameLen) + osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...) + + bHeaderName := make([]byte, headerNameLen) + _, err = osr.fixedLengthRead(bHeaderName, kReadTimeout) + if err != nil { + return err + } + nlen += int32(headerNameLen) + headerName := string(bHeaderName) + osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...) + + bValueTypeLen := make([]byte, 3) + _, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout) + if err != nil { + return err + } + nlen += 3 + bytesToInt(bValueTypeLen[1:], &headerValueLen) + osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...) + + bHeaderValue := make([]byte, headerValueLen) + _, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout) + if err != nil { + return err + } + nlen += int32(headerValueLen) + headers[headerName] = string(bHeaderValue) + osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...) + } + htype, ok := headers[kMessageType] + if !ok { + return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers) + } + switch { + case htype == "error": + osr.Frame.FrameType = kErrorFrameType + osr.Frame.ErrorFrame = &ErrorFrame{} + osr.Frame.ErrorFrame.Code, _ = headers[":error-code"] + osr.Frame.ErrorFrame.Message, _ = headers[":error-message"] + case htype == "event": + hevent, ok := headers[kEventType] + if !ok { + return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers) + } + switch { + case hevent == "Records": + hContentType, ok := headers[kContentType] + if ok { + osr.Frame.DataFrame.ContentType = hContentType + } + osr.Frame.FrameType = kRecordsFrameType + case hevent == "Cont": + osr.Frame.FrameType = kContinuationFrameType + case hevent == "Progress": + osr.Frame.FrameType = kProgressFrameType + case hevent == "Stats": + osr.Frame.FrameType = kStatsFrameType + case hevent == "End": + osr.Frame.FrameType = kEndFrameType + default: + return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers) + } + default: + return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers) + } + return nil +} + +func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) { + var needReadLength int32 + dlen := int32(len(data)) + restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength + if dlen <= restLen { + needReadLength = dlen + } else { + needReadLength = restLen + } + n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout) + if err != nil { + return n, fmt.Errorf("read data frame error: %s", err.Error()) + } + osr.Frame.DataFrame.ConsumedBytesLength += int32(n) + osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...) + // 读完了一帧数据并填充到data中了 + if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength { + osr.Frame.DataFrame.ConsumedBytesLength = 0 + err = osr.payloadChecksum("RecordFrame") + } + return n, err +} + +func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error { + payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength + bFrame := make([]byte, payloadLength) + _, err := osr.fixedLengthRead(bFrame, kReadTimeout) + if err != nil { + return err + } + err = xml.Unmarshal(bFrame, frame) + if err != nil { + return err + } + osr.Frame.Payload = append(osr.Frame.Payload, bFrame...) + return osr.payloadChecksum("XmlFrame") +} + +// 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容 +func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error { + bcrc := make([]byte, 4) + _, err := osr.fixedLengthRead(bcrc, kReadTimeout) + if err != nil { + return err + } + var res uint32 + bytesToInt(bcrc, &res) + err = checksum(osr.Frame.Payload, res, ftype) + + osr.Frame.NextFrame = true + osr.Frame.Payload = []byte{} + + return err +} + +type chanReadIO struct { + readLen int + err error +} + +func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) { + timeout := time.Duration(read_timeout) + r := osr.Body + ch := make(chan chanReadIO, 1) + defer close(ch) + go func(p []byte) { + var needLen int + readChan := chanReadIO{} + needLen = len(p) + for { + n, err := r.Read(p[readChan.readLen:needLen]) + readChan.readLen += n + if err != nil { + readChan.err = err + ch <- readChan + return + } + + if readChan.readLen == needLen { + break + } + } + ch <- readChan + }(p) + + select { + case <-time.After(time.Second * timeout): + return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", "sr.Headers.Get(HTTPHeaderOssRequestID)", timeout, len(p)) + case result := <-ch: + return result.readLen, result.err + } +} + +func bytesToInt(b []byte, ret interface{}) { + binBuf := bytes.NewBuffer(b) + binary.Read(binBuf, binary.BigEndian, ret) +} + +func checksum(b []byte, rec uint32, ftype string) error { + c := crc32.ChecksumIEEE(b) + if c != rec { + return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, c, rec) + } + return nil +} From dd7c41ca0824b607633e6a7bd0d3a838d52a91fd Mon Sep 17 00:00:00 2001 From: jojoliang Date: Fri, 17 Jul 2020 21:09:11 +0800 Subject: [PATCH 4/9] add ci post --- ci.go | 50 ++++++++++++++++++++++++++++++++++ example/object/ci_post.go | 68 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 example/object/ci_post.go diff --git a/ci.go b/ci.go index 2cf1713..6d6e973 100644 --- a/ci.go +++ b/ci.go @@ -1,7 +1,10 @@ package cos import ( + "context" "encoding/json" + "encoding/xml" + "net/http" ) type PicOperations struct { @@ -22,3 +25,50 @@ func EncodePicOperations(pic *PicOperations) string { } return string(bs) } + +type CloudImageReuslt struct { + XMLName xml.Name `xml:"UploadResult"` + OriginalInfo *PicOriginalInfo `xml:"OriginalInfo,omitempty"` + ProcessObject *PicProcessObject `xml:"ProcessResults>Object,omitempty"` +} + +type PicOriginalInfo struct { + Key string `xml:"Key,omitempty"` + Location string `xml:"Location,omitempty"` + ImageInfo *PicImageInfo `xml:"ImageInfo,omitempty"` +} + +type PicImageInfo struct { + Format string `xml:"Format,omitempty"` + Width int `xml:"Width,omitempty"` + Height int `xml:"Height,omitempty"` + Size int `xml:"Size,omitempty"` + Quality int `xml:"Quality,omitempty"` +} + +type PicProcessObject struct { + Key string `xml:"Key,omitempty"` + Location string `xml:"Location,omitempty"` + Format string `xml:"Format,omitempty"` + Width int `xml:"Width,omitempty"` + Height int `xml:"Height,omitempty"` + Size int `xml:"Size,omitempty"` + Quality int `xml:"Quality,omitempty"` +} + +type CloudImageOptions struct { + PicOperations string `header:"Pic-Operations" xml:"-" url:"-"` +} + +func (s *ObjectService) PostCI(ctx context.Context, name string, opt *CloudImageOptions) (*CloudImageReuslt, *Response, error) { + var res CloudImageReuslt + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/" + encodeURIComponent(name) + "?image_process", + method: http.MethodPost, + optHeader: opt, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} diff --git a/example/object/ci_post.go b/example/object/ci_post.go new file mode 100644 index 0000000..380db87 --- /dev/null +++ b/example/object/ci_post.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "encoding/xml" + "fmt" + "net/http" + "net/url" + "os" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + pic := &cos.PicOperations{ + IsPicInfo: 1, + Rules: []cos.PicOperationsRules{ + { + FileId: "format.jpg", + Rule: "imageView2/format/png", + }, + }, + } + opt := &cos.CloudImageOptions{ + PicOperations: cos.EncodePicOperations(pic), + } + name := "test.jpg" + res, _, err := c.Object.PostCI(context.Background(), name, opt) + data, _ := xml.Marshal(res) + fmt.Printf("%+v\n", string(data)) + log_status(err) +} From 64d31f318a223866ac16c64bbe9effcd168b696d Mon Sep 17 00:00:00 2001 From: jojoliang Date: Wed, 22 Jul 2020 14:46:26 +0800 Subject: [PATCH 5/9] =?UTF-8?q?add=20ListMultiUploads=20&&=20=E6=96=AD?= =?UTF-8?q?=E7=82=B9=E7=BB=AD=E4=BC=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cos.go | 2 +- example/object/list_uploads.go | 99 +++++++++++++++++++ object.go | 209 ++++++++++++++++++++--------------------- object_part.go | 47 +++++++++ 4 files changed, 251 insertions(+), 106 deletions(-) create mode 100644 example/object/list_uploads.go diff --git a/cos.go b/cos.go index 665084f..42ca376 100644 --- a/cos.go +++ b/cos.go @@ -22,7 +22,7 @@ import ( const ( // Version current go sdk version - Version = "0.7.10" + Version = "0.7.11" userAgent = "cos-go-sdk-v5/" + Version contentTypeXML = "application/xml" defaultServiceBaseURL = "http://service.cos.myqcloud.com" diff --git a/example/object/list_uploads.go b/example/object/list_uploads.go new file mode 100644 index 0000000..f2bf8aa --- /dev/null +++ b/example/object/list_uploads.go @@ -0,0 +1,99 @@ +package main + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "os" + "strings" + + "net/http" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func initUpload(c *cos.Client, name string) *cos.InitiateMultipartUploadResult { + v, _, err := c.Object.InitiateMultipartUpload(context.Background(), name, nil) + log_status(err) + fmt.Printf("%#v\n", v) + return v +} + +func uploadPart(c *cos.Client, name string, uploadID string, blockSize, n int) string { + + b := make([]byte, blockSize) + if _, err := rand.Read(b); err != nil { + log_status(err) + } + s := fmt.Sprintf("%X", b) + f := strings.NewReader(s) + + resp, err := c.Object.UploadPart( + context.Background(), name, uploadID, n, f, nil, + ) + log_status(err) + fmt.Printf("%s\n", resp.Status) + return resp.Header.Get("Etag") +} + +func main() { + u, _ := url.Parse("http://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: false, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + name := "test/test_list_parts.go" + up := initUpload(c, name) + uploadID := up.UploadID + ctx := context.Background() + blockSize := 1024 * 1024 * 3 + + for i := 1; i < 5; i++ { + uploadPart(c, name, uploadID, blockSize, i) + } + opt := &cos.ObjectListUploadsOptions{ + Prefix: cos.EncodeURIComponent("test/test_list_parts"), + MaxUploads: 100, + } + v, _, err := c.Object.ListUploads(context.Background(), opt) + if err != nil { + log_status(err) + return + } + fmt.Printf("%+v\n", v) + for _, p := range v.Upload { + fmt.Printf("%+v\n", p) + fmt.Printf("%v, %v, %v\n", p.Key, p.UploadID, p.Initiated) + } +} diff --git a/object.go b/object.go index 0c348fc..1cea83f 100644 --- a/object.go +++ b/object.go @@ -2,6 +2,7 @@ package cos import ( "context" + "crypto/md5" "encoding/xml" "errors" "fmt" @@ -32,7 +33,8 @@ type ObjectGetOptions struct { XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"` XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` - XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url @@ -148,9 +150,8 @@ type ObjectPutHeaderOptions struct { XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"` XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` //兼容其他自定义头部 - XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` - - XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` + XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` } // ObjectPutOptions the options of put object @@ -485,21 +486,10 @@ type Object struct { // MultiUploadOptions is the option of the multiupload, // ThreadPoolSize default is one type MultiUploadOptions struct { - OptIni *InitiateMultipartUploadOptions - PartSize int64 - ThreadPoolSize int - CheckPointFile string - EnableCheckpoint bool -} - -type CheckPointOptions struct { - cpfile *os.File - Key string `xml:"Key"` - FilePath string `xml:"FilePath"` - FileSize int64 `xml:"FileSize"` - PartSize int64 `xml:"PartSize"` - UploadID string `xml:"UploadID"` - Parts []Object `xml:"Parts>Part,omitempty"` + OptIni *InitiateMultipartUploadOptions + PartSize int64 + ThreadPoolSize int + CheckPoint bool } type Chunk struct { @@ -574,73 +564,34 @@ func DividePart(fileSize int64) (int64, int64) { return partNum, partSize } -func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) { +func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) { if filePath == "" { - return nil, nil, 0, errors.New("filePath invalid") + return nil, 0, errors.New("filePath invalid") } file, err := os.Open(filePath) if err != nil { - return nil, nil, 0, err + return nil, 0, err } defer file.Close() stat, err := file.Stat() if err != nil { - return nil, nil, 0, err - } - - optcp := &CheckPointOptions{} - uploaded := false - if opt.EnableCheckpoint { - for { - optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644) - if err != nil { - return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error()) - } - defer optcp.cpfile.Close() - bs, err := ioutil.ReadAll(optcp.cpfile) - if err != nil { - break - } - err = xml.Unmarshal(bs, optcp) - if err != nil { - break - } - if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() { - optcp = &CheckPointOptions{} - break - } - uploaded = true - break - } - optcp.Key = name - optcp.FilePath = filePath - optcp.FileSize = stat.Size() + return nil, 0, err } - var partNum int64 - partSize := opt.PartSize - if uploaded { - partSize = optcp.PartSize - } if partSize > 0 { partSize = partSize * 1024 * 1024 partNum = stat.Size() / partSize if partNum >= 10000 { - return nil, nil, 0, errors.New("Too many parts, out of 10000") + return nil, 0, errors.New("Too many parts, out of 10000") } } else { partNum, partSize = DividePart(stat.Size()) } - if opt.EnableCheckpoint { - optcp.PartSize = partSize / 1024 / 1024 - } var chunks []Chunk - var chunk = Chunk{ - Done: false, - } + var chunk = Chunk{} for i := int64(0); i < partNum; i++ { chunk.Number = int(i + 1) chunk.OffSet = i * partSize @@ -656,14 +607,84 @@ func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*Check partNum++ } - if uploaded { - for _, part := range optcp.Parts { - if part.PartNumber <= int(partNum) { - chunks[(part.PartNumber - 1)].Done = true - } + return chunks, int(partNum), nil + +} + +func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) { + opt := &ObjectListUploadsOptions{ + Prefix: name, + EncodingType: "url", + } + res, _, err := s.ListUploads(ctx, opt) + if err != nil { + return "", err + } + if len(res.Upload) == 0 { + return "", nil + } + last := len(res.Upload) - 1 + for last >= 0 { + decodeKey, _ := decodeURIComponent(res.Upload[last].Key) + if decodeKey == name { + return decodeURIComponent(res.Upload[last].UploadID) } + last = last - 1 } - return optcp, chunks, int(partNum), nil + return "", nil +} + +func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error { + var err error + var uploadedParts []Object + isTruncated := true + opt := &ObjectListPartsOptions{ + EncodingType: "url", + } + for isTruncated { + res, _, err := s.ListParts(ctx, name, UploadID, opt) + if err != nil { + return err + } + if len(res.Parts) > 0 { + uploadedParts = append(uploadedParts, res.Parts...) + } + isTruncated = res.IsTruncated + opt.PartNumberMarker = res.NextPartNumberMarker + } + fd, err := os.Open(filepath) + if err != nil { + return err + } + defer fd.Close() + err = nil + for _, part := range uploadedParts { + partNumber := part.PartNumber + if partNumber > partNum { + err = errors.New("Part Number is not consistent") + break + } + partNumber = partNumber - 1 + fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET) + bs, e := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size)) + if e != nil { + err = e + break + } + localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs)) + if localMD5 != part.ETag { + err = errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)) + break + } + chunks[partNumber].Done = true + } + // 某个分块出错, 重置chunks + if err != nil { + for _, chunk := range chunks { + chunk.Done = false + } + } + return err } // MultiUpload/Upload 为高级upload接口,并发分块上传 @@ -680,12 +701,8 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string if opt == nil { opt = &MultiUploadOptions{} } - if opt.EnableCheckpoint && opt.CheckPointFile == "" { - opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath) - } - // 1.Get the file chunk - optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt) + chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize) if err != nil { return nil, nil, err } @@ -706,25 +723,27 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string ETag: rsp.Header.Get("ETag"), } return result, rsp, nil - } - if opt.EnableCheckpoint { - optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644) - if err != nil { - return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error()) + } + + var uploadID string + resumableFlag := false + if opt.CheckPoint { + var err error + uploadID, err = s.getResumableUploadID(ctx, name) + if err == nil && uploadID != "" { + err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum) + resumableFlag = (err == nil) } - defer optcp.cpfile.Close() } - uploadID := optcp.UploadID + // 2.Init optini := opt.OptIni - if uploadID == "" { - // 2.Init + if !resumableFlag { res, _, err := s.InitiateMultipartUpload(ctx, name, optini) if err != nil { return nil, nil, err } uploadID = res.UploadID - optcp.UploadID = uploadID } var poolSize int if opt.ThreadPoolSize > 0 { @@ -737,10 +756,6 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string chjobs := make(chan *Jobs, 100) chresults := make(chan *Results, 10000) optcom := &CompleteMultipartUploadOptions{} - if len(optcp.Parts) > 0 { - optcom.Parts = append(optcom.Parts, optcp.Parts...) - partNum -= len(optcp.Parts) - } // 3.Start worker for w := 1; w <= poolSize; w++ { @@ -784,26 +799,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string optcom.Parts = append(optcom.Parts, Object{ PartNumber: res.PartNumber, ETag: etag}, ) - if opt.EnableCheckpoint { - optcp.Parts = append(optcp.Parts, Object{ - PartNumber: res.PartNumber, ETag: etag}, - ) - err := optcp.cpfile.Truncate(0) - if err != nil { - continue - } - _, err = optcp.cpfile.Seek(0, os.SEEK_SET) - if err == nil { - xml.NewEncoder(optcp.cpfile).Encode(optcp) - } - } } sort.Sort(ObjectList(optcom.Parts)) v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) - if opt.EnableCheckpoint && err == nil { - os.Remove(opt.CheckPointFile) - } return v, resp, err } diff --git a/object_part.go b/object_part.go index d2d3bc4..ee181b8 100644 --- a/object_part.go +++ b/object_part.go @@ -246,3 +246,50 @@ func (s *ObjectService) CopyPart(ctx context.Context, name, uploadID string, par } return &res, resp, err } + +type ObjectListUploadsOptions struct { + Delimiter string `url:"Delimiter,omitempty"` + EncodingType string `url:"EncodingType,omitempty"` + Prefix string `url:"Prefix"` + MaxUploads int `url:"MaxUploads"` + KeyMarker string `url:"KeyMarker"` + UploadIdMarker string `url:"UploadIDMarker"` +} + +type ObjectListUploadsResult struct { + XMLName xml.Name `xml:"ListMultipartUploadsResult"` + Bucket string `xml:"Bucket,omitempty"` + EncodingType string `xml:"Encoding-Type,omitempty"` + KeyMarker string `xml:"KeyMarker,omitempty"` + UploadIdMarker string `xml:"UploadIdMarker,omitempty"` + NextKeyMarker string `xml:"NextKeyMarker,omitempty"` + NextUploadIdMarker string `xml:"NextUploadIdMarker,omitempty"` + MaxUploads string `xml:"MaxUploads,omitempty"` + IsTruncated bool `xml:"IsTruncated,omitempty"` + Prefix string `xml:"Prefix,omitempty"` + Delimiter string `xml:"Delimiter,omitempty"` + Upload []ListUploadsResultUpload `xml:"Upload,omitempty"` + CommonPrefixes []string `xml:"CommonPrefixes>Prefix,omitempty"` +} + +type ListUploadsResultUpload struct { + Key string `xml:"Key,omitempty"` + UploadID string `xml:"UploadId,omitempty"` + StorageClass string `xml:"StorageClass,omitempty"` + Initiator *Initiator `xml:"Initiator,omitempty"` + Owner *Owner `xml:"Owner,omitempty"` + Initiated string `xml:"Initiated,omitempty"` +} + +func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsOptions) (*ObjectListUploadsResult, *Response, error) { + var res ObjectListUploadsResult + sendOpt := &sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/?uploads", + method: http.MethodGet, + optQuery: opt, + result: &res, + } + resp, err := s.client.send(ctx, sendOpt) + return &res, resp, err +} From f6c91c92d6e46869c8f7acf000eccce2d351051b Mon Sep 17 00:00:00 2001 From: jojoliang Date: Wed, 22 Jul 2020 14:57:54 +0800 Subject: [PATCH 6/9] add decodeURIComponent --- helper.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/helper.go b/helper.go index 08c5a35..f0ed8f3 100644 --- a/helper.go +++ b/helper.go @@ -6,6 +6,7 @@ import ( "crypto/sha1" "fmt" "net/http" + "net/url" ) // 计算 md5 或 sha1 时的分块大小 @@ -83,3 +84,19 @@ func encodeURIComponent(s string) string { b.WriteString(s[written:]) return b.String() } + +func decodeURIComponent(s string) (string, error) { + decodeStr, err := url.QueryUnescape(s) + if err != nil { + return s, err + } + return decodeStr, err +} + +func DecodeURIComponent(s string) (string, error) { + return DecodeURIComponent(s) +} + +func EncodeURIComponent(s string) string { + return encodeURIComponent(s) +} From 5e69c19d3450d06c3da270ca12e43647432d0ca4 Mon Sep 17 00:00:00 2001 From: jojoliang Date: Wed, 22 Jul 2020 15:59:30 +0800 Subject: [PATCH 7/9] fix list uploads --- example/object/list_uploads.go | 1 - object.go | 30 +++++++++++++----------------- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/example/object/list_uploads.go b/example/object/list_uploads.go index f2bf8aa..86d74c4 100644 --- a/example/object/list_uploads.go +++ b/example/object/list_uploads.go @@ -76,7 +76,6 @@ func main() { name := "test/test_list_parts.go" up := initUpload(c, name) uploadID := up.UploadID - ctx := context.Background() blockSize := 1024 * 1024 * 3 for i := 1; i < 5; i++ { diff --git a/object.go b/object.go index 1cea83f..787c382 100644 --- a/object.go +++ b/object.go @@ -635,7 +635,6 @@ func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) ( } func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error { - var err error var uploadedParts []Object isTruncated := true opt := &ObjectListPartsOptions{ @@ -657,34 +656,31 @@ func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, return err } defer fd.Close() - err = nil + // 某个分块出错, 重置chunks + ret := func(e error) error { + for _, chunk := range chunks { + chunk.Done = false + } + return e + } for _, part := range uploadedParts { partNumber := part.PartNumber if partNumber > partNum { - err = errors.New("Part Number is not consistent") - break + return ret(errors.New("Part Number is not consistent")) } partNumber = partNumber - 1 fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET) - bs, e := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size)) - if e != nil { - err = e - break + bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size)) + if err != nil { + return ret(err) } localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs)) if localMD5 != part.ETag { - err = errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)) - break + return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber))) } chunks[partNumber].Done = true } - // 某个分块出错, 重置chunks - if err != nil { - for _, chunk := range chunks { - chunk.Done = false - } - } - return err + return nil } // MultiUpload/Upload 为高级upload接口,并发分块上传 From 98eab2886c628a3fbbbf581aeea56759ff38700c Mon Sep 17 00:00:00 2001 From: jojoliang Date: Thu, 17 Sep 2020 20:43:41 +0800 Subject: [PATCH 8/9] =?UTF-8?q?add=20CI=E5=9B=BE=E7=89=87=E5=AE=A1?= =?UTF-8?q?=E6=A0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci.go | 26 ++++++++++++++++++++ example/object/ci_get.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ object.go | 4 ++++ 3 files changed, 92 insertions(+) create mode 100644 example/object/ci_get.go diff --git a/ci.go b/ci.go index 6d6e973..2064130 100644 --- a/ci.go +++ b/ci.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "encoding/xml" + "io" "net/http" ) @@ -72,3 +73,28 @@ func (s *ObjectService) PostCI(ctx context.Context, name string, opt *CloudImage resp, err := s.client.send(ctx, &sendOpt) return &res, resp, err } + +type CloudImageRecognitionInfo struct { + Code int `xml:"Code,omitempty"` + Msg string `xml:"Msg,omitempty"` + HitFlag int `xml:"HitFlag,omitempty"` + Score int `xml:"Score,omitempty"` + Label string `xml:"Label,omitempty"` + Count int `xml:"Count,omitempty"` +} + +type CloudImageRecognitionResult struct { + PornInfo *CloudImageRecognitionInfo `xml:"PornInfo,omitempty"` + TerroristInfo *CloudImageRecognitionInfo `xml:"TerroristInfo,omitempty"` + PoliticsInfo *CloudImageRecognitionInfo `xml:"PoliticsInfo,omitempty"` + AdsInfo *CloudImageRecognitionInfo `xml:"AdsInfo,omitempty"` +} + +func GetRecognitionResult(body io.ReadCloser) *CloudImageRecognitionResult { + var res CloudImageRecognitionResult + err := xml.NewDecoder(body).Decode(&res) + if err != nil && err != io.EOF { + return nil + } + return &res +} diff --git a/example/object/ci_get.go b/example/object/ci_get.go new file mode 100644 index 0000000..cb7b036 --- /dev/null +++ b/example/object/ci_get.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + opt := &cos.ObjectGetOptions{ + CIProcess: "sensitive-content-recognition", + CIDetectType: "porn,terrorist,politics,ads", + } + + // Case1 Download object into ReadCloser(). the body needs to be closed + name := "test.jpg" + resp, err := c.Object.Get(context.Background(), name, opt) + log_status(err) + resp.Body.Close() + res := cos.GetRecognitionResult(resp.Body) + if res != nil { + fmt.Printf("%+v\n", res) + } +} diff --git a/object.go b/object.go index 787c382..0e1b103 100644 --- a/object.go +++ b/object.go @@ -35,6 +35,10 @@ type ObjectGetOptions struct { XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + + // CI 图片审核 + CIProcess string `header:"-" url:"ci-process" xml:"-"` + CIDetectType string `header:"-" url:"detect-type" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url From 695c4466f5cec17acd5143777d4eae90f389f119 Mon Sep 17 00:00:00 2001 From: jojoliang Date: Mon, 12 Oct 2020 20:17:47 +0800 Subject: [PATCH 9/9] =?UTF-8?q?ci=20=E5=86=85=E5=AE=B9=E5=AE=A1=E6=A0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ci.go | 143 +++++++++++++++++++++++++++----- cos.go | 8 +- cos_test.go | 2 +- example/object/ci_get.go | 62 -------------- example/object/ci_image_process.go | 63 ++++++++++++++ example/object/ci_image_recognition.go | 56 +++++++++++++ example/object/ci_post.go | 68 --------------- example/object/ci_video_auditing_job.go | 71 ++++++++++++++++ object.go | 20 +++-- 9 files changed, 330 insertions(+), 163 deletions(-) delete mode 100644 example/object/ci_get.go create mode 100644 example/object/ci_image_process.go create mode 100644 example/object/ci_image_recognition.go delete mode 100644 example/object/ci_post.go create mode 100644 example/object/ci_video_auditing_job.go diff --git a/ci.go b/ci.go index 2064130..60fcff0 100644 --- a/ci.go +++ b/ci.go @@ -4,15 +4,15 @@ import ( "context" "encoding/json" "encoding/xml" - "io" "net/http" ) +type CIService service + type PicOperations struct { IsPicInfo int `json:"is_pic_info,omitempty"` Rules []PicOperationsRules `json:"rules,omitemtpy"` } - type PicOperationsRules struct { Bucket string `json:"bucket,omitempty"` FileId string `json:"fileid"` @@ -20,6 +20,9 @@ type PicOperationsRules struct { } func EncodePicOperations(pic *PicOperations) string { + if pic == nil { + return "" + } bs, err := json.Marshal(pic) if err != nil { return "" @@ -27,18 +30,16 @@ func EncodePicOperations(pic *PicOperations) string { return string(bs) } -type CloudImageReuslt struct { +type ImageProcessResult struct { XMLName xml.Name `xml:"UploadResult"` OriginalInfo *PicOriginalInfo `xml:"OriginalInfo,omitempty"` ProcessObject *PicProcessObject `xml:"ProcessResults>Object,omitempty"` } - type PicOriginalInfo struct { Key string `xml:"Key,omitempty"` Location string `xml:"Location,omitempty"` ImageInfo *PicImageInfo `xml:"ImageInfo,omitempty"` } - type PicImageInfo struct { Format string `xml:"Format,omitempty"` Width int `xml:"Width,omitempty"` @@ -46,7 +47,6 @@ type PicImageInfo struct { Size int `xml:"Size,omitempty"` Quality int `xml:"Quality,omitempty"` } - type PicProcessObject struct { Key string `xml:"Key,omitempty"` Location string `xml:"Location,omitempty"` @@ -57,24 +57,42 @@ type PicProcessObject struct { Quality int `xml:"Quality,omitempty"` } -type CloudImageOptions struct { +type picOperationsHeader struct { PicOperations string `header:"Pic-Operations" xml:"-" url:"-"` } -func (s *ObjectService) PostCI(ctx context.Context, name string, opt *CloudImageOptions) (*CloudImageReuslt, *Response, error) { - var res CloudImageReuslt +type ImageProcessOptions = PicOperations + +// 云上数据处理 https://cloud.tencent.com/document/product/460/18147 +func (s *CIService) ImageProcess(ctx context.Context, name string, opt *ImageProcessOptions) (*ImageProcessResult, *Response, error) { + header := &picOperationsHeader{ + PicOperations: EncodePicOperations(opt), + } + var res ImageProcessResult sendOpt := sendOptions{ baseURL: s.client.BaseURL.BucketURL, uri: "/" + encodeURIComponent(name) + "?image_process", method: http.MethodPost, - optHeader: opt, + optHeader: header, result: &res, } resp, err := s.client.send(ctx, &sendOpt) return &res, resp, err } -type CloudImageRecognitionInfo struct { +type ImageRecognitionOptions struct { + CIProcess string `url:"ci-process,omitempty"` + DetectType string `url:"detect-type,omitempty"` +} + +type ImageRecognitionResult struct { + XMLName xml.Name `xml:"RecognitionResult"` + PornInfo *RecognitionInfo `xml:"PornInfo,omitempty"` + TerroristInfo *RecognitionInfo `xml:"TerroristInfo,omitempty"` + PoliticsInfo *RecognitionInfo `xml:"PoliticsInfo,omitempty"` + AdsInfo *RecognitionInfo `xml:"AdsInfo,omitempty"` +} +type RecognitionInfo struct { Code int `xml:"Code,omitempty"` Msg string `xml:"Msg,omitempty"` HitFlag int `xml:"HitFlag,omitempty"` @@ -83,18 +101,99 @@ type CloudImageRecognitionInfo struct { Count int `xml:"Count,omitempty"` } -type CloudImageRecognitionResult struct { - PornInfo *CloudImageRecognitionInfo `xml:"PornInfo,omitempty"` - TerroristInfo *CloudImageRecognitionInfo `xml:"TerroristInfo,omitempty"` - PoliticsInfo *CloudImageRecognitionInfo `xml:"PoliticsInfo,omitempty"` - AdsInfo *CloudImageRecognitionInfo `xml:"AdsInfo,omitempty"` +// 图片审核 https://cloud.tencent.com/document/product/460/37318 +func (s *CIService) ImageRecognition(ctx context.Context, name string, opt *ImageRecognitionOptions) (*ImageRecognitionResult, *Response, error) { + if opt != nil && opt.CIProcess == "" { + opt.CIProcess = "sensitive-content-recognition" + } + var res ImageRecognitionResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.BucketURL, + uri: "/" + encodeURIComponent(name), + method: http.MethodGet, + optQuery: opt, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} + +type PutVideoAuditingJobOptions struct { + XMLName xml.Name `xml:"Request"` + InputObject string `xml:"Input>Object"` + Conf *VideoAuditingJobConf `xml:"Conf"` +} +type VideoAuditingJobConf struct { + DetectType string `xml:",omitempty"` + Snapshot *PutVideoAuditingJobSnapshot `xml:",omitempty"` + Callback string `xml:",omitempty"` +} +type PutVideoAuditingJobSnapshot struct { + Mode string `xml:",omitempty"` + Count int `xml:",omitempty"` + TimeInterval float32 `xml:",omitempty"` + Start float32 `xml:",omitempty"` +} + +type PutVideoAuditingJobResult struct { + XMLName xml.Name `xml:"Response"` + JobsDetail struct { + JobId string `xml:"JobId,omitempty"` + State string `xml:"State,omitempty"` + CreationTime string `xml:"CreationTime,omitempty"` + Object string `xml:"Object,omitempty"` + } `xml:"JobsDetail,omitempty"` +} + +func (s *CIService) PutVideoAuditingJob(ctx context.Context, opt *PutVideoAuditingJobOptions) (*PutVideoAuditingJobResult, *Response, error) { + var res PutVideoAuditingJobResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.CIURL, + uri: "/video/auditing", + method: http.MethodPost, + body: opt, + result: &res, + } + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err +} + +type GetVideoAuditingJobResult struct { + XMLName xml.Name `xml:"Response"` + JobsDetail *VideoAuditingJobDetail `xml:",omitempty"` + NonExistJobIds string `xml:",omitempty"` +} +type VideoAuditingJobDetail struct { + Code string `xml:",omitempty"` + Message string `xml:",omitempty"` + JobId string `xml:",omitempty"` + State string `xml:",omitempty"` + CreationTime string `xml:",omitempty"` + Object string `xml:",omitempty"` + SnapshotCount string `xml:",omitempty"` + result int `xml:",omitempty"` + PornInfo *RecognitionInfo `xml:",omitempty"` + TerrorismInfo *RecognitionInfo `xml:",omitempty"` + PoliticsInfo *RecognitionInfo `xml:",omitempty"` + AdsInfo *RecognitionInfo `xml:",omitempty"` + Snapshot *GetVideoAuditingJobSnapshot `xml:",omitempty"` +} +type GetVideoAuditingJobSnapshot struct { + Url string `xml:",omitempty"` + PornInfo *RecognitionInfo `xml:",omitempty"` + TerrorismInfo *RecognitionInfo `xml:",omitempty"` + PoliticsInfo *RecognitionInfo `xml:",omitempty"` + AdsInfo *RecognitionInfo `xml:",omitempty"` } -func GetRecognitionResult(body io.ReadCloser) *CloudImageRecognitionResult { - var res CloudImageRecognitionResult - err := xml.NewDecoder(body).Decode(&res) - if err != nil && err != io.EOF { - return nil +func (s *CIService) GetVideoAuditingJob(ctx context.Context, jobid string) (*GetVideoAuditingJobResult, *Response, error) { + var res GetVideoAuditingJobResult + sendOpt := sendOptions{ + baseURL: s.client.BaseURL.CIURL, + uri: "/video/auditing/" + jobid, + method: http.MethodGet, + result: &res, } - return &res + resp, err := s.client.send(ctx, &sendOpt) + return &res, resp, err } diff --git a/cos.go b/cos.go index 42ca376..b3a7d6c 100644 --- a/cos.go +++ b/cos.go @@ -42,6 +42,8 @@ type BaseURL struct { ServiceURL *url.URL // 访问 job API 的基础 URL (不包含 path 部分): http://example.com BatchURL *url.URL + // 访问 CI 的基础 URL + CIURL *url.URL } // NewBucketURL 生成 BaseURL 所需的 BucketURL @@ -82,6 +84,7 @@ type Client struct { Bucket *BucketService Object *ObjectService Batch *BatchService + CI *CIService } type service struct { @@ -99,6 +102,7 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client { baseURL.BucketURL = uri.BucketURL baseURL.ServiceURL = uri.ServiceURL baseURL.BatchURL = uri.BatchURL + baseURL.CIURL = uri.CIURL } if baseURL.ServiceURL == nil { baseURL.ServiceURL, _ = url.Parse(defaultServiceBaseURL) @@ -114,6 +118,7 @@ func NewClient(uri *BaseURL, httpClient *http.Client) *Client { c.Bucket = (*BucketService)(&c.common) c.Object = (*ObjectService)(&c.common) c.Batch = (*BatchService)(&c.common) + c.CI = (*CIService)(&c.common) return c } @@ -244,9 +249,6 @@ func (c *Client) send(ctx context.Context, opt *sendOptions) (resp *Response, er } resp, err = c.doAPI(ctx, req, opt.result, !opt.disableCloseBody) - if err != nil { - return - } return } diff --git a/cos_test.go b/cos_test.go index f312779..ecdcebd 100644 --- a/cos_test.go +++ b/cos_test.go @@ -32,7 +32,7 @@ func setup() { server = httptest.NewServer(mux) u, _ := url.Parse(server.URL) - client = NewClient(&BaseURL{u, u, u}, nil) + client = NewClient(&BaseURL{u, u, u, u}, nil) } // teardown closes the test HTTP server. diff --git a/example/object/ci_get.go b/example/object/ci_get.go deleted file mode 100644 index cb7b036..0000000 --- a/example/object/ci_get.go +++ /dev/null @@ -1,62 +0,0 @@ -package main - -import ( - "context" - "fmt" - "net/http" - "net/url" - "os" - - "github.com/tencentyun/cos-go-sdk-v5" - "github.com/tencentyun/cos-go-sdk-v5/debug" -) - -func log_status(err error) { - if err == nil { - return - } - if cos.IsNotFoundError(err) { - // WARN - fmt.Println("WARN: Resource is not existed") - } else if e, ok := cos.IsCOSError(err); ok { - fmt.Printf("ERROR: Code: %v\n", e.Code) - fmt.Printf("ERROR: Message: %v\n", e.Message) - fmt.Printf("ERROR: Resource: %v\n", e.Resource) - fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) - // ERROR - } else { - fmt.Printf("ERROR: %v\n", err) - // ERROR - } -} - -func main() { - u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") - b := &cos.BaseURL{BucketURL: u} - c := cos.NewClient(b, &http.Client{ - Transport: &cos.AuthorizationTransport{ - SecretID: os.Getenv("COS_SECRETID"), - SecretKey: os.Getenv("COS_SECRETKEY"), - Transport: &debug.DebugRequestTransport{ - RequestHeader: true, - RequestBody: true, - ResponseHeader: true, - ResponseBody: true, - }, - }, - }) - opt := &cos.ObjectGetOptions{ - CIProcess: "sensitive-content-recognition", - CIDetectType: "porn,terrorist,politics,ads", - } - - // Case1 Download object into ReadCloser(). the body needs to be closed - name := "test.jpg" - resp, err := c.Object.Get(context.Background(), name, opt) - log_status(err) - resp.Body.Close() - res := cos.GetRecognitionResult(resp.Body) - if res != nil { - fmt.Printf("%+v\n", res) - } -} diff --git a/example/object/ci_image_process.go b/example/object/ci_image_process.go new file mode 100644 index 0000000..6fc1584 --- /dev/null +++ b/example/object/ci_image_process.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + // Notice when put a large file and set need the request body, might happend out of memory error. + RequestBody: false, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + + opt := &cos.ImageProcessOptions{ + IsPicInfo: 1, + Rules: []cos.PicOperationsRules{ + { + FileId: "format.jpg", + Rule: "imageView2/format/png", + }, + }, + } + name := "test.jpg" + res, _, err := c.CI.ImageProcess(context.Background(), name, opt) + log_status(err) + fmt.Printf("%+v\n", res) +} diff --git a/example/object/ci_image_recognition.go b/example/object/ci_image_recognition.go new file mode 100644 index 0000000..a17fd18 --- /dev/null +++ b/example/object/ci_image_recognition.go @@ -0,0 +1,56 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + opt := &cos.ImageRecognitionOptions{ + DetectType: "porn,terrorist,politics", + } + + name := "test.jpg" + res, _, err := c.CI.ImageRecognition(context.Background(), name, opt) + log_status(err) + fmt.Printf("%+v\n", res) +} diff --git a/example/object/ci_post.go b/example/object/ci_post.go deleted file mode 100644 index 380db87..0000000 --- a/example/object/ci_post.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "context" - "encoding/xml" - "fmt" - "net/http" - "net/url" - "os" - - "github.com/tencentyun/cos-go-sdk-v5" - "github.com/tencentyun/cos-go-sdk-v5/debug" -) - -func log_status(err error) { - if err == nil { - return - } - if cos.IsNotFoundError(err) { - // WARN - fmt.Println("WARN: Resource is not existed") - } else if e, ok := cos.IsCOSError(err); ok { - fmt.Printf("ERROR: Code: %v\n", e.Code) - fmt.Printf("ERROR: Message: %v\n", e.Message) - fmt.Printf("ERROR: Resource: %v\n", e.Resource) - fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) - // ERROR - } else { - fmt.Printf("ERROR: %v\n", err) - // ERROR - } -} - -func main() { - u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") - b := &cos.BaseURL{BucketURL: u} - c := cos.NewClient(b, &http.Client{ - Transport: &cos.AuthorizationTransport{ - SecretID: os.Getenv("COS_SECRETID"), - SecretKey: os.Getenv("COS_SECRETKEY"), - Transport: &debug.DebugRequestTransport{ - RequestHeader: true, - // Notice when put a large file and set need the request body, might happend out of memory error. - RequestBody: false, - ResponseHeader: true, - ResponseBody: true, - }, - }, - }) - - pic := &cos.PicOperations{ - IsPicInfo: 1, - Rules: []cos.PicOperationsRules{ - { - FileId: "format.jpg", - Rule: "imageView2/format/png", - }, - }, - } - opt := &cos.CloudImageOptions{ - PicOperations: cos.EncodePicOperations(pic), - } - name := "test.jpg" - res, _, err := c.Object.PostCI(context.Background(), name, opt) - data, _ := xml.Marshal(res) - fmt.Printf("%+v\n", string(data)) - log_status(err) -} diff --git a/example/object/ci_video_auditing_job.go b/example/object/ci_video_auditing_job.go new file mode 100644 index 0000000..9a5cbb7 --- /dev/null +++ b/example/object/ci_video_auditing_job.go @@ -0,0 +1,71 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "net/url" + "os" + "time" + + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + bu, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + cu, _ := url.Parse("https://test-1259654469.ci.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: bu, CIURL: cu} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: true, + RequestBody: true, + ResponseHeader: true, + ResponseBody: true, + }, + }, + }) + opt := &cos.PutVideoAuditingJobOptions{ + InputObject: "demo.mp4", + Conf: &cos.VideoAuditingJobConf{ + DetectType: "Porn,Terrorism,Politics,Ads", + Snapshot: &cos.PutVideoAuditingJobSnapshot{ + Mode: "Interval", + Start: 0.5, + TimeInterval: 50.5, + Count: 100, + }, + }, + } + + res, _, err := c.CI.PutVideoAuditingJob(context.Background(), opt) + log_status(err) + fmt.Printf("%+v\n", res) + + time.Sleep(3 * time.Second) + res2, _, err := c.CI.GetVideoAuditingJob(context.Background(), res.JobsDetail.JobId) + log_status(err) + fmt.Printf("%+v\n", res2) +} diff --git a/object.go b/object.go index 0e1b103..9b963ca 100644 --- a/object.go +++ b/object.go @@ -35,10 +35,6 @@ type ObjectGetOptions struct { XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` - - // CI 图片审核 - CIProcess string `header:"-" url:"ci-process" xml:"-"` - CIDetectType string `header:"-" url:"detect-type" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url @@ -501,6 +497,7 @@ type Chunk struct { OffSet int64 Size int64 Done bool + ETag string } // jobs @@ -662,8 +659,9 @@ func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, defer fd.Close() // 某个分块出错, 重置chunks ret := func(e error) error { - for _, chunk := range chunks { - chunk.Done = false + for i, _ := range chunks { + chunks[i].Done = false + chunks[i].ETag = "" } return e } @@ -683,6 +681,7 @@ func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber))) } chunks[partNumber].Done = true + chunks[partNumber].ETag = part.ETag } return nil } @@ -706,6 +705,7 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string if err != nil { return nil, nil, err } + // filesize=0 , use simple upload if partNum == 0 { var opt0 *ObjectPutOptions if opt.OptIni != nil { @@ -787,7 +787,13 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string close(chjobs) // 5.Recv the resp etag to complete - for i := 1; i <= partNum; i++ { + for i := 0; i < partNum; i++ { + if chunks[i].Done { + optcom.Parts = append(optcom.Parts, Object{ + PartNumber: chunks[i].Number, ETag: chunks[i].ETag}, + ) + continue + } res := <-chresults // Notice one part fail can not get the etag according. if res.Resp == nil || res.err != nil {