diff --git a/example/object/download.go b/example/object/download.go new file mode 100644 index 0000000..62f7889 --- /dev/null +++ b/example/object/download.go @@ -0,0 +1,57 @@ +package main + +import ( + "context" + "net/http" + "net/url" + "os" + + "fmt" + "github.com/tencentyun/cos-go-sdk-v5" + "github.com/tencentyun/cos-go-sdk-v5/debug" +) + +func log_status(err error) { + if err == nil { + return + } + if cos.IsNotFoundError(err) { + // WARN + fmt.Println("WARN: Resource is not existed") + } else if e, ok := cos.IsCOSError(err); ok { + fmt.Printf("ERROR: Code: %v\n", e.Code) + fmt.Printf("ERROR: Message: %v\n", e.Message) + fmt.Printf("ERROR: Resource: %v\n", e.Resource) + fmt.Printf("ERROR: RequestId: %v\n", e.RequestID) + // ERROR + } else { + fmt.Printf("ERROR: %v\n", err) + // ERROR + } +} + +func main() { + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") + b := &cos.BaseURL{BucketURL: u} + c := cos.NewClient(b, &http.Client{ + Transport: &cos.AuthorizationTransport{ + SecretID: os.Getenv("COS_SECRETID"), + SecretKey: os.Getenv("COS_SECRETKEY"), + Transport: &debug.DebugRequestTransport{ + RequestHeader: false, + RequestBody: false, + ResponseHeader: false, + ResponseBody: false, + }, + }, + }) + + opt := &cos.MultiDownloadOptions{ + ThreadPoolSize: 5, + } + resp, err := c.Object.Download( + context.Background(), "test", "./test1G", opt, + ) + log_status(err) + fmt.Printf("done, %v\n", resp.Header) +} diff --git a/helper.go b/helper.go index 7b4acbf..a7256d3 100644 --- a/helper.go +++ b/helper.go @@ -237,3 +237,26 @@ func cloneObjectUploadPartOptions(opt *ObjectUploadPartOptions) *ObjectUploadPar } return &res } + +type RangeOptions struct { + HasStart bool + HasEnd bool + Start int64 + End int64 +} + +func FormatRangeOptions(opt *RangeOptions) string { + if opt == nil { + return "" + } + if opt.HasStart && opt.HasEnd { + return fmt.Sprintf("bytes=%v-%v", opt.Start, opt.End) + } + if opt.HasStart { + return fmt.Sprintf("bytes=%v-", opt.Start) + } + if opt.HasEnd { + return fmt.Sprintf("bytes=-%v", opt.End) + } + return "bytes=-" +} diff --git a/object.go b/object.go index 281a035..5eb7593 100644 --- a/object.go +++ b/object.go @@ -553,6 +553,12 @@ type MultiUploadOptions struct { EnableVerification bool } +type MultiDownloadOptions struct { + Opt *ObjectGetOptions + PartSize int64 + ThreadPoolSize int +} + type Chunk struct { Number int OffSet int64 @@ -570,6 +576,7 @@ type Jobs struct { Chunk Chunk Data io.Reader Opt *ObjectUploadPartOptions + DownOpt *ObjectGetOptions } type Results struct { @@ -632,6 +639,48 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { } } +func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) { + for j := range jobs { + opt := &RangeOptions{ + HasStart: true, + HasEnd: true, + Start: j.Chunk.OffSet, + End: j.Chunk.OffSet + j.Chunk.Size - 1, + } + j.DownOpt.Range = FormatRangeOptions(opt) + rt := j.RetryTimes + for { + var res Results + res.PartNumber = j.Chunk.Number + resp, err := s.Get(context.Background(), j.Name, j.DownOpt) + res.err = err + res.Resp = resp + if err != nil { + rt-- + if rt == 0 { + results <- &res + break + } + continue + } + defer resp.Body.Close() + fd, err := os.OpenFile(j.FilePath, os.O_WRONLY, 0660) + if err != nil { + res.err = err + results <- &res + break + } + fd.Seek(j.Chunk.OffSet, os.SEEK_SET) + n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size)) + if n != j.Chunk.Size || err != nil { + res.err = fmt.Errorf("io.Copy Failed, read:%v, size:%v, err:%v", n, j.Chunk.Size, err) + } + results <- &res + break + } + } +} + func DividePart(fileSize int64, last int) (int64, int64) { partSize := int64(last * 1024 * 1024) partNum := fileSize / partSize @@ -953,6 +1002,150 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string return v, resp, err } +func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) { + var partNum int64 + if partSize > 0 { + partSize = partSize * 1024 * 1024 + partNum = totalBytes / partSize + if partNum >= 10000 { + return nil, 0, errors.New("Too manry parts, out of 10000") + } + } else { + partNum, partSize = DividePart(totalBytes, 64) + } + + var chunks []Chunk + var chunk = Chunk{} + for i := int64(0); i < partNum; i++ { + chunk.Number = int(i + 1) + chunk.OffSet = i * partSize + chunk.Size = partSize + chunks = append(chunks, chunk) + } + + if totalBytes%partSize > 0 { + chunk.Number = len(chunks) + 1 + chunk.OffSet = int64(len(chunks)) * partSize + chunk.Size = totalBytes % partSize + chunks = append(chunks, chunk) + partNum++ + } + + return chunks, int(partNum), nil +} + +func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions) (*Response, error) { + // 参数校验 + if opt == nil { + opt = &MultiDownloadOptions{} + } + if opt.Opt != nil && opt.Opt.Range != "" { + return nil, fmt.Errorf("does not supported Range Get") + } + // 获取文件长度和CRC + var coscrc string + resp, err := s.Head(ctx, name, nil) + if err != nil { + return resp, err + } + coscrc = resp.Header.Get("x-cos-hash-crc64ecma") + strTotalBytes := resp.Header.Get("Content-Length") + totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64) + if err != nil { + return resp, err + } + + // 切分 + chunks, partNum, err := SplitSizeIntoChunks(totalBytes, opt.PartSize) + if err != nil { + return resp, err + } + // 直接下载到文件 + if partNum == 0 || partNum == 1 { + rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt) + if err != nil { + return rsp, err + } + if coscrc != "" && s.client.Conf.EnableCRC { + icoscrc, _ := strconv.ParseUint(coscrc, 10, 64) + fd, err := os.Open(filepath) + if err != nil { + return rsp, err + } + localcrc, err := calCRC64(fd) + if err != nil { + return rsp, err + } + if localcrc != icoscrc { + return rsp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc) + } + } + return rsp, err + } + nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660) + if err != nil { + return resp, err + } + nfile.Close() + var poolSize int + if opt.ThreadPoolSize > 0 { + poolSize = opt.ThreadPoolSize + } else { + poolSize = 1 + } + chjobs := make(chan *Jobs, 100) + chresults := make(chan *Results, 10000) + for w := 1; w <= poolSize; w++ { + go downloadWorker(s, chjobs, chresults) + } + + go func() { + for _, chunk := range chunks { + var downOpt ObjectGetOptions + if opt.Opt != nil { + downOpt = *opt.Opt + } + job := &Jobs{ + Name: name, + RetryTimes: 3, + FilePath: filepath, + Chunk: chunk, + DownOpt: &downOpt, + } + chjobs <- job + } + close(chjobs) + }() + + err = nil + for i := 0; i < partNum; i++ { + res := <-chresults + if res.Resp == nil || res.err != nil { + err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error()) + continue + } + } + close(chresults) + if err != nil { + return nil, err + } + if coscrc != "" && s.client.Conf.EnableCRC { + icoscrc, _ := strconv.ParseUint(coscrc, 10, 64) + fd, err := os.Open(filepath) + if err != nil { + return resp, err + } + localcrc, err := calCRC64(fd) + if err != nil { + return resp, err + } + if localcrc != icoscrc { + return resp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc) + } + } + return resp, err +} + type ObjectPutTaggingOptions struct { XMLName xml.Name `xml:"Tagging"` TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"` diff --git a/object_test.go b/object_test.go index fcd8f2e..23e13fe 100644 --- a/object_test.go +++ b/object_test.go @@ -8,12 +8,14 @@ import ( "encoding/xml" "fmt" "hash/crc64" + "io" "io/ioutil" "net/http" "net/url" "os" "reflect" "strconv" + "strings" "testing" "time" ) @@ -514,3 +516,80 @@ func TestObjectService_Upload2(t *testing.T) { retry++ } } + +func TestObjectService_Download(t *testing.T) { + setup() + defer teardown() + + filePath := "rsp.file" + time.Now().Format(time.RFC3339) + newfile, err := os.Create(filePath) + if err != nil { + t.Fatalf("create tmp file failed") + } + defer os.Remove(filePath) + // 源文件内容 + totalBytes := int64(1024 * 1024 * 10) + b := make([]byte, totalBytes) + _, err = rand.Read(b) + newfile.Write(b) + newfile.Close() + tb := crc64.MakeTable(crc64.ECMA) + localcrc := crc64.Update(0, tb, b) + + retryMap := make(map[int64]int) + mux.HandleFunc("/test.go.download", func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodHead { + w.Header().Add("Content-Length", strconv.FormatInt(totalBytes, 10)) + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10)) + return + } + strRange := r.Header.Get("Range") + slice1 := strings.Split(strRange, "=") + slice2 := strings.Split(slice1[1], "-") + start, _ := strconv.ParseInt(slice2[0], 10, 64) + end, _ := strconv.ParseInt(slice2[1], 10, 64) + if retryMap[start] == 0 { + retryMap[start]++ + w.WriteHeader(http.StatusGatewayTimeout) + } else if retryMap[start] == 1 { + retryMap[start]++ + w.WriteHeader(http.StatusGatewayTimeout) + return + fd, err := os.Open(filePath) + if err != nil { + t.Fatalf("open file failed: %v", err) + } + defer fd.Close() + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10)) + fd.Seek(start, os.SEEK_SET) + n, err := io.Copy(w, LimitReadCloser(fd, (end-start)/2)) + if err != nil || int64(n) != (end-start)/2 { + t.Fatalf("write file failed:%v, n:%v", err, n) + } + + } else { + fd, err := os.Open(filePath) + if err != nil { + t.Fatalf("open file failed: %v", err) + } + defer fd.Close() + w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10)) + fd.Seek(start, os.SEEK_SET) + n, err := io.Copy(w, LimitReadCloser(fd, end-start+1)) + if err != nil || int64(n) != end-start+1 { + t.Fatalf("write file failed:%v, n:%v", err, n) + } + } + }) + + opt := &MultiDownloadOptions{ + ThreadPoolSize: 3, + PartSize: 1, + } + downPath := "down.file" + time.Now().Format(time.RFC3339) + _, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt) + if err != nil { + t.Fatalf("Object.Upload returned error: %v", err) + } + os.Remove(downPath) +}