From 287669a677af36153d645aabd4b296114aea75fe Mon Sep 17 00:00:00 2001 From: jojoliang Date: Tue, 21 Jan 2020 12:04:45 +0800 Subject: [PATCH] add checkpoint multi upload --- object.go | 137 +++++++++++++++++++++++++++++++++++++++++++++++++-------- object_part.go | 2 +- 2 files changed, 119 insertions(+), 20 deletions(-) diff --git a/object.go b/object.go index d44e351..514c5c2 100644 --- a/object.go +++ b/object.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/url" "os" @@ -481,15 +482,28 @@ type Object struct { // MultiUploadOptions is the option of the multiupload, // ThreadPoolSize default is one type MultiUploadOptions struct { - OptIni *InitiateMultipartUploadOptions - PartSize int64 - ThreadPoolSize int + OptIni *InitiateMultipartUploadOptions + PartSize int64 + ThreadPoolSize int + CheckPointFile string + EnableCheckpoint bool +} + +type CheckPointOptions struct { + cpfile *os.File + Key string `xml:"Key"` + FilePath string `xml:"FilePath"` + FileSize int64 `xml:"FileSize"` + PartSize int64 `xml:"PartSize"` + UploadID string `xml:"UploadID"` + Parts []Object `xml:"Parts>Part,omitempty"` } type Chunk struct { Number int OffSet int64 Size int64 + Done bool } // jobs @@ -506,6 +520,7 @@ type Jobs struct { type Results struct { PartNumber int Resp *Response + err error } func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { @@ -513,6 +528,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { fd, err := os.Open(j.FilePath) var res Results if err != nil { + res.err = err res.PartNumber = j.Chunk.Number res.Resp = nil results <- &res @@ -528,6 +544,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt) res.PartNumber = j.Chunk.Number res.Resp = resp + res.err = err if err != nil { rt-- if rt == 0 { @@ -554,34 +571,72 @@ func DividePart(fileSize int64) (int64, int64) { return partNum, partSize } -func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) { +func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) { if filePath == "" { - return nil, 0, errors.New("filePath invalid") + return nil, nil, 0, errors.New("filePath invalid") } file, err := os.Open(filePath) if err != nil { - return nil, 0, err + return nil, nil, 0, err } defer file.Close() stat, err := file.Stat() if err != nil { - return nil, 0, err + return nil, nil, 0, err } + + optcp := &CheckPointOptions{} + uploaded := false + if opt.EnableCheckpoint { + for { + optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644) + if err != nil { + return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error()) + } + defer optcp.cpfile.Close() + bs, err := ioutil.ReadAll(optcp.cpfile) + if err != nil { + break + } + err = xml.Unmarshal(bs, optcp) + if err != nil { + break + } + if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() { + break + } + uploaded = true + break + } + optcp.Key = name + optcp.FilePath = filePath + optcp.FileSize = stat.Size() + } + var partNum int64 + partSize := opt.PartSize + if uploaded { + partSize = optcp.PartSize + } if partSize > 0 { partSize = partSize * 1024 * 1024 partNum = stat.Size() / partSize if partNum >= 10000 { - return nil, 0, errors.New("Too many parts, out of 10000") + return nil, nil, 0, errors.New("Too many parts, out of 10000") } } else { partNum, partSize = DividePart(stat.Size()) } + if opt.EnableCheckpoint { + optcp.PartSize = partSize / 1024 / 1024 + } var chunks []Chunk - var chunk = Chunk{} + var chunk = Chunk{ + Done: false, + } for i := int64(0); i < partNum; i++ { chunk.Number = int(i + 1) chunk.OffSet = i * partSize @@ -597,8 +652,14 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) partNum++ } - return chunks, int(partNum), nil - + if uploaded { + for _, part := range optcp.Parts { + if part.PartNumber <= int(partNum) { + chunks[(part.PartNumber - 1)].Done = true + } + } + } + return optcp, chunks, int(partNum), nil } // MultiUpload/Upload 为高级upload接口,并发分块上传 @@ -615,8 +676,12 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string if opt == nil { opt = &MultiUploadOptions{} } + if opt.EnableCheckpoint && opt.CheckPointFile == "" { + opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath) + } + // 1.Get the file chunk - chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize) + optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt) if err != nil { return nil, nil, err } @@ -637,15 +702,26 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string ETag: rsp.Header.Get("ETag"), } return result, rsp, nil + } + if opt.EnableCheckpoint { + optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644) + if err != nil { + return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error()) + } + defer optcp.cpfile.Close() } - // 2.Init + uploadID := optcp.UploadID optini := opt.OptIni - res, _, err := s.InitiateMultipartUpload(ctx, name, optini) - if err != nil { - return nil, nil, err + if uploadID == "" { + // 2.Init + res, _, err := s.InitiateMultipartUpload(ctx, name, optini) + if err != nil { + return nil, nil, err + } + uploadID = res.UploadID + optcp.UploadID = uploadID } - uploadID := res.UploadID var poolSize int if opt.ThreadPoolSize > 0 { poolSize = opt.ThreadPoolSize @@ -657,6 +733,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string chjobs := make(chan *Jobs, 100) chresults := make(chan *Results, 10000) optcom := &CompleteMultipartUploadOptions{} + if len(optcp.Parts) > 0 { + optcom.Parts = append(optcom.Parts, optcp.Parts...) + partNum -= len(optcp.Parts) + } // 3.Start worker for w := 1; w <= poolSize; w++ { @@ -665,6 +745,9 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string // 4.Push jobs for _, chunk := range chunks { + if chunk.Done { + continue + } partOpt := &ObjectUploadPartOptions{} if optini != nil && optini.ObjectPutHeaderOptions != nil { partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo @@ -687,19 +770,35 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string for i := 1; i <= partNum; i++ { res := <-chresults // Notice one part fail can not get the etag according. - if res.Resp == nil { + if res.Resp == nil || res.err != nil { // Some part already fail, can not to get the header inside. - return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber) + return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error()) } // Notice one part fail can not get the etag according. etag := res.Resp.Header.Get("ETag") optcom.Parts = append(optcom.Parts, Object{ PartNumber: res.PartNumber, ETag: etag}, ) + if opt.EnableCheckpoint { + optcp.Parts = append(optcp.Parts, Object{ + PartNumber: res.PartNumber, ETag: etag}, + ) + err := optcp.cpfile.Truncate(0) + if err != nil { + continue + } + _, err = optcp.cpfile.Seek(0, os.SEEK_SET) + if err == nil { + xml.NewEncoder(optcp.cpfile).Encode(optcp) + } + } } sort.Sort(ObjectList(optcom.Parts)) v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) + if opt.EnableCheckpoint && err == nil { + os.Remove(opt.CheckPointFile) + } return v, resp, err } diff --git a/object_part.go b/object_part.go index 897afca..75bce19 100644 --- a/object_part.go +++ b/object_part.go @@ -42,7 +42,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string // ObjectUploadPartOptions is the options of upload-part type ObjectUploadPartOptions struct { Expect string `header:"Expect,omitempty" url:"-"` - XCosContentSHA1 string `header:"x-cos-content-sha1" url:"-"` + XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"` ContentLength int `header:"Content-Length,omitempty" url:"-"` XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`