Browse Source

add checkpoint multi upload

tags/v0.7.11^2
jojoliang 5 years ago
parent
commit
287669a677
  1. 137
      object.go
  2. 2
      object_part.go

137
object.go

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
@ -481,15 +482,28 @@ type Object struct {
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one
type MultiUploadOptions struct {
OptIni *InitiateMultipartUploadOptions
PartSize int64
ThreadPoolSize int
OptIni *InitiateMultipartUploadOptions
PartSize int64
ThreadPoolSize int
CheckPointFile string
EnableCheckpoint bool
}
type CheckPointOptions struct {
cpfile *os.File
Key string `xml:"Key"`
FilePath string `xml:"FilePath"`
FileSize int64 `xml:"FileSize"`
PartSize int64 `xml:"PartSize"`
UploadID string `xml:"UploadID"`
Parts []Object `xml:"Parts>Part,omitempty"`
}
type Chunk struct {
Number int
OffSet int64
Size int64
Done bool
}
// jobs
@ -506,6 +520,7 @@ type Jobs struct {
type Results struct {
PartNumber int
Resp *Response
err error
}
func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
@ -513,6 +528,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
fd, err := os.Open(j.FilePath)
var res Results
if err != nil {
res.err = err
res.PartNumber = j.Chunk.Number
res.Resp = nil
results <- &res
@ -528,6 +544,7 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
&io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
res.PartNumber = j.Chunk.Number
res.Resp = resp
res.err = err
if err != nil {
rt--
if rt == 0 {
@ -554,34 +571,72 @@ func DividePart(fileSize int64) (int64, int64) {
return partNum, partSize
}
func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
func SplitFileIntoChunks(name, filePath string, opt *MultiUploadOptions) (*CheckPointOptions, []Chunk, int, error) {
if filePath == "" {
return nil, 0, errors.New("filePath invalid")
return nil, nil, 0, errors.New("filePath invalid")
}
file, err := os.Open(filePath)
if err != nil {
return nil, 0, err
return nil, nil, 0, err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return nil, 0, err
return nil, nil, 0, err
}
optcp := &CheckPointOptions{}
uploaded := false
if opt.EnableCheckpoint {
for {
optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDONLY|os.O_CREATE, 0644)
if err != nil {
return nil, nil, 0, errors.New("open(create) checkpoint file[" + opt.CheckPointFile + "] failed, error:" + err.Error())
}
defer optcp.cpfile.Close()
bs, err := ioutil.ReadAll(optcp.cpfile)
if err != nil {
break
}
err = xml.Unmarshal(bs, optcp)
if err != nil {
break
}
if optcp.Key != name || optcp.FilePath != filePath || optcp.FileSize != stat.Size() {
break
}
uploaded = true
break
}
optcp.Key = name
optcp.FilePath = filePath
optcp.FileSize = stat.Size()
}
var partNum int64
partSize := opt.PartSize
if uploaded {
partSize = optcp.PartSize
}
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = stat.Size() / partSize
if partNum >= 10000 {
return nil, 0, errors.New("Too many parts, out of 10000")
return nil, nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(stat.Size())
}
if opt.EnableCheckpoint {
optcp.PartSize = partSize / 1024 / 1024
}
var chunks []Chunk
var chunk = Chunk{}
var chunk = Chunk{
Done: false,
}
for i := int64(0); i < partNum; i++ {
chunk.Number = int(i + 1)
chunk.OffSet = i * partSize
@ -597,8 +652,14 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error)
partNum++
}
return chunks, int(partNum), nil
if uploaded {
for _, part := range optcp.Parts {
if part.PartNumber <= int(partNum) {
chunks[(part.PartNumber - 1)].Done = true
}
}
}
return optcp, chunks, int(partNum), nil
}
// MultiUpload/Upload 为高级upload接口,并发分块上传
@ -615,8 +676,12 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
if opt == nil {
opt = &MultiUploadOptions{}
}
if opt.EnableCheckpoint && opt.CheckPointFile == "" {
opt.CheckPointFile = fmt.Sprintf("%s.cp", filepath)
}
// 1.Get the file chunk
chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
optcp, chunks, partNum, err := SplitFileIntoChunks(name, filepath, opt)
if err != nil {
return nil, nil, err
}
@ -637,15 +702,26 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
ETag: rsp.Header.Get("ETag"),
}
return result, rsp, nil
}
if opt.EnableCheckpoint {
optcp.cpfile, err = os.OpenFile(opt.CheckPointFile, os.O_RDWR, 0644)
if err != nil {
return nil, nil, errors.New("open checkpoint file failed, error: " + err.Error())
}
defer optcp.cpfile.Close()
}
// 2.Init
uploadID := optcp.UploadID
optini := opt.OptIni
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
if uploadID == "" {
// 2.Init
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
}
uploadID = res.UploadID
optcp.UploadID = uploadID
}
uploadID := res.UploadID
var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
@ -657,6 +733,10 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
chjobs := make(chan *Jobs, 100)
chresults := make(chan *Results, 10000)
optcom := &CompleteMultipartUploadOptions{}
if len(optcp.Parts) > 0 {
optcom.Parts = append(optcom.Parts, optcp.Parts...)
partNum -= len(optcp.Parts)
}
// 3.Start worker
for w := 1; w <= poolSize; w++ {
@ -665,6 +745,9 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
// 4.Push jobs
for _, chunk := range chunks {
if chunk.Done {
continue
}
partOpt := &ObjectUploadPartOptions{}
if optini != nil && optini.ObjectPutHeaderOptions != nil {
partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
@ -687,19 +770,35 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
for i := 1; i <= partNum; i++ {
res := <-chresults
// Notice one part fail can not get the etag according.
if res.Resp == nil {
if res.Resp == nil || res.err != nil {
// Some part already fail, can not to get the header inside.
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
}
// Notice one part fail can not get the etag according.
etag := res.Resp.Header.Get("ETag")
optcom.Parts = append(optcom.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
if opt.EnableCheckpoint {
optcp.Parts = append(optcp.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
err := optcp.cpfile.Truncate(0)
if err != nil {
continue
}
_, err = optcp.cpfile.Seek(0, os.SEEK_SET)
if err == nil {
xml.NewEncoder(optcp.cpfile).Encode(optcp)
}
}
}
sort.Sort(ObjectList(optcom.Parts))
v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
if opt.EnableCheckpoint && err == nil {
os.Remove(opt.CheckPointFile)
}
return v, resp, err
}

2
object_part.go

@ -42,7 +42,7 @@ func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string
// ObjectUploadPartOptions is the options of upload-part
type ObjectUploadPartOptions struct {
Expect string `header:"Expect,omitempty" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
ContentLength int `header:"Content-Length,omitempty" url:"-"`
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`

Loading…
Cancel
Save