Browse Source

download checkpoint

master
jojoliang 4 years ago
parent
commit
4d158217c8
  1. 2
      example/bucket/putPolicy.go
  2. 128
      object.go

2
example/bucket/putPolicy.go

@ -48,7 +48,7 @@ func main() {
Condition: map[string]map[string]interface{}{
"ip_not_equal": map[string]interface{}{
"qcs:ip": []string{
"192.168.1.1",
"<ip>",
},
},
},

128
object.go

@ -3,6 +3,7 @@ package cos
import (
"context"
"crypto/md5"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
@ -568,6 +569,20 @@ type MultiDownloadOptions struct {
Opt *ObjectGetOptions
PartSize int64
ThreadPoolSize int
CheckPoint bool
CheckPointFile string
}
type MultiDownloadCPInfo struct {
Size int64 `json:"contentLength,omitempty"`
ETag string `json:"eTag,omitempty"`
CRC64 string `json:"crc64ecma,omitempty"`
LastModified string `json:"lastModified,omitempty"`
DownloadedBlocks []DownloadedBlock `json:"downloadedBlocks,omitempty"`
}
type DownloadedBlock struct {
From int64 `json:"from,omitempty"`
To int64 `json:"to,omitempty"`
}
type Chunk struct {
@ -584,6 +599,7 @@ type Jobs struct {
UploadId string
FilePath string
RetryTimes int
VersionId []string
Chunk Chunk
Data io.Reader
Opt *ObjectUploadPartOptions
@ -663,7 +679,7 @@ func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results
for {
var res Results
res.PartNumber = j.Chunk.Number
resp, err := s.Get(context.Background(), j.Name, j.DownOpt)
resp, err := s.Get(context.Background(), j.Name, j.DownOpt, j.VersionId...)
res.err = err
res.Resp = resp
if err != nil {
@ -1046,7 +1062,50 @@ func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error)
return chunks, int(partNum), nil
}
func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions) (*Response, error) {
func (s *ObjectService) checkDownloadedParts(opt *MultiDownloadCPInfo, chfile string, chunks []Chunk) (*MultiDownloadCPInfo, bool) {
var defaultRes MultiDownloadCPInfo
defaultRes = *opt
fd, err := os.Open(chfile)
// checkpoint 文件不存在
if err != nil && os.IsNotExist(err) {
// 创建 checkpoint 文件
fd, _ = os.OpenFile(chfile, os.O_RDONLY|os.O_CREATE|os.O_TRUNC, 0660)
fd.Close()
return &defaultRes, false
}
if err != nil {
return &defaultRes, false
}
defer fd.Close()
var res MultiDownloadCPInfo
err = json.NewDecoder(fd).Decode(&res)
if err != nil {
return &defaultRes, false
}
// 与COS的文件比较
if res.CRC64 != opt.CRC64 || res.ETag != opt.ETag || res.Size != opt.Size || res.LastModified != opt.LastModified || len(res.DownloadedBlocks) == 0 {
return &defaultRes, false
}
// len(chunks) 大于1,否则为简单下载, chunks[0].Size为partSize
partSize := chunks[0].Size
for _, v := range res.DownloadedBlocks {
index := v.From / partSize
to := chunks[index].OffSet + chunks[index].Size - 1
if chunks[index].OffSet != v.From || to != v.To {
// 重置chunks
for i, _ := range chunks {
chunks[i].Done = false
}
return &defaultRes, false
}
chunks[index].Done = true
}
return &res, true
}
func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions, id ...string) (*Response, error) {
// 参数校验
if opt == nil {
opt = &MultiDownloadOptions{}
@ -1056,7 +1115,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
}
// 获取文件长度和CRC
var coscrc string
resp, err := s.Head(ctx, name, nil)
resp, err := s.Head(ctx, name, nil, id...)
if err != nil {
return resp, err
}
@ -1075,7 +1134,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
}
// 直接下载到文件
if partNum == 0 || partNum == 1 {
rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt)
rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt, id...)
if err != nil {
return rsp, err
}
@ -1096,12 +1155,39 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
}
return rsp, err
}
// 创建文件
nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
return resp, err
// 断点续载
var resumableFlag bool
var resumableInfo *MultiDownloadCPInfo
var cpfd *os.File
var cpfile string
if opt.CheckPoint {
cpInfo := &MultiDownloadCPInfo{
LastModified: resp.Header.Get("Last-Modified"),
ETag: resp.Header.Get("ETag"),
CRC64: coscrc,
Size: totalBytes,
}
cpfile = opt.CheckPointFile
if cpfile == "" {
cpfile = fmt.Sprintf("%s.cosresumabletask", filepath)
}
resumableInfo, resumableFlag = s.checkDownloadedParts(cpInfo, cpfile, chunks)
cpfd, err = os.OpenFile(cpfile, os.O_RDWR, 0660)
if err != nil {
return nil, fmt.Errorf("Open CheckPoint File[%v] Failed:%v", cpfile, err)
}
}
if !resumableFlag {
// 创建文件
nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
if cpfd != nil {
cpfd.Close()
}
return resp, err
}
nfile.Close()
}
nfile.Close()
var poolSize int
if opt.ThreadPoolSize > 0 {
@ -1117,6 +1203,9 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
go func() {
for _, chunk := range chunks {
if chunk.Done {
continue
}
var downOpt ObjectGetOptions
if opt.Opt != nil {
downOpt = *opt.Opt
@ -1129,23 +1218,42 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
Chunk: chunk,
DownOpt: &downOpt,
}
if len(id) > 0 {
job.VersionId = append(job.VersionId, id...)
}
chjobs <- job
}
close(chjobs)
}()
err = nil
for i := 0; i < partNum; i++ {
if chunks[i].Done {
continue
}
res := <-chresults
if res.Resp == nil || res.err != nil {
err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
continue
}
// Dump CheckPoint Info
if opt.CheckPoint {
cpfd.Truncate(0)
cpfd.Seek(0, os.SEEK_SET)
resumableInfo.DownloadedBlocks = append(resumableInfo.DownloadedBlocks, DownloadedBlock{From: chunks[i].OffSet, To: chunks[i].OffSet + chunks[i].Size - 1})
json.NewEncoder(cpfd).Encode(resumableInfo)
}
}
close(chresults)
if cpfd != nil {
cpfd.Close()
}
if err != nil {
return nil, err
}
// 下载成功,删除checkpoint文件
if opt.CheckPoint {
os.Remove(cpfile)
}
if coscrc != "" && s.client.Conf.EnableCRC {
icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
fd, err := os.Open(filepath)

Loading…
Cancel
Save