From 2afc5e192cd9bc8d6630fa929e10028ec79bde8e Mon Sep 17 00:00:00 2001 From: jojoliang Date: Wed, 14 Oct 2020 17:36:28 +0800 Subject: [PATCH] add progress --- example/object/get.go | 25 +++++--- example/object/put.go | 17 +++--- example/object/upload.go | 21 ++++++- example/object/uploadPart.go | 40 +++++++++++-- helper.go | 28 +++++++++ object.go | 61 ++++++++++++++++--- object_part.go | 10 ++++ progress.go | 135 +++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 306 insertions(+), 31 deletions(-) create mode 100644 progress.go diff --git a/example/object/get.go b/example/object/get.go index 2650030..ff44ebf 100644 --- a/example/object/get.go +++ b/example/object/get.go @@ -19,7 +19,7 @@ func log_status(err error) { } if cos.IsNotFoundError(err) { // WARN - fmt.Println("WARN: Resource is not existed") + fmt.Println("WARN: Resource is not existed") } else if e, ok := cos.IsCOSError(err); ok { fmt.Printf("ERROR: Code: %v\n", e.Code) fmt.Printf("ERROR: Message: %v\n", e.Message) @@ -33,7 +33,7 @@ func log_status(err error) { } func main() { - u, _ := url.Parse("https://test-1253846586.cos.ap-guangzhou.myqcloud.com") + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") b := &cos.BaseURL{BucketURL: u} c := cos.NewClient(b, &http.Client{ Transport: &cos.AuthorizationTransport{ @@ -48,8 +48,8 @@ func main() { }, }) - // Case1 Download object into ReadCloser(). the body needs to be closed - name := "test/hello.txt" + // Case1 通过resp.Body下载对象,Body需要关闭 + name := "test/example" resp, err := c.Object.Get(context.Background(), name, nil) log_status(err) @@ -57,8 +57,8 @@ func main() { resp.Body.Close() fmt.Printf("%s\n", string(bs)) - // Case2 Download object to local file. the body needs to be closed - fd, err := os.OpenFile("hello.txt", os.O_WRONLY|os.O_CREATE, 0660) + // Case2 下载对象到文件. Body需要关闭 + fd, err := os.OpenFile("test", os.O_WRONLY|os.O_CREATE, 0660) log_status(err) defer fd.Close() @@ -68,11 +68,11 @@ func main() { io.Copy(fd, resp.Body) resp.Body.Close() - // Case3 Download object to local file path - _, err = c.Object.GetToFile(context.Background(), name, "hello_1.txt", nil) + // Case3 下载对象到文件 + _, err = c.Object.GetToFile(context.Background(), name, "test", nil) log_status(err) - // Case4 Download object with range header, can used to concurrent download + // Case4 range下载对象,可以根据range实现并发下载 opt := &cos.ObjectGetOptions{ ResponseContentType: "text/html", Range: "bytes=0-3", @@ -82,4 +82,11 @@ func main() { bs, _ = ioutil.ReadAll(resp.Body) resp.Body.Close() fmt.Printf("%s\n", string(bs)) + + // Case5 下载对象到文件,查看下载进度 + opt = &cos.ObjectGetOptions{ + Listener: &cos.DefaultProgressListener{}, + } + _, err = c.Object.GetToFile(context.Background(), name, "test", opt) + log_status(err) } diff --git a/example/object/put.go b/example/object/put.go index a9f97f3..0f4b12b 100644 --- a/example/object/put.go +++ b/example/object/put.go @@ -19,7 +19,7 @@ func log_status(err error) { } if cos.IsNotFoundError(err) { // WARN - fmt.Println("WARN: Resource is not existed") + fmt.Println("WARN: Resource is not existed") } else if e, ok := cos.IsCOSError(err); ok { fmt.Printf("ERROR: Code: %v\n", e.Code) fmt.Printf("ERROR: Message: %v\n", e.Message) @@ -44,20 +44,19 @@ func main() { // Notice when put a large file and set need the request body, might happend out of memory error. RequestBody: false, ResponseHeader: true, - ResponseBody: true, + ResponseBody: false, }, }, }) - // Case1 normal put object - name := "test/objectPut.go" + // Case1 上传对象 + name := "test/example" f := strings.NewReader("test") _, err := c.Object.Put(context.Background(), name, f, nil) log_status(err) - // Case2 put object with the options - name = "test/put_option.go" + // Case2 使用options上传对象 f = strings.NewReader("test xxx") opt := &cos.ObjectPutOptions{ ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{ @@ -71,7 +70,11 @@ func main() { _, err = c.Object.Put(context.Background(), name, f, opt) log_status(err) - // Case3 put object by local file path + // Case3 通过本地文件上传对象 _, err = c.Object.PutFromFile(context.Background(), name, "./test", nil) log_status(err) + + // Case4 查看上传进度 + opt.ObjectPutHeaderOptions.Listener = &cos.DefaultProgressListener{} + _, err = c.Object.PutFromFile(context.Background(), name, "./test", opt) } diff --git a/example/object/upload.go b/example/object/upload.go index 7894fda..54a6f7d 100644 --- a/example/object/upload.go +++ b/example/object/upload.go @@ -49,9 +49,26 @@ func main() { }, }) + // Case1 多线程上传对象 + opt := &cos.MultiUploadOptions{ + ThreadPoolSize: 3, + } v, _, err := c.Object.Upload( - context.Background(), "gomulput1G", "./test1G", nil, + context.Background(), "gomulput1G", "./test1G", opt, + ) + log_status(err) + fmt.Printf("Case1 done, %v\n", v) + + // Case2 多线程上传对象,查看上传进度 + opt.OptIni = &cos.InitiateMultipartUploadOptions{ + nil, + &cos.ObjectPutHeaderOptions{ + Listener: &cos.DefaultProgressListener{}, + }, + } + v, _, err = c.Object.Upload( + context.Background(), "gomulput1G", "./test1G", opt, ) log_status(err) - fmt.Println(v) + fmt.Printf("Case2 done, %v\n", v) } diff --git a/example/object/uploadPart.go b/example/object/uploadPart.go index 945bfe5..0a3c5fa 100644 --- a/example/object/uploadPart.go +++ b/example/object/uploadPart.go @@ -41,7 +41,7 @@ func initUpload(c *cos.Client, name string) *cos.InitiateMultipartUploadResult { } func main() { - u, _ := url.Parse("https://test-1253846586.cos.ap-guangzhou.myqcloud.com") + u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") b := &cos.BaseURL{BucketURL: u} c := cos.NewClient(b, &http.Client{ Transport: &cos.AuthorizationTransport{ @@ -49,20 +49,50 @@ func main() { SecretKey: os.Getenv("COS_SECRETKEY"), Transport: &debug.DebugRequestTransport{ RequestHeader: true, - RequestBody: true, + RequestBody: false, ResponseHeader: true, - ResponseBody: true, + ResponseBody: false, }, }, }) + optcom := &cos.CompleteMultipartUploadOptions{} name := "test/test_multi_upload.go" up := initUpload(c, name) uploadID := up.UploadID + fd, err := os.Open("test") + if err != nil { + fmt.Printf("Open File Error: %v\n", err) + return + } + defer fd.Close() + stat, err := fd.Stat() + if err != nil { + fmt.Printf("Stat File Error: %v\n", err) + return + } + opt := &cos.ObjectUploadPartOptions{ + Listener: &cos.DefaultProgressListener{}, + ContentLength: int(stat.Size()), + } + resp, err := c.Object.UploadPart( + context.Background(), name, uploadID, 1, fd, opt, + ) + optcom.Parts = append(optcom.Parts, cos.Object{ + PartNumber: 1, ETag: resp.Header.Get("ETag"), + }) + log_status(err) + f := strings.NewReader("test heoo") - _, err := c.Object.UploadPart( - context.Background(), name, uploadID, 1, f, nil, + resp, err = c.Object.UploadPart( + context.Background(), name, uploadID, 2, f, nil, ) log_status(err) + optcom.Parts = append(optcom.Parts, cos.Object{ + PartNumber: 2, ETag: resp.Header.Get("ETag"), + }) + + _, _, err = c.Object.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) + log_status(err) } diff --git a/helper.go b/helper.go index 8e98348..52fdeee 100644 --- a/helper.go +++ b/helper.go @@ -5,8 +5,11 @@ import ( "crypto/md5" "crypto/sha1" "fmt" + "io" "net/http" "net/url" + "os" + "strings" ) // 计算 md5 或 sha1 时的分块大小 @@ -112,3 +115,28 @@ func DecodeURIComponent(s string) (string, error) { func EncodeURIComponent(s string) string { return encodeURIComponent(s) } + +func GetReaderLen(reader io.Reader) (length int64, err error) { + switch v := reader.(type) { + case *bytes.Buffer: + length = int64(v.Len()) + case *bytes.Reader: + length = int64(v.Len()) + case *strings.Reader: + length = int64(v.Len()) + case *os.File: + stat, ferr := v.Stat() + if ferr != nil { + err = fmt.Errorf("can't get reader length: %s", ferr.Error()) + } else { + length = stat.Size() + } + case *io.LimitedReader: + length = int64(v.N) + case FixedLengthReader: + length = v.Size() + default: + err = fmt.Errorf("can't get reader content length, unkown reader type") + } + return +} diff --git a/object.go b/object.go index 8933c69..5d3337a 100644 --- a/object.go +++ b/object.go @@ -12,6 +12,7 @@ import ( "net/url" "os" "sort" + "strconv" "strings" "time" ) @@ -35,6 +36,9 @@ type ObjectGetOptions struct { XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + + // 下载进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil + Listener ProgressListener `header:"-" url:"-" xml:"-"` } // presignedURLTestingOptions is the opt of presigned url @@ -65,6 +69,14 @@ func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOpti disableCloseBody: true, } resp, err := s.client.send(ctx, &sendOpt) + + if opt != nil && opt.Listener != nil { + if err == nil && resp != nil { + if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil { + resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener) + } + } + } return resp, err } @@ -152,6 +164,9 @@ type ObjectPutHeaderOptions struct { //兼容其他自定义头部 XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"` XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + + // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil + Listener ProgressListener `header:"-" url:"-" xml:"-"` } // ObjectPutOptions the options of put object @@ -166,6 +181,14 @@ type ObjectPutOptions struct { // // https://www.qcloud.com/document/product/436/7749 func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) { + if opt != nil && opt.Listener != nil { + totalBytes, err := GetReaderLen(r) + if err != nil { + return nil, err + } + r = TeeReader(r, nil, totalBytes, opt.Listener) + } + sendOpt := sendOptions{ baseURL: s.client.BaseURL.BucketURL, uri: "/" + encodeURIComponent(name), @@ -174,6 +197,7 @@ func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt * optHeader: opt, } resp, err := s.client.send(ctx, &sendOpt) + return resp, err } @@ -571,27 +595,27 @@ func DividePart(fileSize int64) (int64, int64) { return partNum, partSize } -func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) { +func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) { if filePath == "" { - return nil, 0, errors.New("filePath invalid") + return 0, nil, 0, errors.New("filePath invalid") } file, err := os.Open(filePath) if err != nil { - return nil, 0, err + return 0, nil, 0, err } defer file.Close() stat, err := file.Stat() if err != nil { - return nil, 0, err + return 0, nil, 0, err } var partNum int64 if partSize > 0 { partSize = partSize * 1024 * 1024 partNum = stat.Size() / partSize if partNum >= 10000 { - return nil, 0, errors.New("Too many parts, out of 10000") + return 0, nil, 0, errors.New("Too many parts, out of 10000") } } else { partNum, partSize = DividePart(stat.Size()) @@ -614,7 +638,7 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) partNum++ } - return chunks, int(partNum), nil + return int64(stat.Size()), chunks, int(partNum), nil } @@ -707,7 +731,7 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string opt = &MultiUploadOptions{} } // 1.Get the file chunk - chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize) + totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize) if err != nil { return nil, nil, err } @@ -768,6 +792,15 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string go worker(s, chjobs, chresults) } + // progress started event + var listener ProgressListener + var consumedBytes int64 + if opt.OptIni != nil { + listener = opt.OptIni.Listener + } + event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes) + progressCallback(listener, event) + // 4.Push jobs for _, chunk := range chunks { if chunk.Done { @@ -798,22 +831,34 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string optcom.Parts = append(optcom.Parts, Object{ PartNumber: chunks[i].Number, ETag: chunks[i].ETag}, ) + consumedBytes += chunks[i].Size + event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes) + progressCallback(listener, event) continue } res := <-chresults // Notice one part fail can not get the etag according. if res.Resp == nil || res.err != nil { // Some part already fail, can not to get the header inside. - return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error()) + err := fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error()) + event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err) + progressCallback(listener, event) + return nil, nil, err } // Notice one part fail can not get the etag according. etag := res.Resp.Header.Get("ETag") optcom.Parts = append(optcom.Parts, Object{ PartNumber: res.PartNumber, ETag: etag}, ) + consumedBytes += chunks[res.PartNumber-1].Size + event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes) + progressCallback(listener, event) } sort.Sort(ObjectList(optcom.Parts)) + event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes) + progressCallback(listener, event) + v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom) return v, resp, err diff --git a/object_part.go b/object_part.go index ee181b8..718899c 100644 --- a/object_part.go +++ b/object_part.go @@ -50,6 +50,9 @@ type ObjectUploadPartOptions struct { XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"` XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"` + + // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil + Listener ProgressListener `header:"-" url:"-" xml:"-"` } // UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。 @@ -61,6 +64,13 @@ type ObjectUploadPartOptions struct { // // https://www.qcloud.com/document/product/436/7750 func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) { + if opt != nil && opt.Listener != nil { + totalBytes, err := GetReaderLen(r) + if err != nil { + return nil, err + } + r = TeeReader(r, nil, totalBytes, opt.Listener) + } u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID) sendOpt := sendOptions{ baseURL: s.client.BaseURL.BucketURL, diff --git a/progress.go b/progress.go new file mode 100644 index 0000000..65d3185 --- /dev/null +++ b/progress.go @@ -0,0 +1,135 @@ +package cos + +import ( + "fmt" + "io" +) + +type ProgressEventType int + +const ( + // 数据开始传输 + ProgressStartedEvent ProgressEventType = iota + // 数据传输中 + ProgressDataEvent + // 数据传输完成, 但不能表示对应API调用完成 + ProgressCompletedEvent + // 只有在数据传输时发生错误才会返回 + ProgressFailedEvent +) + +type ProgressEvent struct { + EventType ProgressEventType + RWBytes int64 + ConsumedBytes int64 + TotalBytes int64 + Err error +} + +func newProgressEvent(eventType ProgressEventType, rwBytes, consumed, total int64, err ...error) *ProgressEvent { + event := &ProgressEvent{ + EventType: eventType, + RWBytes: rwBytes, + ConsumedBytes: consumed, + TotalBytes: total, + } + if len(err) > 0 { + event.Err = err[0] + } + return event +} + +// 用户自定义Listener需要实现该方法 +type ProgressListener interface { + ProgressChangedCallback(event *ProgressEvent) +} + +func progressCallback(listener ProgressListener, event *ProgressEvent) { + if listener != nil && event != nil { + listener.ProgressChangedCallback(event) + } +} + +type teeReader struct { + reader io.Reader + writer io.Writer + consumedBytes int64 + totalBytes int64 + listener ProgressListener +} + +func (r *teeReader) Read(p []byte) (int, error) { + if r.consumedBytes == 0 { + event := newProgressEvent(ProgressStartedEvent, 0, r.consumedBytes, r.totalBytes) + progressCallback(r.listener, event) + } + + n, err := r.reader.Read(p) + if err != nil && err != io.EOF { + event := newProgressEvent(ProgressFailedEvent, 0, r.consumedBytes, r.totalBytes, err) + progressCallback(r.listener, event) + } + if n > 0 { + r.consumedBytes += int64(n) + if r.writer != nil { + if n, err := r.writer.Write(p[:n]); err != nil { + return n, err + } + } + if r.listener != nil { + event := newProgressEvent(ProgressDataEvent, int64(n), r.consumedBytes, r.totalBytes) + progressCallback(r.listener, event) + } + } + + if err == io.EOF { + event := newProgressEvent(ProgressCompletedEvent, int64(n), r.consumedBytes, r.totalBytes) + progressCallback(r.listener, event) + } + + return n, err +} + +func (r *teeReader) Close() error { + if rc, ok := r.reader.(io.ReadCloser); ok { + return rc.Close() + } + return nil +} + +func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader { + return &teeReader{ + reader: reader, + writer: writer, + consumedBytes: 0, + totalBytes: total, + listener: listener, + } +} + +type FixedLengthReader interface { + io.Reader + Size() int64 +} + +type DefaultProgressListener struct { +} + +func (l *DefaultProgressListener) ProgressChangedCallback(event *ProgressEvent) { + switch event.EventType { + case ProgressStartedEvent: + fmt.Printf("Transfer Start [ConsumedBytes/TotalBytes: %d/%d]\n", + event.ConsumedBytes, event.TotalBytes) + case ProgressDataEvent: + fmt.Printf("\rTransfer Data [ConsumedBytes/TotalBytes: %d/%d, %d%%]", + event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes) + case ProgressCompletedEvent: + fmt.Printf("\nTransfer Complete [ConsumedBytes/TotalBytes: %d/%d]\n", + event.ConsumedBytes, event.TotalBytes) + case ProgressFailedEvent: + fmt.Printf("\nTransfer Failed [ConsumedBytes/TotalBytes: %d/%d] [Err: %v]\n", + event.ConsumedBytes, event.TotalBytes, event.Err) + default: + fmt.Printf("Progress Changed Error: unknown progress event type\n") + } +}