Browse Source

add multidownload

master
jojoliang 4 years ago
parent
commit
f54cbf13a5
  1. 57
      example/object/download.go
  2. 23
      helper.go
  3. 193
      object.go
  4. 79
      object_test.go

57
example/object/download.go

@ -0,0 +1,57 @@
package main
import (
"context"
"net/http"
"net/url"
"os"
"fmt"
"github.com/tencentyun/cos-go-sdk-v5"
"github.com/tencentyun/cos-go-sdk-v5/debug"
)
func log_status(err error) {
if err == nil {
return
}
if cos.IsNotFoundError(err) {
// WARN
fmt.Println("WARN: Resource is not existed")
} else if e, ok := cos.IsCOSError(err); ok {
fmt.Printf("ERROR: Code: %v\n", e.Code)
fmt.Printf("ERROR: Message: %v\n", e.Message)
fmt.Printf("ERROR: Resource: %v\n", e.Resource)
fmt.Printf("ERROR: RequestId: %v\n", e.RequestID)
// ERROR
} else {
fmt.Printf("ERROR: %v\n", err)
// ERROR
}
}
func main() {
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: false,
RequestBody: false,
ResponseHeader: false,
ResponseBody: false,
},
},
})
opt := &cos.MultiDownloadOptions{
ThreadPoolSize: 5,
}
resp, err := c.Object.Download(
context.Background(), "test", "./test1G", opt,
)
log_status(err)
fmt.Printf("done, %v\n", resp.Header)
}

23
helper.go

@ -237,3 +237,26 @@ func cloneObjectUploadPartOptions(opt *ObjectUploadPartOptions) *ObjectUploadPar
}
return &res
}
type RangeOptions struct {
HasStart bool
HasEnd bool
Start int64
End int64
}
func FormatRangeOptions(opt *RangeOptions) string {
if opt == nil {
return ""
}
if opt.HasStart && opt.HasEnd {
return fmt.Sprintf("bytes=%v-%v", opt.Start, opt.End)
}
if opt.HasStart {
return fmt.Sprintf("bytes=%v-", opt.Start)
}
if opt.HasEnd {
return fmt.Sprintf("bytes=-%v", opt.End)
}
return "bytes=-"
}

193
object.go

@ -553,6 +553,12 @@ type MultiUploadOptions struct {
EnableVerification bool
}
type MultiDownloadOptions struct {
Opt *ObjectGetOptions
PartSize int64
ThreadPoolSize int
}
type Chunk struct {
Number int
OffSet int64
@ -570,6 +576,7 @@ type Jobs struct {
Chunk Chunk
Data io.Reader
Opt *ObjectUploadPartOptions
DownOpt *ObjectGetOptions
}
type Results struct {
@ -632,6 +639,48 @@ func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
}
}
func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
for j := range jobs {
opt := &RangeOptions{
HasStart: true,
HasEnd: true,
Start: j.Chunk.OffSet,
End: j.Chunk.OffSet + j.Chunk.Size - 1,
}
j.DownOpt.Range = FormatRangeOptions(opt)
rt := j.RetryTimes
for {
var res Results
res.PartNumber = j.Chunk.Number
resp, err := s.Get(context.Background(), j.Name, j.DownOpt)
res.err = err
res.Resp = resp
if err != nil {
rt--
if rt == 0 {
results <- &res
break
}
continue
}
defer resp.Body.Close()
fd, err := os.OpenFile(j.FilePath, os.O_WRONLY, 0660)
if err != nil {
res.err = err
results <- &res
break
}
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
if n != j.Chunk.Size || err != nil {
res.err = fmt.Errorf("io.Copy Failed, read:%v, size:%v, err:%v", n, j.Chunk.Size, err)
}
results <- &res
break
}
}
}
func DividePart(fileSize int64, last int) (int64, int64) {
partSize := int64(last * 1024 * 1024)
partNum := fileSize / partSize
@ -953,6 +1002,150 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
return v, resp, err
}
func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
var partNum int64
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = totalBytes / partSize
if partNum >= 10000 {
return nil, 0, errors.New("Too manry parts, out of 10000")
}
} else {
partNum, partSize = DividePart(totalBytes, 64)
}
var chunks []Chunk
var chunk = Chunk{}
for i := int64(0); i < partNum; i++ {
chunk.Number = int(i + 1)
chunk.OffSet = i * partSize
chunk.Size = partSize
chunks = append(chunks, chunk)
}
if totalBytes%partSize > 0 {
chunk.Number = len(chunks) + 1
chunk.OffSet = int64(len(chunks)) * partSize
chunk.Size = totalBytes % partSize
chunks = append(chunks, chunk)
partNum++
}
return chunks, int(partNum), nil
}
func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions) (*Response, error) {
// 参数校验
if opt == nil {
opt = &MultiDownloadOptions{}
}
if opt.Opt != nil && opt.Opt.Range != "" {
return nil, fmt.Errorf("does not supported Range Get")
}
// 获取文件长度和CRC
var coscrc string
resp, err := s.Head(ctx, name, nil)
if err != nil {
return resp, err
}
coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
strTotalBytes := resp.Header.Get("Content-Length")
totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
if err != nil {
return resp, err
}
// 切分
chunks, partNum, err := SplitSizeIntoChunks(totalBytes, opt.PartSize)
if err != nil {
return resp, err
}
// 直接下载到文件
if partNum == 0 || partNum == 1 {
rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt)
if err != nil {
return rsp, err
}
if coscrc != "" && s.client.Conf.EnableCRC {
icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
fd, err := os.Open(filepath)
if err != nil {
return rsp, err
}
localcrc, err := calCRC64(fd)
if err != nil {
return rsp, err
}
if localcrc != icoscrc {
return rsp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
}
}
return rsp, err
}
nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
return resp, err
}
nfile.Close()
var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
} else {
poolSize = 1
}
chjobs := make(chan *Jobs, 100)
chresults := make(chan *Results, 10000)
for w := 1; w <= poolSize; w++ {
go downloadWorker(s, chjobs, chresults)
}
go func() {
for _, chunk := range chunks {
var downOpt ObjectGetOptions
if opt.Opt != nil {
downOpt = *opt.Opt
}
job := &Jobs{
Name: name,
RetryTimes: 3,
FilePath: filepath,
Chunk: chunk,
DownOpt: &downOpt,
}
chjobs <- job
}
close(chjobs)
}()
err = nil
for i := 0; i < partNum; i++ {
res := <-chresults
if res.Resp == nil || res.err != nil {
err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
continue
}
}
close(chresults)
if err != nil {
return nil, err
}
if coscrc != "" && s.client.Conf.EnableCRC {
icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
fd, err := os.Open(filepath)
if err != nil {
return resp, err
}
localcrc, err := calCRC64(fd)
if err != nil {
return resp, err
}
if localcrc != icoscrc {
return resp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
}
}
return resp, err
}
type ObjectPutTaggingOptions struct {
XMLName xml.Name `xml:"Tagging"`
TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`

79
object_test.go

@ -8,12 +8,14 @@ import (
"encoding/xml"
"fmt"
"hash/crc64"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"testing"
"time"
)
@ -514,3 +516,80 @@ func TestObjectService_Upload2(t *testing.T) {
retry++
}
}
func TestObjectService_Download(t *testing.T) {
setup()
defer teardown()
filePath := "rsp.file" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
if err != nil {
t.Fatalf("create tmp file failed")
}
defer os.Remove(filePath)
// 源文件内容
totalBytes := int64(1024 * 1024 * 10)
b := make([]byte, totalBytes)
_, err = rand.Read(b)
newfile.Write(b)
newfile.Close()
tb := crc64.MakeTable(crc64.ECMA)
localcrc := crc64.Update(0, tb, b)
retryMap := make(map[int64]int)
mux.HandleFunc("/test.go.download", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
w.Header().Add("Content-Length", strconv.FormatInt(totalBytes, 10))
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
return
}
strRange := r.Header.Get("Range")
slice1 := strings.Split(strRange, "=")
slice2 := strings.Split(slice1[1], "-")
start, _ := strconv.ParseInt(slice2[0], 10, 64)
end, _ := strconv.ParseInt(slice2[1], 10, 64)
if retryMap[start] == 0 {
retryMap[start]++
w.WriteHeader(http.StatusGatewayTimeout)
} else if retryMap[start] == 1 {
retryMap[start]++
w.WriteHeader(http.StatusGatewayTimeout)
return
fd, err := os.Open(filePath)
if err != nil {
t.Fatalf("open file failed: %v", err)
}
defer fd.Close()
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
fd.Seek(start, os.SEEK_SET)
n, err := io.Copy(w, LimitReadCloser(fd, (end-start)/2))
if err != nil || int64(n) != (end-start)/2 {
t.Fatalf("write file failed:%v, n:%v", err, n)
}
} else {
fd, err := os.Open(filePath)
if err != nil {
t.Fatalf("open file failed: %v", err)
}
defer fd.Close()
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
fd.Seek(start, os.SEEK_SET)
n, err := io.Copy(w, LimitReadCloser(fd, end-start+1))
if err != nil || int64(n) != end-start+1 {
t.Fatalf("write file failed:%v, n:%v", err, n)
}
}
})
opt := &MultiDownloadOptions{
ThreadPoolSize: 3,
PartSize: 1,
}
downPath := "down.file" + time.Now().Format(time.RFC3339)
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err != nil {
t.Fatalf("Object.Upload returned error: %v", err)
}
os.Remove(downPath)
}
Loading…
Cancel
Save