Browse Source

Merge pull request #50 from toranger/master

Fix the multiupload property
tags/v0.7.8
toranger 6 years ago
committed by GitHub
parent
commit
f07404cefc
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      example/object/MutiUpload.go
  2. 184
      object.go

23
example/object/MutiUpload.go

@ -2,10 +2,10 @@ package main
import (
"context"
"net/http"
"net/url"
"os"
"time"
"net/http"
"fmt"
"github.com/tencentyun/cos-go-sdk-v5"
@ -13,14 +13,14 @@ import (
)
func main() {
u, _ := url.Parse("http://tencentyun02-1252448703.cos.ap-guangzhou.myqcloud.com")
u, _ := url.Parse("http://alanbj-1251668577.cos.ap-beijing.myqcloud.com")
b := &cos.BaseURL{BucketURL: u}
c := cos.NewClient(b, &http.Client{
//设置超时时间
Timeout: 100 * time.Second,
Transport: &cos.AuthorizationTransport{
SecretID: os.Getenv("COS_Key"),
SecretKey: os.Getenv("COS_Secret"),
SecretID: os.Getenv("COS_SECRETID"),
SecretKey: os.Getenv("COS_SECRETKEY"),
Transport: &debug.DebugRequestTransport{
RequestHeader: false,
RequestBody: false,
@ -29,15 +29,16 @@ func main() {
},
},
})
f,err:=os.Open("E:/cos-php-sdk.zip")
if err!=nil {panic(err)}
opt := &cos.MultiUploadOptions{
OptIni: nil,
PartSize:1,
OptIni: nil,
PartSize: 1,
}
v, _, err := c.Object.MultiUpload(
context.Background(), "test", f, opt,
context.Background(), "test/gomulput1G", "./test1G", opt,
)
if err!=nil {panic(err)}
if err != nil {
panic(err)
}
fmt.Println(v)
}
}

184
object.go

@ -9,7 +9,7 @@ import (
"net/http"
"net/url"
"os"
"strings"
"sort"
"time"
)
@ -421,9 +421,114 @@ type Object struct {
Owner *Owner `xml:",omitempty"`
}
// MultiUploadOptions is the option of the multiupload.
//
// OptIni is forwarded to InitiateMultipartUpload; PartSize is the size
// of each part in MiB (the caller multiplies by 1024*1024);
// ThreadPoolSize is the number of concurrent upload workers and
// defaults to one when zero or negative.
//
// NOTE(review): the diff overlay in the source listed OptIni/PartSize
// twice (old int and new int64 forms); this is the merged declaration.
type MultiUploadOptions struct {
	OptIni         *InitiateMultipartUploadOptions
	PartSize       int64
	ThreadPoolSize int
}
// Chunk describes one part of a multipart upload: its 1-based part
// number, its byte offset within the source file, and its size in bytes.
type Chunk struct {
	Number int
	OffSet int64
	Size   int64
}
// Jobs carries everything a worker needs to upload one part: the
// object name, the multipart upload session ID, the source file path,
// the retry budget, and the chunk (number/offset/size) to send.
type Jobs struct {
	Name       string
	UploadId   string
	FilePath   string
	RetryTimes int
	Chunk      Chunk
	Data       io.Reader                // NOTE(review): not read by worker in this file — confirm before relying on it
	Opt        *ObjectUploadPartOptions // NOTE(review): also unused by worker here; worker builds its own options
}
// Results reports the outcome of uploading one part: the part number
// and the HTTP response (nil when the upload ultimately failed).
type Results struct {
	PartNumber int
	Resp       *Response
}
// worker consumes jobs from the jobs channel, uploads the corresponding
// file chunk via UploadPart (retrying up to j.RetryTimes), and reports
// exactly one Results per job on the results channel. A nil Resp in a
// result signals that the part failed.
func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
	for j := range jobs {
		var res Results
		res.PartNumber = j.Chunk.Number

		fd, err := os.Open(j.FilePath)
		if err != nil {
			// Could not open the source file: report the failure and move
			// on. The original fell through here, seeking/uploading on an
			// invalid descriptor and sending a SECOND result for the same
			// job, which corrupted the receiver's part count.
			res.Resp = nil
			results <- &res
			continue
		}

		// UploadPart does not support chunked transfer, so the
		// Content-Length of each part must be set explicitly.
		opt := &ObjectUploadPartOptions{
			ContentLength: int(j.Chunk.Size),
		}
		rt := j.RetryTimes
		for {
			// (Re-)position at the chunk offset on every attempt so a
			// retry after a partial read restarts from the beginning of
			// the part instead of wherever the failed attempt stopped.
			fd.Seek(j.Chunk.OffSet, io.SeekStart)
			resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
				&io.LimitedReader{R: fd, N: j.Chunk.Size}, opt)
			res.Resp = resp
			if err != nil {
				rt--
				if rt == 0 {
					// Retry budget exhausted: give up on this part.
					fd.Close()
					results <- &res
					break
				}
				continue
			}
			fd.Close()
			results <- &res
			break
		}
	}
}
// SplitFileIntoChunks partitions the file at filePath into parts of
// partSize bytes; the last chunk holds the remainder, if any. It
// returns the chunk descriptors, the total part count, and any error
// (invalid arguments, or the file cannot be opened/stat'd).
// COS allows at most 10000 parts per multipart upload.
func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
	if filePath == "" || partSize <= 0 {
		return nil, 0, errors.New("chunkSize invalid")
	}
	file, err := os.Open(filePath)
	if err != nil {
		return nil, 0, err
	}
	defer file.Close()
	stat, err := file.Stat()
	if err != nil {
		return nil, 0, err
	}

	fullParts := stat.Size() / partSize
	remainder := stat.Size() % partSize
	totalParts := fullParts
	if remainder > 0 {
		totalParts++
	}
	// Count the tail chunk BEFORE enforcing the cap. The original
	// checked fullParts >= 10000 first, which wrongly rejected a file
	// that splits into exactly 10000 full parts.
	if totalParts > 10000 {
		return nil, 0, errors.New("Too many parts, out of 10000")
	}

	chunks := make([]Chunk, 0, totalParts)
	for i := int64(0); i < fullParts; i++ {
		chunks = append(chunks, Chunk{
			Number: int(i + 1),
			OffSet: i * partSize,
			Size:   partSize,
		})
	}
	if remainder > 0 {
		chunks = append(chunks, Chunk{
			Number: int(fullParts + 1),
			OffSet: fullParts * partSize,
			Size:   remainder,
		})
	}
	return chunks, int(totalParts), nil
}
// MultiUpload 为高级upload接口,并发分块上传
@ -433,59 +538,66 @@ type MultiUploadOptions struct {
// 同时请确认分块数量不超过10000
//
func (s *ObjectService) MultiUpload(ctx context.Context, name string, r io.Reader, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
// 1.Get the file chunk
bufSize := opt.PartSize * 1024 * 1024
chunks, partNum, err := SplitFileIntoChunks(filepath, bufSize)
if err != nil {
return nil, nil, err
}
// 2.Init
optini := opt.OptIni
res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
if err != nil {
return nil, nil, err
}
uploadID := res.UploadID
bufSize := opt.PartSize * 1024 * 1024
buffer := make([]byte, bufSize)
optcom := &CompleteMultipartUploadOptions{}
var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
} else {
// Default is one
poolSize = 1
}
PartUpload := func(ch chan *Response, ctx context.Context, name string, uploadId string, partNumber int, data io.Reader, opt *ObjectUploadPartOptions) {
chjobs := make(chan *Jobs, 100)
chresults := make(chan *Results, 10000)
optcom := &CompleteMultipartUploadOptions{}
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
resp, _ := s.UploadPart(context.Background(), name, uploadId, partNumber, data, nil)
ch <- resp
// 3.Start worker
for w := 1; w <= poolSize; w++ {
go worker(s, chjobs, chresults)
}
chs := make([]chan *Response, 10000)
PartNumber := 0
for i := 1; true; i++ {
bytesread, err := r.Read(buffer)
if err != nil {
if err != io.EOF {
return nil, nil, err
}
// If read fail also need to create i index respon in chan,
// in case below out of index to panic.
chs[i] = make(chan *Response)
PartNumber = i
break
// 4.Push jobs
for _, chunk := range chunks {
job := &Jobs{
Name: name,
RetryTimes: 3,
FilePath: filepath,
UploadId: uploadID,
Chunk: chunk,
}
chs[i] = make(chan *Response)
go PartUpload(chs[i], context.Background(), name, uploadID, i, strings.NewReader(string(buffer[:bytesread])), nil)
chjobs <- job
}
close(chjobs)
for i := 1; i < PartNumber; i++ {
resp := <-chs[i]
// 5.Recv the resp etag to complete
for i := 1; i <= partNum; i++ {
res := <-chresults
// Notice one part fail can not get the etag according.
if resp == nil {
if res.Resp == nil {
// Some part already fail, can not to get the header inside.
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, i)
return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
}
etag := resp.Header.Get("ETag")
// Notice one part fail can not get the etag according.
etag := res.Resp.Header.Get("ETag")
optcom.Parts = append(optcom.Parts, Object{
PartNumber: i, ETag: etag},
PartNumber: res.PartNumber, ETag: etag},
)
}
sort.Sort(ObjectList(optcom.Parts))
v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)

Loading…
Cancel
Save