diff --git a/example/object/MutiUpload.go b/example/object/MutiUpload.go
index a93bfe7..282f7af 100644
--- a/example/object/MutiUpload.go
+++ b/example/object/MutiUpload.go
@@ -2,10 +2,10 @@ package main
 
 import (
     "context"
+    "net/http"
     "net/url"
     "os"
     "time"
-    "net/http"
     "fmt"
 
     "github.com/tencentyun/cos-go-sdk-v5"
@@ -13,14 +13,14 @@ import (
 )
 
 func main() {
-    u, _ := url.Parse("http://tencentyun02-1252448703.cos.ap-guangzhou.myqcloud.com")
+    u, _ := url.Parse("http://alanbj-1251668577.cos.ap-beijing.myqcloud.com")
     b := &cos.BaseURL{BucketURL: u}
     c := cos.NewClient(b, &http.Client{
         // Set the request timeout.
         Timeout: 100 * time.Second,
         Transport: &cos.AuthorizationTransport{
-            SecretID:  os.Getenv("COS_Key"),
-            SecretKey: os.Getenv("COS_Secret"),
+            SecretID:  os.Getenv("COS_SECRETID"),
+            SecretKey: os.Getenv("COS_SECRETKEY"),
             Transport: &debug.DebugRequestTransport{
                 RequestHeader: false,
                 RequestBody:   false,
@@ -29,15 +29,16 @@ func main() {
             },
         },
     })
-    f,err:=os.Open("E:/cos-php-sdk.zip")
-    if err!=nil {panic(err)}
+
     opt := &cos.MultiUploadOptions{
-        OptIni: nil,
-        PartSize:1,
+        OptIni:   nil,
+        PartSize: 1,
     }
     v, _, err := c.Object.MultiUpload(
-        context.Background(), "test", f, opt,
+        context.Background(), "test/gomulput1G", "./test1G", opt,
     )
-    if err!=nil {panic(err)}
+    if err != nil {
+        panic(err)
+    }
     fmt.Println(v)
-}
\ No newline at end of file
+}
diff --git a/object.go b/object.go
index 483561f..c88dc02 100644
--- a/object.go
+++ b/object.go
@@ -9,7 +9,7 @@ import (
     "net/http"
     "net/url"
     "os"
-    "strings"
+    "sort"
     "time"
 )
 
@@ -421,9 +421,118 @@ type Object struct {
     Owner *Owner `xml:",omitempty"`
 }
 
+// MultiUploadOptions are the options for MultiUpload;
+// ThreadPoolSize defaults to 1.
 type MultiUploadOptions struct {
-    OptIni *InitiateMultipartUploadOptions
-    PartSize int
+    OptIni         *InitiateMultipartUploadOptions
+    PartSize       int64
+    ThreadPoolSize int
+}
+
+type Chunk struct {
+    Number int
+    OffSet int64
+    Size   int64
+}
+
+// Jobs describes a single part-upload task.
+type Jobs struct {
+    Name       string
+    UploadId   string
+    FilePath   string
+    RetryTimes int
+    Chunk      Chunk
+    Data       io.Reader
+    Opt        *ObjectUploadPartOptions
+}
+
+type Results struct {
+    PartNumber int
+    Resp       *Response
+}
+
+func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
+    for j := range jobs {
+        fd, err := os.Open(j.FilePath)
+        var res Results
+        if err != nil {
+            res.PartNumber = j.Chunk.Number
+            res.Resp = nil
+            results <- &res
+            continue
+        }
+
+        // UploadPart does not support chunked transfer, so set the Content-Length explicitly.
+        opt := &ObjectUploadPartOptions{
+            ContentLength: int(j.Chunk.Size),
+        }
+
+        rt := j.RetryTimes
+        for {
+            // (Re)position the file at the start of the chunk before each attempt.
+            fd.Seek(j.Chunk.OffSet, io.SeekStart)
+            resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
+                &io.LimitedReader{R: fd, N: j.Chunk.Size}, opt)
+            res.PartNumber = j.Chunk.Number
+            res.Resp = resp
+            if err != nil {
+                rt--
+                if rt <= 0 {
+                    fd.Close()
+                    results <- &res
+                    break
+                }
+                continue
+            }
+            fd.Close()
+            results <- &res
+            break
+        }
+    }
+}
+
+// SplitFileIntoChunks splits the file at filePath into chunks of partSize bytes.
+func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
+    if filePath == "" || partSize <= 0 {
+        return nil, 0, errors.New("invalid filePath or partSize")
+    }
+
+    file, err := os.Open(filePath)
+    if err != nil {
+        return nil, 0, err
+    }
+    defer file.Close()
+
+    stat, err := file.Stat()
+    if err != nil {
+        return nil, 0, err
+    }
+    var partNum = stat.Size() / partSize
+    // COS allows at most 10000 parts per upload.
+    if partNum >= 10000 {
+        return nil, 0, errors.New("too many parts: the maximum is 10000")
+    }
+
+    var chunks []Chunk
+    var chunk = Chunk{}
+    for i := int64(0); i < partNum; i++ {
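+        // Parts are 1-indexed; each full-size chunk is exactly partSize bytes.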
+        chunk.Number = int(i + 1)
+        chunk.OffSet = i * partSize
+        chunk.Size = partSize
+        chunks = append(chunks, chunk)
+    }
+
+    if stat.Size()%partSize > 0 {
+        chunk.Number = len(chunks) + 1
+        chunk.OffSet = int64(len(chunks)) * partSize
+        chunk.Size = stat.Size() % partSize
+        chunks = append(chunks, chunk)
+        partNum++
+    }
+
+    return chunks, int(partNum), nil
+
 }
 
 // MultiUpload is the advanced upload interface; it uploads the parts concurrently.
@@ -433,59 +542,65 @@ type MultiUploadOptions struct {
 // Also make sure the number of parts does not exceed 10000.
 //
-func (s *ObjectService) MultiUpload(ctx context.Context, name string, r io.Reader, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
+func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
+    // 1. Split the file into chunks.
+    bufSize := opt.PartSize * 1024 * 1024
+    chunks, partNum, err := SplitFileIntoChunks(filepath, bufSize)
+    if err != nil {
+        return nil, nil, err
+    }
+    // 2. Initiate the multipart upload.
     optini := opt.OptIni
     res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
     if err != nil {
         return nil, nil, err
     }
     uploadID := res.UploadID
-    bufSize := opt.PartSize * 1024 * 1024
-    buffer := make([]byte, bufSize)
-    optcom := &CompleteMultipartUploadOptions{}
+    var poolSize int
+    if opt.ThreadPoolSize > 0 {
+        poolSize = opt.ThreadPoolSize
+    } else {
+        // Default is a single worker.
+        poolSize = 1
+    }
 
-    PartUpload := func(ch chan *Response, ctx context.Context, name string, uploadId string, partNumber int, data io.Reader, opt *ObjectUploadPartOptions) {
+    chjobs := make(chan *Jobs, 100)
+    chresults := make(chan *Results, 10000)
+    optcom := &CompleteMultipartUploadOptions{}
 
-        defer func() {
-            if err := recover(); err != nil {
-                fmt.Println(err)
-            }
-        }()
-        resp, _ := s.UploadPart(context.Background(), name, uploadId, partNumber, data, nil)
-        ch <- resp
+    // 3. Start the worker pool.
+    for w := 1; w <= poolSize; w++ {
+        go worker(s, chjobs, chresults)
     }
 
-    chs := make([]chan *Response, 10000)
-    PartNumber := 0
-    for i := 1; true; i++ {
-        bytesread, err := r.Read(buffer)
-        if err != nil {
-            if err != io.EOF {
-                return nil, nil, err
-            }
-            // If read fail also need to create i index respon in chan,
-            // in case below out of index to panic.
-            chs[i] = make(chan *Response)
-            PartNumber = i
-            break
+    // 4. Push a job for each chunk.
+    for _, chunk := range chunks {
+        job := &Jobs{
+            Name:       name,
+            RetryTimes: 3,
+            FilePath:   filepath,
+            UploadId:   uploadID,
+            Chunk:      chunk,
         }
-        chs[i] = make(chan *Response)
-        go PartUpload(chs[i], context.Background(), name, uploadID, i, strings.NewReader(string(buffer[:bytesread])), nil)
+        chjobs <- job
     }
+    close(chjobs)
 
-    for i := 1; i < PartNumber; i++ {
-        resp := <-chs[i]
+    // 5. Collect the ETag of each part, then complete the upload.
+    for i := 1; i <= partNum; i++ {
+        res := <-chresults
         // If any part failed, its ETag is unavailable and the whole upload fails.
-        if resp == nil {
+        if res.Resp == nil {
             // Some part already failed, so the response header is unavailable.
-            return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, i)
+            return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get response content", uploadID, res.PartNumber)
         }
-        etag := resp.Header.Get("ETag")
+        etag := res.Resp.Header.Get("ETag")
         optcom.Parts = append(optcom.Parts, Object{
-            PartNumber: i, ETag: etag},
+            PartNumber: res.PartNumber, ETag: etag},
         )
     }
+    sort.Sort(ObjectList(optcom.Parts))
 
     v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
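For reviewers, here is a minimal sketch of a caller against the new signature. It mirrors example/object/MutiUpload.go above; the bucket URL, object key, and local file path are placeholders to substitute, and only the new PartSize/ThreadPoolSize fields are exercised.

package main

import (
    "context"
    "fmt"
    "net/http"
    "net/url"
    "os"
    "time"

    "github.com/tencentyun/cos-go-sdk-v5"
)

func main() {
    // Placeholder bucket URL; substitute your own bucket name and region.
    u, _ := url.Parse("http://examplebucket-1250000000.cos.ap-guangzhou.myqcloud.com")
    c := cos.NewClient(&cos.BaseURL{BucketURL: u}, &http.Client{
        Timeout: 100 * time.Second,
        Transport: &cos.AuthorizationTransport{
            SecretID:  os.Getenv("COS_SECRETID"),
            SecretKey: os.Getenv("COS_SECRETKEY"),
        },
    })

    opt := &cos.MultiUploadOptions{
        PartSize:       8, // part size in MiB (the new int64 field)
        ThreadPoolSize: 4, // upload up to four parts concurrently
    }
    // Second argument is the object key; third is the local file path (placeholder).
    v, _, err := c.Object.MultiUpload(context.Background(), "test/bigfile", "./bigfile", opt)
    if err != nil {
        panic(err)
    }
    fmt.Println(v)
}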