Fix the multiupload property
@@ -2,10 +2,10 @@ package main

 import (
     "context"
+    "net/http"
     "net/url"
     "os"
     "time"
-    "net/http"

     "fmt"
     "github.com/tencentyun/cos-go-sdk-v5"
@@ -13,14 +13,14 @@ import (
 )

 func main() {
-    u, _ := url.Parse("http://tencentyun02-1252448703.cos.ap-guangzhou.myqcloud.com")
+    u, _ := url.Parse("http://alanbj-1251668577.cos.ap-beijing.myqcloud.com")
     b := &cos.BaseURL{BucketURL: u}
     c := cos.NewClient(b, &http.Client{
         // Set the request timeout
         Timeout: 100 * time.Second,
         Transport: &cos.AuthorizationTransport{
-            SecretID:  os.Getenv("COS_Key"),
-            SecretKey: os.Getenv("COS_Secret"),
+            SecretID:  os.Getenv("COS_SECRETID"),
+            SecretKey: os.Getenv("COS_SECRETKEY"),
             Transport: &debug.DebugRequestTransport{
                 RequestHeader: false,
                 RequestBody:   false,
@@ -29,15 +29,16 @@ func main() {
             },
         },
     })
-    f,err:=os.Open("E:/cos-php-sdk.zip")
-    if err!=nil {panic(err)}
     opt := &cos.MultiUploadOptions{
         OptIni: nil,
-        PartSize:1,
+        PartSize: 1,
     }
     v, _, err := c.Object.MultiUpload(
-        context.Background(), "test", f, opt,
+        context.Background(), "test/gomulput1G", "./test1G", opt,
     )
-    if err!=nil {panic(err)}
+    if err != nil {
+        panic(err)
+    }
     fmt.Println(v)
 }
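The updated example takes a file path instead of an io.Reader, but it does not exercise the new ThreadPoolSize option. A minimal sketch of a call that does, reusing only API names that appear in this commit; the bucket URL, object key, and local file path are placeholders:

package main

import (
    "context"
    "fmt"
    "net/http"
    "net/url"
    "os"

    "github.com/tencentyun/cos-go-sdk-v5"
)

func main() {
    // Placeholder bucket URL; substitute your own bucket.
    u, _ := url.Parse("http://examplebucket-1250000000.cos.ap-beijing.myqcloud.com")
    c := cos.NewClient(&cos.BaseURL{BucketURL: u}, &http.Client{
        Transport: &cos.AuthorizationTransport{
            SecretID:  os.Getenv("COS_SECRETID"),
            SecretKey: os.Getenv("COS_SECRETKEY"),
        },
    })
    opt := &cos.MultiUploadOptions{
        PartSize:       8, // part size in MiB, per the new PartSize semantics
        ThreadPoolSize: 4, // upload up to four parts concurrently
    }
    // Object key and local path are placeholders.
    v, _, err := c.Object.MultiUpload(context.Background(), "test/bigfile", "./bigfile", opt)
    if err != nil {
        panic(err)
    }
    fmt.Println(v)
}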
object.go (184 changed lines)
@@ -9,7 +9,7 @@ import (
     "net/http"
     "net/url"
     "os"
-    "strings"
+    "sort"
     "time"
 )

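The import swap above supports the new completion logic: parts can now finish out of order, so the part list is sorted by PartNumber via sort.Sort(ObjectList(optcom.Parts)) before CompleteMultipartUpload. A self-contained sketch of that idea, assuming ObjectList implements sort.Interface over []Object keyed on PartNumber (the diff's call to sort.Sort implies this but does not show the implementation):

package main

import (
    "fmt"
    "sort"
)

type Object struct {
    PartNumber int
    ETag       string
}

type ObjectList []Object

func (o ObjectList) Len() int           { return len(o) }
func (o ObjectList) Swap(i, j int)      { o[i], o[j] = o[j], o[i] }
func (o ObjectList) Less(i, j int) bool { return o[i].PartNumber < o[j].PartNumber }

func main() {
    parts := ObjectList{{3, `"c"`}, {1, `"a"`}, {2, `"b"`}}
    sort.Sort(parts) // COS requires parts in ascending PartNumber order
    fmt.Println(parts)
}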
@@ -421,9 +421,114 @@ type Object struct {
     Owner *Owner `xml:",omitempty"`
 }

+// MultiUploadOptions is the option set for MultiUpload;
+// ThreadPoolSize defaults to one
 type MultiUploadOptions struct {
     OptIni *InitiateMultipartUploadOptions
-    PartSize int
+    PartSize       int64
+    ThreadPoolSize int
+}
+
+type Chunk struct {
+    Number int
+    OffSet int64
+    Size   int64
+}
+
+// Jobs describes one part-upload task for a worker
+type Jobs struct {
+    Name       string
+    UploadId   string
+    FilePath   string
+    RetryTimes int
+    Chunk      Chunk
+    Data       io.Reader
+    Opt        *ObjectUploadPartOptions
+}
+
+type Results struct {
+    PartNumber int
+    Resp       *Response
+}
+
+func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
+    for j := range jobs {
+        fd, err := os.Open(j.FilePath)
+        var res Results
+        if err != nil {
+            res.PartNumber = j.Chunk.Number
+            res.Resp = nil
+            results <- &res
+        }
+
+        fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
+        // UploadPart does not support chunked transfer,
+        // so the Content-Length must be set explicitly
+        opt := &ObjectUploadPartOptions{
+            ContentLength: int(j.Chunk.Size),
+        }
+
+        rt := j.RetryTimes
+        for {
+            resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
+                &io.LimitedReader{R: fd, N: j.Chunk.Size}, opt)
+            res.PartNumber = j.Chunk.Number
+            res.Resp = resp
+            if err != nil {
+                rt--
+                if rt == 0 {
+                    fd.Close()
+                    results <- &res
+                    break
+                }
+                continue
+            }
+            fd.Close()
+            results <- &res
+            break
+        }
+    }
+}
+
+func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
+    if filePath == "" || partSize <= 0 {
+        return nil, 0, errors.New("chunkSize invalid")
+    }
+
+    file, err := os.Open(filePath)
+    if err != nil {
+        return nil, 0, err
+    }
+    defer file.Close()
+
+    stat, err := file.Stat()
+    if err != nil {
+        return nil, 0, err
+    }
+    var partNum = stat.Size() / partSize
+    // at most 10000 parts are allowed
+    if partNum >= 10000 {
+        return nil, 0, errors.New("Too many parts, out of 10000")
+    }
+
+    var chunks []Chunk
+    var chunk = Chunk{}
+    for i := int64(0); i < partNum; i++ {
+        chunk.Number = int(i + 1)
+        chunk.OffSet = i * partSize
+        chunk.Size = partSize
+        chunks = append(chunks, chunk)
+    }
+
+    if stat.Size()%partSize > 0 {
+        chunk.Number = len(chunks) + 1
+        chunk.OffSet = int64(len(chunks)) * partSize
+        chunk.Size = stat.Size() % partSize
+        chunks = append(chunks, chunk)
+        partNum++
+    }
+
+    return chunks, int(partNum), nil
 }

 // MultiUpload is the advanced upload interface: parts are uploaded concurrently
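The chunk arithmetic above is worth checking in isolation: the file yields partNum full parts of partSize bytes, plus one tail part when the size is not an exact multiple. A standalone sketch of the same computation, with the file I/O removed; splitIntoChunks is a hypothetical helper that mirrors the diff's SplitFileIntoChunks but takes the total size directly:

package main

import "fmt"

type Chunk struct {
    Number int
    OffSet int64
    Size   int64
}

// splitIntoChunks mirrors SplitFileIntoChunks from the diff,
// but takes the total size instead of stat'ing a file.
func splitIntoChunks(totalSize, partSize int64) ([]Chunk, int) {
    partNum := totalSize / partSize
    var chunks []Chunk
    for i := int64(0); i < partNum; i++ {
        chunks = append(chunks, Chunk{Number: int(i + 1), OffSet: i * partSize, Size: partSize})
    }
    if totalSize%partSize > 0 {
        chunks = append(chunks, Chunk{
            Number: len(chunks) + 1,
            OffSet: int64(len(chunks)) * partSize,
            Size:   totalSize % partSize,
        })
        partNum++
    }
    return chunks, int(partNum)
}

func main() {
    // 10 MiB file with 4 MiB parts -> parts of 4, 4, and 2 MiB
    chunks, n := splitIntoChunks(10<<20, 4<<20)
    fmt.Println(n) // 3
    for _, c := range chunks {
        fmt.Printf("part %d: offset %d, size %d\n", c.Number, c.OffSet, c.Size)
    }
}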
@@ -433,59 +538,66 @@ type MultiUploadOptions struct {
 // Also make sure the number of parts does not exceed 10000
 //

-func (s *ObjectService) MultiUpload(ctx context.Context, name string, r io.Reader, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
+func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
+    // 1. Split the file into chunks
+    bufSize := opt.PartSize * 1024 * 1024
+    chunks, partNum, err := SplitFileIntoChunks(filepath, bufSize)
+    if err != nil {
+        return nil, nil, err
+    }
+
+    // 2. Initiate the multipart upload
     optini := opt.OptIni
     res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
     if err != nil {
         return nil, nil, err
     }
     uploadID := res.UploadID
-    bufSize := opt.PartSize * 1024 * 1024
-    buffer := make([]byte, bufSize)
+    var poolSize int
+    if opt.ThreadPoolSize > 0 {
+        poolSize = opt.ThreadPoolSize
+    } else {
+        // Default is one
+        poolSize = 1
+    }
+
+    chjobs := make(chan *Jobs, 100)
+    chresults := make(chan *Results, 10000)
     optcom := &CompleteMultipartUploadOptions{}

-    PartUpload := func(ch chan *Response, ctx context.Context, name string, uploadId string, partNumber int, data io.Reader, opt *ObjectUploadPartOptions) {
-        defer func() {
-            if err := recover(); err != nil {
-                fmt.Println(err)
-            }
-        }()
-        resp, _ := s.UploadPart(context.Background(), name, uploadId, partNumber, data, nil)
-        ch <- resp
-    }
+    // 3. Start the workers
+    for w := 1; w <= poolSize; w++ {
+        go worker(s, chjobs, chresults)
+    }

-    chs := make([]chan *Response, 10000)
-    PartNumber := 0
-    for i := 1; true; i++ {
-        bytesread, err := r.Read(buffer)
-        if err != nil {
-            if err != io.EOF {
-                return nil, nil, err
-            }
-            // If read fail also need to create i index respon in chan,
-            // in case below out of index to panic.
-            chs[i] = make(chan *Response)
-            PartNumber = i
-            break
-        }
-        chs[i] = make(chan *Response)
-        go PartUpload(chs[i], context.Background(), name, uploadID, i, strings.NewReader(string(buffer[:bytesread])), nil)
-    }
+    // 4. Push the jobs
+    for _, chunk := range chunks {
+        job := &Jobs{
+            Name:       name,
+            RetryTimes: 3,
+            FilePath:   filepath,
+            UploadId:   uploadID,
+            Chunk:      chunk,
+        }
+        chjobs <- job
+    }
+    close(chjobs)

-    for i := 1; i < PartNumber; i++ {
-        resp := <-chs[i]
+    // 5. Receive the response ETags and complete the upload
+    for i := 1; i <= partNum; i++ {
+        res := <-chresults
         // Note: if a part failed, its ETag cannot be retrieved
-        if resp == nil {
+        if res.Resp == nil {
             // Some part already failed, so the response header is unavailable
-            return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, i)
+            return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
         }
-        etag := resp.Header.Get("ETag")
+        etag := res.Resp.Header.Get("ETag")
         optcom.Parts = append(optcom.Parts, Object{
-            PartNumber: i, ETag: etag},
+            PartNumber: res.PartNumber, ETag: etag},
         )
     }
+    sort.Sort(ObjectList(optcom.Parts))

     v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
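Steps 3 through 5 follow Go's standard worker-pool shape: a bounded pool of goroutines drains a jobs channel and reports on a results channel, and the producer closes the jobs channel once everything is queued so the workers exit cleanly. A minimal self-contained sketch of that pattern; the names here are illustrative, not from the SDK:

package main

import "fmt"

type job struct{ part int }
type result struct{ part int }

func worker(jobs <-chan job, results chan<- result) {
    for j := range jobs {
        // The upload of one part would happen here.
        results <- result{part: j.part}
    }
}

func main() {
    const poolSize, partNum = 4, 10
    jobs := make(chan job, partNum)
    results := make(chan result, partNum)

    for w := 0; w < poolSize; w++ {
        go worker(jobs, results)
    }
    for i := 1; i <= partNum; i++ {
        jobs <- job{part: i}
    }
    close(jobs) // workers exit once the channel drains

    for i := 0; i < partNum; i++ {
        r := <-results
        fmt.Println("done part", r.part)
    }
}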