Browse Source

add progress

tags/v0.7.13^2
jojoliang 5 years ago
parent
commit
2afc5e192c
  1. 23
      example/object/get.go
  2. 15
      example/object/put.go
  3. 21
      example/object/upload.go
  4. 40
      example/object/uploadPart.go
  5. 28
      helper.go
  6. 61
      object.go
  7. 10
      object_part.go
  8. 135
      progress.go

23
example/object/get.go

@ -33,7 +33,7 @@ func log_status(err error) {
}
func main() {
u, _ := url.Parse("https://test-1253846586.cos.ap-guangzhou.myqcloud.com")
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
@ -48,8 +48,8 @@ func main() {
},
})
// Case1 Download object into ReadCloser(). the body needs to be closed
name := "test/hello.txt"
// Case1 通过resp.Body下载对象,Body需要关闭
name := "test/example"
resp, err := c.Object.Get(context.Background(), name, nil)
log_status(err)
@ -57,8 +57,8 @@ func main() {
resp.Body.Close()
fmt.Printf("%s\n", string(bs))
// Case2 Download object to local file. the body needs to be closed
fd, err := os.OpenFile("hello.txt", os.O_WRONLY|os.O_CREATE, 0660)
// Case2 下载对象到文件. Body需要关闭
fd, err := os.OpenFile("test", os.O_WRONLY|os.O_CREATE, 0660)
log_status(err)
defer fd.Close()
@ -68,11 +68,11 @@ func main() {
io.Copy(fd, resp.Body)
resp.Body.Close()
// Case3 Download object to local file path
_, err = c.Object.GetToFile(context.Background(), name, "hello_1.txt", nil)
// Case3 下载对象到文件
_, err = c.Object.GetToFile(context.Background(), name, "test", nil)
log_status(err)
// Case4 Download object with range header, can used to concurrent download
// Case4 range下载对象,可以根据range实现并发下载
opt := &cos.ObjectGetOptions{
ResponseContentType: "text/html",
Range: "bytes=0-3",
@ -82,4 +82,11 @@ func main() {
bs, _ = ioutil.ReadAll(resp.Body)
resp.Body.Close()
fmt.Printf("%s\n", string(bs))
// Case5 下载对象到文件,查看下载进度
opt = &cos.ObjectGetOptions{
Listener: &cos.DefaultProgressListener{},
}
_, err = c.Object.GetToFile(context.Background(), name, "test", opt)
log_status(err)
}

15
example/object/put.go

@ -44,20 +44,19 @@ func main() {
// Notice when put a large file and set need the request body, might happend out of memory error.
RequestBody: false,
ResponseHeader: true,
ResponseBody: true,
ResponseBody: false,
},
},
})
// Case1 normal put object
name := "test/objectPut.go"
// Case1 上传对象
name := "test/example"
f := strings.NewReader("test")
_, err := c.Object.Put(context.Background(), name, f, nil)
log_status(err)
// Case2 put object with the options
name = "test/put_option.go"
// Case2 使用options上传对象
f = strings.NewReader("test xxx")
opt := &cos.ObjectPutOptions{
ObjectPutHeaderOptions: &cos.ObjectPutHeaderOptions{
@ -71,7 +70,11 @@ func main() {
_, err = c.Object.Put(context.Background(), name, f, opt)
log_status(err)
// Case3 put object by local file path
// Case3 通过本地文件上传对象
_, err = c.Object.PutFromFile(context.Background(), name, "./test", nil)
log_status(err)
// Case4 查看上传进度
opt.ObjectPutHeaderOptions.Listener = &cos.DefaultProgressListener{}
_, err = c.Object.PutFromFile(context.Background(), name, "./test", opt)
}

21
example/object/upload.go

@ -49,9 +49,26 @@ func main() {
},
})
// Case1 多线程上传对象
opt := &cos.MultiUploadOptions{
ThreadPoolSize: 3,
}
v, _, err := c.Object.Upload(
context.Background(), "gomulput1G", "./test1G", nil,
context.Background(), "gomulput1G", "./test1G", opt,
)
log_status(err)
fmt.Printf("Case1 done, %v\n", v)
// Case2 多线程上传对象,查看上传进度
opt.OptIni = &cos.InitiateMultipartUploadOptions{
nil,
&cos.ObjectPutHeaderOptions{
Listener: &cos.DefaultProgressListener{},
},
}
v, _, err = c.Object.Upload(
context.Background(), "gomulput1G", "./test1G", opt,
)
log_status(err)
fmt.Println(v)
fmt.Printf("Case2 done, %v\n", v)
}

40
example/object/uploadPart.go

@ -41,7 +41,7 @@ func initUpload(c *cos.Client, name string) *cos.InitiateMultipartUploadResult {
}
func main() {
u, _ := url.Parse("https://test-1253846586.cos.ap-guangzhou.myqcloud.com")
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
@ -49,20 +49,50 @@ func main() {
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: true,
RequestBody: true,
RequestBody: false,
ResponseHeader: true,
ResponseBody: true,
ResponseBody: false,
},
},
})
optcom := &cos.CompleteMultipartUploadOptions{}
name := "test/test_multi_upload.go"
up := initUpload(c, name)
uploadID := up.UploadID
fd, err := os.Open("test")
if err != nil {
fmt.Printf("Open File Error: %v\n", err)
return
}
defer fd.Close()
stat, err := fd.Stat()
if err != nil {
fmt.Printf("Stat File Error: %v\n", err)
return
}
opt := &cos.ObjectUploadPartOptions{
Listener: &cos.DefaultProgressListener{},
ContentLength: int(stat.Size()),
}
resp, err := c.Object.UploadPart(
context.Background(), name, uploadID, 1, fd, opt,
)
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: 1, ETag: resp.Header.Get("ETag"),
})
log_status(err)
f := strings.NewReader("test heoo")
_, err := c.Object.UploadPart(
context.Background(), name, uploadID, 1, f, nil,
resp, err = c.Object.UploadPart(
context.Background(), name, uploadID, 2, f, nil,
)
log_status(err)
optcom.Parts = append(optcom.Parts, cos.Object{
PartNumber: 2, ETag: resp.Header.Get("ETag"),
})
_, _, err = c.Object.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
log_status(err)
}

28
helper.go

@ -5,8 +5,11 @@ import (
"crypto/md5"
"crypto/sha1"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
)
// 计算 md5 或 sha1 时的分块大小
@ -112,3 +115,28 @@ func DecodeURIComponent(s string) (string, error) {
func EncodeURIComponent(s string) string {
return encodeURIComponent(s)
}
func GetReaderLen(reader io.Reader) (length int64, err error) {
switch v := reader.(type) {
case *bytes.Buffer:
length = int64(v.Len())
case *bytes.Reader:
length = int64(v.Len())
case *strings.Reader:
length = int64(v.Len())
case *os.File:
stat, ferr := v.Stat()
if ferr != nil {
err = fmt.Errorf("can't get reader length: %s", ferr.Error())
} else {
length = stat.Size()
}
case *io.LimitedReader:
length = int64(v.N)
case FixedLengthReader:
length = v.Size()
default:
err = fmt.Errorf("can't get reader content length, unkown reader type")
}
return
}

61
object.go

@ -12,6 +12,7 @@ import (
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
)
@ -35,6 +36,9 @@ type ObjectGetOptions struct {
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
// 下载进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// presignedURLTestingOptions is the opt of presigned url
@ -65,6 +69,14 @@ func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOpti
disableCloseBody: true,
}
resp, err := s.client.send(ctx, &sendOpt)
if opt != nil && opt.Listener != nil {
if err == nil && resp != nil {
if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
}
}
}
return resp, err
}
@ -152,6 +164,9 @@ type ObjectPutHeaderOptions struct {
//兼容其他自定义头部
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
// 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// ObjectPutOptions the options of put object
@ -166,6 +181,14 @@ type ObjectPutOptions struct {
//
// https://www.qcloud.com/document/product/436/7749
func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r)
if err != nil {
return nil, err
}
r = TeeReader(r, nil, totalBytes, opt.Listener)
}
sendOpt := sendOptions{
baseURL: s.client.BaseURL.BucketURL,
uri: "/" + encodeURIComponent(name),
@ -174,6 +197,7 @@ func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *
optHeader: opt,
}
resp, err := s.client.send(ctx, &sendOpt)
return resp, err
}
@ -571,27 +595,27 @@ func DividePart(fileSize int64) (int64, int64) {
return partNum, partSize
}
func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
if filePath == "" {
return nil, 0, errors.New("filePath invalid")
return 0, nil, 0, errors.New("filePath invalid")
}
file, err := os.Open(filePath)
if err != nil {
return nil, 0, err
return 0, nil, 0, err
}
defer file.Close()
stat, err := file.Stat()
if err != nil {
return nil, 0, err
return 0, nil, 0, err
}
var partNum int64
if partSize > 0 {
partSize = partSize * 1024 * 1024
partNum = stat.Size() / partSize
if partNum >= 10000 {
return nil, 0, errors.New("Too many parts, out of 10000")
return 0, nil, 0, errors.New("Too many parts, out of 10000")
}
} else {
partNum, partSize = DividePart(stat.Size())
@ -614,7 +638,7 @@ func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error)
partNum++
}
return chunks, int(partNum), nil
return int64(stat.Size()), chunks, int(partNum), nil
}
@ -707,7 +731,7 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
opt = &MultiUploadOptions{}
}
// 1.Get the file chunk
chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
if err != nil {
return nil, nil, err
}
@ -768,6 +792,15 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
go worker(s, chjobs, chresults)
}
// progress started event
var listener ProgressListener
var consumedBytes int64
if opt.OptIni != nil {
listener = opt.OptIni.Listener
}
event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
progressCallback(listener, event)
// 4.Push jobs
for _, chunk := range chunks {
if chunk.Done {
@ -798,22 +831,34 @@ func (s *ObjectService) Upload(ctx context.Context, name string, filepath string
optcom.Parts = append(optcom.Parts, Object{
PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
)
consumedBytes += chunks[i].Size
event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
progressCallback(listener, event)
continue
}
res := <-chresults
// Notice one part fail can not get the etag according.
if res.Resp == nil || res.err != nil {
// Some part already fail, can not to get the header inside.
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
err := fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
progressCallback(listener, event)
return nil, nil, err
}
// Notice one part fail can not get the etag according.
etag := res.Resp.Header.Get("ETag")
optcom.Parts = append(optcom.Parts, Object{
PartNumber: res.PartNumber, ETag: etag},
)
consumedBytes += chunks[res.PartNumber-1].Size
event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
progressCallback(listener, event)
}
sort.Sort(ObjectList(optcom.Parts))
event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
progressCallback(listener, event)
v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
return v, resp, err

10
object_part.go

@ -50,6 +50,9 @@ type ObjectUploadPartOptions struct {
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
// 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。
@ -61,6 +64,13 @@ type ObjectUploadPartOptions struct {
//
// https://www.qcloud.com/document/product/436/7750
func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) {
if opt != nil && opt.Listener != nil {
totalBytes, err := GetReaderLen(r)
if err != nil {
return nil, err
}
r = TeeReader(r, nil, totalBytes, opt.Listener)
}
u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
sendOpt := sendOptions{
baseURL: s.client.BaseURL.BucketURL,

135
progress.go

@ -0,0 +1,135 @@
package cos
import (
"fmt"
"io"
)
type ProgressEventType int
const (
// 数据开始传输
ProgressStartedEvent ProgressEventType = iota
// 数据传输中
ProgressDataEvent
// 数据传输完成, 但不能表示对应API调用完成
ProgressCompletedEvent
// 只有在数据传输时发生错误才会返回
ProgressFailedEvent
)
type ProgressEvent struct {
EventType ProgressEventType
RWBytes int64
ConsumedBytes int64
TotalBytes int64
Err error
}
func newProgressEvent(eventType ProgressEventType, rwBytes, consumed, total int64, err ...error) *ProgressEvent {
event := &ProgressEvent{
EventType: eventType,
RWBytes: rwBytes,
ConsumedBytes: consumed,
TotalBytes: total,
}
if len(err) > 0 {
event.Err = err[0]
}
return event
}
// 用户自定义Listener需要实现该方法
type ProgressListener interface {
ProgressChangedCallback(event *ProgressEvent)
}
func progressCallback(listener ProgressListener, event *ProgressEvent) {
if listener != nil && event != nil {
listener.ProgressChangedCallback(event)
}
}
type teeReader struct {
reader io.Reader
writer io.Writer
consumedBytes int64
totalBytes int64
listener ProgressListener
}
func (r *teeReader) Read(p []byte) (int, error) {
if r.consumedBytes == 0 {
event := newProgressEvent(ProgressStartedEvent, 0, r.consumedBytes, r.totalBytes)
progressCallback(r.listener, event)
}
n, err := r.reader.Read(p)
if err != nil && err != io.EOF {
event := newProgressEvent(ProgressFailedEvent, 0, r.consumedBytes, r.totalBytes, err)
progressCallback(r.listener, event)
}
if n > 0 {
r.consumedBytes += int64(n)
if r.writer != nil {
if n, err := r.writer.Write(p[:n]); err != nil {
return n, err
}
}
if r.listener != nil {
event := newProgressEvent(ProgressDataEvent, int64(n), r.consumedBytes, r.totalBytes)
progressCallback(r.listener, event)
}
}
if err == io.EOF {
event := newProgressEvent(ProgressCompletedEvent, int64(n), r.consumedBytes, r.totalBytes)
progressCallback(r.listener, event)
}
return n, err
}
func (r *teeReader) Close() error {
if rc, ok := r.reader.(io.ReadCloser); ok {
return rc.Close()
}
return nil
}
func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader {
return &teeReader{
reader: reader,
writer: writer,
consumedBytes: 0,
totalBytes: total,
listener: listener,
}
}
type FixedLengthReader interface {
io.Reader
Size() int64
}
type DefaultProgressListener struct {
}
func (l *DefaultProgressListener) ProgressChangedCallback(event *ProgressEvent) {
switch event.EventType {
case ProgressStartedEvent:
fmt.Printf("Transfer Start [ConsumedBytes/TotalBytes: %d/%d]\n",
event.ConsumedBytes, event.TotalBytes)
case ProgressDataEvent:
fmt.Printf("\rTransfer Data [ConsumedBytes/TotalBytes: %d/%d, %d%%]",
event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
case ProgressCompletedEvent:
fmt.Printf("\nTransfer Complete [ConsumedBytes/TotalBytes: %d/%d]\n",
event.ConsumedBytes, event.TotalBytes)
case ProgressFailedEvent:
fmt.Printf("\nTransfer Failed [ConsumedBytes/TotalBytes: %d/%d] [Err: %v]\n",
event.ConsumedBytes, event.TotalBytes, event.Err)
default:
fmt.Printf("Progress Changed Error: unknown progress event type\n")
}
}
Loading…
Cancel
Save