You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

931 lines
31 KiB

package cos
import (
"context"
"crypto/md5"
"encoding/xml"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"os"
"sort"
"strconv"
"strings"
"time"
)
// ObjectService groups the object-related APIs.
type ObjectService service
// ObjectGetOptions is the option of GetObject.
//
// Get hands this value to the request as both its query options and its
// header options; the `url` and `header` struct tags presumably select which
// fields are sent where (verify against the request encoder).
type ObjectGetOptions struct {
ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
ResponseExpires string `url:"response-expires,omitempty" header:"-"`
ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
Range string `url:"-" header:"Range,omitempty"`
IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
// SSE-C
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
// Download progress listener. A ProgressCompleteEvent does not mean the API
// call succeeded; the call succeeded only if the returned err == nil.
Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// presignedURLTestingOptions is the opt of presigned url.
//
// When GetPresignedURL receives a value of this type as its opt argument and
// authTime is non-nil, that AuthTime is used for signing instead of one built
// from the expiry duration.
type presignedURLTestingOptions struct {
authTime *AuthTime
}
// Get downloads an object (the Get Object request).
// The caller needs read permission on the target object, or the object must
// be publicly readable.
//
// At most one version ID may be passed through id; passing more than one
// returns a "wrong params" error. The request is sent with disableCloseBody
// set, and the response is returned with its Body for the caller to consume.
// If opt.Listener is set and the response carries a parseable Content-Length,
// the body is wrapped so that reads are reported to the listener.
//
// https://www.qcloud.com/document/product/436/7753
func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
	var uri string
	switch len(id) {
	case 0:
		uri = "/" + encodeURIComponent(name)
	case 1:
		uri = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
	default:
		return nil, errors.New("wrong params")
	}

	resp, err := s.client.send(ctx, &sendOptions{
		baseURL:          s.client.BaseURL.BucketURL,
		uri:              uri,
		method:           http.MethodGet,
		optQuery:         opt,
		optHeader:        opt,
		disableCloseBody: true,
	})

	// Progress reporting only applies to a successful response with a listener.
	if err != nil || resp == nil || opt == nil || opt.Listener == nil {
		return resp, err
	}
	length, parseErr := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
	if parseErr == nil {
		resp.Body = TeeReader(resp.Body, nil, length, opt.Listener)
	}
	return resp, err
}
// GetToFile downloads the object name into the local file localpath.
//
// The file is created if missing and truncated if it already exists
// (mode 0660). The response body is closed before returning. The returned
// error is the first failure among the request, opening the file, copying
// the body, and closing the file; the *Response from Get is returned
// alongside it in every case.
func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
	resp, err := s.Get(ctx, name, opt, id...)
	if err != nil {
		return resp, err
	}
	defer resp.Body.Close()

	// If file exist, overwrite it
	fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
	if err != nil {
		return resp, err
	}

	_, err = io.Copy(fd, resp.Body)
	// A failed Close can mean buffered data never reached the disk, so it must
	// not be dropped; a copy error still takes precedence.
	if closeErr := fd.Close(); err == nil {
		err = closeErr
	}
	return resp, err
}
// GetPresignedURL builds a presigned URL for downloading or uploading the
// object name with the given HTTP method.
//
// opt is passed to the request builder as both query and header options. The
// authorization computed from ak/sk is URL-encoded (leaving '&' and '='
// untouched) and appended to the request's query string. The signature
// lifetime is derived from expired, unless opt is a
// *presignedURLTestingOptions carrying a non-nil authTime.
func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
	sendOpt := sendOptions{
		baseURL:   s.client.BaseURL.BucketURL,
		uri:       "/" + encodeURIComponent(name),
		method:    httpMethod,
		optQuery:  opt,
		optHeader: opt,
	}
	req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
	if err != nil {
		return nil, err
	}

	// A nil opt simply fails the type assertion, so no separate nil check is needed.
	var authTime *AuthTime
	if testingOpt, ok := opt.(*presignedURLTestingOptions); ok {
		authTime = testingOpt.authTime
	}
	if authTime == nil {
		authTime = NewAuthTime(expired)
	}

	signature := encodeURIComponent(newAuthorization(ak, sk, req, authTime), []byte{'&', '='})
	if existing := req.URL.RawQuery; existing != "" {
		req.URL.RawQuery = existing + "&" + signature
	} else {
		req.URL.RawQuery = signature
	}
	return req.URL, nil
}
// ObjectPutHeaderOptions holds the header options of the put object request.
type ObjectPutHeaderOptions struct {
CacheControl string `header:"Cache-Control,omitempty" url:"-"`
ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
ContentType string `header:"Content-Type,omitempty" url:"-"`
ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
ContentLength int `header:"Content-Length,omitempty" url:"-"`
ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
Expect string `header:"Expect,omitempty" url:"-"`
Expires string `header:"Expires,omitempty" url:"-"`
XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
// Custom x-cos-meta-* headers
XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
// Allowed values: Normal, Appendable
//XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
// Enable Server Side Encryption, Only supported: AES256
XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
// SSE-C
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
// Compatibility hook for other custom headers
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
// Upload progress listener. A ProgressCompleteEvent does not mean the API
// call succeeded; the call succeeded only if the returned err == nil.
Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// ObjectPutOptions holds the options of put object.
//
// Both members are embedded pointers, so their fields are promoted onto
// ObjectPutOptions; reading a promoted field while the corresponding pointer
// is nil panics.
type ObjectPutOptions struct {
*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
*ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
// Put uploads an object to the bucket (the Put Object request).
//
// When r is not a bytes.Buffer/bytes.Reader/strings.Reader,
// opt.ObjectPutHeaderOptions.ContentLength must be specified.
//
// If opt carries a progress Listener, the length of r is determined with
// GetReaderLen (an error from it is returned without sending the request) and
// r is wrapped so that reads are reported to the listener.
//
// https://www.qcloud.com/document/product/436/7749
func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
	// Listener is promoted from the embedded *ObjectPutHeaderOptions, so that
	// pointer has to be checked before the field is read: callers may supply
	// only ACL options and leave the header options nil.
	if opt != nil && opt.ObjectPutHeaderOptions != nil && opt.Listener != nil {
		totalBytes, err := GetReaderLen(r)
		if err != nil {
			return nil, err
		}
		r = TeeReader(r, nil, totalBytes, opt.Listener)
	}
	sendOpt := sendOptions{
		baseURL:   s.client.BaseURL.BucketURL,
		uri:       "/" + encodeURIComponent(name),
		method:    http.MethodPut,
		body:      r,
		optHeader: opt,
	}
	resp, err := s.client.send(ctx, &sendOpt)
	return resp, err
}
// PutFromFile uploads the local file at filePath as the object name by
// opening it and handing it to Put; the file is closed before returning.
//
// Note: when uploading a large file through this call, request/response
// debugging must be configured not to include bodies, otherwise the process
// may run out of memory.
func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
	file, openErr := os.Open(filePath)
	if openErr != nil {
		return nil, openErr
	}
	defer file.Close()

	resp, err := s.Put(ctx, name, file, opt)
	return resp, err
}
// ObjectCopyHeaderOptions is the head option of the Copy.
//
// XCosCopySource is overwritten by Copy with the source it builds from its
// sourceURL argument.
type ObjectCopyHeaderOptions struct {
// When use replace directive to update meta infos
CacheControl string `header:"Cache-Control,omitempty" url:"-"`
ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
ContentType string `header:"Content-Type,omitempty" url:"-"`
Expires string `header:"Expires,omitempty" url:"-"`
Expect string `header:"Expect,omitempty" url:"-"`
XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
// Custom x-cos-meta-* headers
XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
// SSE-C
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
// Compatibility hook for other custom headers
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
// ObjectCopyOptions is the option of Copy, choose header or body.
//
// Either embedded pointer may be left nil; Copy copies whichever parts are
// set into its own options value before sending.
type ObjectCopyOptions struct {
*ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
// ObjectCopyResult is the result of Copy, decoded from the CopyObjectResult
// XML element. Copy treats an empty ETag in a 200 response as a failure.
type ObjectCopyResult struct {
XMLName xml.Name `xml:"CopyObjectResult"`
ETag string `xml:"ETag,omitempty"`
LastModified string `xml:"LastModified,omitempty"`
}
// Copy issues a PutObjectCopy request to copy an object from a source path to
// the destination object name. The recommended object size is 1M to 5G; for
// objects larger than 5G use the multipart Upload - Copy API instead. Object
// metadata and ACL can be modified as part of the copy.
//
// This call can be used to move or rename an object, change its attributes,
// or create a duplicate.
//
// Note: for a cross-account copy, the source object must first be made
// public-read or the destination account must be granted access; this is not
// needed within a single account.
//
// sourceURL must contain a "/": the part before the first "/" is used as is
// and the remainder is URL-encoded as the source key. At most one source
// version ID may be passed through id. opt is not modified; its contents are
// copied and the x-cos-copy-source header is set on the copy.
//
// https://cloud.tencent.com/document/product/436/10881
func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
	surl := strings.SplitN(sourceURL, "/", 2)
	if len(surl) < 2 {
		return nil, nil, fmt.Errorf("x-cos-copy-source format error: %s", sourceURL)
	}

	var u string
	switch len(id) {
	case 0:
		u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
	case 1:
		u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
	default:
		return nil, nil, errors.New("wrong params")
	}

	// Work on a private copy so the caller's options are never mutated.
	copyOpt := &ObjectCopyOptions{
		ObjectCopyHeaderOptions: &ObjectCopyHeaderOptions{},
		ACLHeaderOptions:        &ACLHeaderOptions{},
	}
	if opt != nil {
		if opt.ObjectCopyHeaderOptions != nil {
			*copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
		}
		if opt.ACLHeaderOptions != nil {
			*copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
		}
	}
	copyOpt.XCosCopySource = u

	var res ObjectCopyResult
	sendOpt := sendOptions{
		baseURL:   s.client.BaseURL.BucketURL,
		uri:       "/" + encodeURIComponent(name),
		method:    http.MethodPut,
		body:      nil,
		optHeader: copyOpt,
		result:    &res,
	}
	resp, err := s.client.send(ctx, &sendOpt)
	// If the error occurs during the copy operation, the error response is
	// embedded in the 200 OK response. This means that a 200 OK response can
	// contain either a success or an error.
	if err == nil && resp != nil && resp.StatusCode == http.StatusOK && res.ETag == "" {
		return &res, resp, errors.New("response 200 OK, but body contains an error")
	}
	return &res, resp, err
}
// ObjectDeleteOptions is the option of Delete. Delete passes it to the
// request as both header options and query options.
type ObjectDeleteOptions struct {
// SSE-C
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
// Compatibility hook for other custom headers
XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
}
// Delete removes an object (the Delete Object request).
//
// An empty name is rejected, because it might otherwise end up calling the
// delete-bucket interface. Only the first element of opt, if any, is used; it
// is sent as both header and query options.
//
// https://www.qcloud.com/document/product/436/7743
func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
	if name == "" {
		return nil, errors.New("empty object name")
	}

	var deleteOpt *ObjectDeleteOptions
	if len(opt) > 0 {
		deleteOpt = opt[0]
	}

	return s.client.send(ctx, &sendOptions{
		baseURL:   s.client.BaseURL.BucketURL,
		uri:       "/" + encodeURIComponent(name),
		method:    http.MethodDelete,
		optHeader: deleteOpt,
		optQuery:  deleteOpt,
	})
}
// ObjectHeadOptions is the option of HeadObject. Head passes it to the
// request as header options only.
type ObjectHeadOptions struct {
IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
// SSE-C
XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
}
// Head retrieves the metadata of an object (the Head Object request). The
// permissions required for Head are the same as for Get.
//
// At most one version ID may be passed through id; passing more than one
// returns a "wrong params" error. When the response reports the object type
// "appendable", an x-cos-next-append-position header is added to the
// response, set to the response's Content-Length value.
//
// https://www.qcloud.com/document/product/436/7745
func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
	var u string
	switch len(id) {
	case 0:
		u = "/" + encodeURIComponent(name)
	case 1:
		u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
	default:
		return nil, errors.New("wrong params")
	}
	sendOpt := sendOptions{
		baseURL:   s.client.BaseURL.BucketURL,
		uri:       u,
		method:    http.MethodHead,
		optHeader: opt,
	}
	resp, err := s.client.send(ctx, &sendOpt)
	if resp != nil {
		// Check slice lengths before indexing: a missing Content-Length (or an
		// empty header value list) must not panic.
		objType := resp.Header["X-Cos-Object-Type"]
		length := resp.Header["Content-Length"]
		if len(objType) > 0 && objType[0] == "appendable" && len(length) > 0 {
			resp.Header.Add("x-cos-next-append-position", length[0])
		}
	}
	return resp, err
}
// ObjectOptionsOptions is the option of object options. Options passes it to
// the request as header options.
type ObjectOptionsOptions struct {
Origin string `url:"-" header:"Origin"`
AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
}
// Options performs the cross-origin pre-flight request for an object: it
// sends an OPTIONS request to the server to confirm whether a cross-origin
// operation is allowed.
//
// When no CORS configuration exists, the request returns 403 Forbidden.
//
// https://www.qcloud.com/document/product/436/8288
func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
	return s.client.send(ctx, &sendOptions{
		baseURL:   s.client.BaseURL.BucketURL,
		uri:       "/" + encodeURIComponent(name),
		method:    http.MethodOptions,
		optHeader: opt,
	})
}
// CASJobParameters supports three tiers: Standard (in 35 hours), Expedited
// (quick way, in 15 mins), Bulk (in 5-12 hours).
type CASJobParameters struct {
Tier string `xml:"Tier"`
}
// ObjectRestoreOptions is the option of object restore. PostRestore sends it
// as the request body, serialized as a RestoreRequest XML element.
type ObjectRestoreOptions struct {
XMLName xml.Name `xml:"RestoreRequest"`
Days int `xml:"Days"`
Tier *CASJobParameters `xml:"CASJobParameters"`
}
// PostRestore recovers an object that was archived by COS archive. It sends
// opt as the body of a POST to the object's "?restore" sub-resource.
//
// https://cloud.tencent.com/document/product/436/12633
func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
	return s.client.send(ctx, &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     "/" + encodeURIComponent(name) + "?restore",
		method:  http.MethodPost,
		body:    opt,
	})
}
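// TODO: the Append API is still being optimized and is not yet open for use.
//
// An Append request uploads an object to a bucket by appending parts. An object uploaded with Append Upload must already be marked Appendable.
// When Put Object is performed on an Appendable object, the object is overwritten and its type changes to Normal.
//
// The object type can be queried with Head Object: the response carries the custom header "x-cos-object-type", which has only two values, Normal or Appendable.
//
// The recommended size for an appended part is 1M - 5G. If position does not match the current length of the object, COS returns a 409 error.
// Appending to a Normal object makes COS return 409 ObjectNotAppendable.
//
// Appendable objects cannot be copied and take no part in versioning, lifecycle management, or cross-region replication.
//
// When r is not a bytes.Buffer/bytes.Reader/strings.Reader, opt.ObjectPutHeaderOptions.ContentLength must be specified.
//
// https://www.qcloud.com/document/product/436/7741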
// func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
// u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
// if position != 0{
// opt = nil
// }
// sendOpt := sendOptions{
// baseURL: s.client.BaseURL.BucketURL,
// uri: u,
// method: http.MethodPost,
// optHeader: opt,
// body: r,
// }
// resp, err := s.client.send(ctx, &sendOpt)
// return resp, err
// }
// ObjectDeleteMultiOptions is the option of DeleteMulti. DeleteMulti sends it
// as the request body, serialized as a Delete XML element.
type ObjectDeleteMultiOptions struct {
XMLName xml.Name `xml:"Delete" header:"-"`
Quiet bool `xml:"Quiet" header:"-"`
Objects []Object `xml:"Object" header:"-"`
//XCosSha1 string `xml:"-" header:"x-cos-sha1"`
}
// ObjectDeleteMultiResult is the result of DeleteMulti, decoded from the
// DeleteResult XML element: Deleted elements go to DeletedObjects and Error
// elements go to Errors.
type ObjectDeleteMultiResult struct {
XMLName xml.Name `xml:"DeleteResult"`
DeletedObjects []Object `xml:"Deleted,omitempty"`
Errors []struct {
Key string `xml:",omitempty"`
Code string `xml:",omitempty"`
Message string `xml:",omitempty"`
VersionId string `xml:",omitempty"`
} `xml:"Error,omitempty"`
}
// DeleteMulti deletes several objects in one request; a single request can
// delete at most 1000 objects.
// COS offers two result modes, Verbose and Quiet. Verbose returns the
// deletion result of every object; Quiet returns only the objects that
// failed.
// https://www.qcloud.com/document/product/436/8289
func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
	result := new(ObjectDeleteMultiResult)
	resp, err := s.client.send(ctx, &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     "/?delete",
		method:  http.MethodPost,
		body:    opt,
		result:  result,
	})
	return result, resp, err
}
// Object is the meta info of the object. In this file it is also used for
// the entries of a multi-delete request and result, and for the parts of a
// multipart upload (PartNumber and ETag).
type Object struct {
Key string `xml:",omitempty"`
ETag string `xml:",omitempty"`
Size int `xml:",omitempty"`
PartNumber int `xml:",omitempty"`
LastModified string `xml:",omitempty"`
StorageClass string `xml:",omitempty"`
Owner *Owner `xml:",omitempty"`
VersionId string `xml:",omitempty"`
}
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one.
type MultiUploadOptions struct {
// OptIni is passed to InitiateMultipartUpload when a new upload is started.
OptIni *InitiateMultipartUploadOptions
// PartSize is the part size in MB; a value <= 0 lets the SDK choose it.
PartSize int64
// ThreadPoolSize is the number of upload workers; values <= 0 mean one.
ThreadPoolSize int
// CheckPoint makes Upload look for an existing multipart upload of the same
// object and reuse the parts whose checksums match the local file.
CheckPoint bool
}
// Chunk describes one part of a local file for multipart upload.
type Chunk struct {
// Number is the 1-based part number.
Number int
// OffSet is the byte offset of the part within the file.
OffSet int64
// Size is the length of the part in bytes.
Size int64
// Done and ETag are set by checkUploadedParts for parts that were already
// uploaded and whose checksum matches the local data.
Done bool
ETag string
}
// Jobs is one unit of work for an upload worker: a single chunk of the file
// at FilePath to be uploaded as a part of the multipart upload UploadId of
// object Name. RetryTimes bounds the number of upload attempts.
type Jobs struct {
Name string
UploadId string
FilePath string
RetryTimes int
Chunk Chunk
Data io.Reader
Opt *ObjectUploadPartOptions
}
// Results is what an upload worker reports for one job: the part number, the
// response of the last UploadPart attempt, and the error, if any.
type Results struct {
PartNumber int
Resp *Response
err error
}
// worker consumes jobs until the jobs channel is closed and sends exactly one
// *Results per job on results.
//
// For each job it opens the job's file, positions it at the chunk offset and
// uploads the chunk with UploadPart. A failed upload is retried until
// j.RetryTimes attempts have been made (at least one attempt is always
// made); the file is re-positioned before every attempt so a retry never
// continues from wherever the previous attempt stopped reading. Failures to
// open or seek the file are reported without attempting an upload.
func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
	upload := func(j *Jobs) *Results {
		res := &Results{PartNumber: j.Chunk.Number}

		fd, err := os.Open(j.FilePath)
		if err != nil {
			res.err = err
			return res
		}
		defer fd.Close()

		// UploadPart do not support the chunk trsf, so need to add the content-length
		j.Opt.ContentLength = int(j.Chunk.Size)

		for attempt := 1; ; attempt++ {
			if _, err := fd.Seek(j.Chunk.OffSet, io.SeekStart); err != nil {
				res.Resp, res.err = nil, err
				return res
			}
			res.Resp, res.err = s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
				&io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
			// Stop on success, or once the attempt budget is used up. Using >=
			// also terminates when RetryTimes is zero or negative.
			if res.err == nil || attempt >= j.RetryTimes {
				return res
			}
		}
	}

	for j := range jobs {
		results <- upload(j)
	}
}
// DividePart chooses a part size for a file of fileSize bytes. It starts at
// 1 MiB and doubles the part size until the number of full parts
// (fileSize / partSize) drops below 10000.
//
// It returns the number of full parts and the chosen part size in bytes; a
// trailing partial part is not counted.
func DividePart(fileSize int64) (int64, int64) {
	const partLimit = 10000

	size := int64(1 << 20)
	for fileSize/size >= partLimit {
		size *= 2
	}
	return fileSize / size, size
}
// SplitFileIntoChunks computes the multipart layout of the file at filePath.
//
// When partSize > 0 it is taken as the part size in MB, and an error is
// returned if that yields 10000 or more full parts; otherwise the part size
// is chosen by DividePart. A trailing remainder smaller than the part size
// becomes one extra, shorter chunk.
//
// It returns the file size in bytes, the chunks in file order (numbered from
// 1), and the number of chunks.
func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
	if filePath == "" {
		return 0, nil, 0, errors.New("filePath invalid")
	}
	f, err := os.Open(filePath)
	if err != nil {
		return 0, nil, 0, err
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return 0, nil, 0, err
	}
	fileSize := info.Size()

	var count int64
	if partSize <= 0 {
		count, partSize = DividePart(fileSize)
	} else {
		partSize *= 1024 * 1024
		count = fileSize / partSize
		if count >= 10000 {
			return 0, nil, 0, errors.New("Too many parts, out of 10000")
		}
	}

	var chunks []Chunk
	for idx := int64(0); idx < count; idx++ {
		chunks = append(chunks, Chunk{
			Number: int(idx + 1),
			OffSet: idx * partSize,
			Size:   partSize,
		})
	}
	// Whatever is left after the full-size parts forms the final chunk.
	if tail := fileSize % partSize; tail > 0 {
		chunks = append(chunks, Chunk{
			Number: len(chunks) + 1,
			OffSet: int64(len(chunks)) * partSize,
			Size:   tail,
		})
		count++
	}
	return fileSize, chunks, int(count), nil
}
// getResumableUploadID looks for an in-progress multipart upload of the
// object name. It lists uploads with name as prefix and scans the listing
// from the last entry backwards for one whose decoded key equals name
// exactly, returning that entry's decoded upload ID.
//
// It returns "" with a nil error when no matching upload exists. Entries
// whose key fails to decode are simply compared using whatever the decoder
// returned.
func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
	listing, _, err := s.ListUploads(ctx, &ObjectListUploadsOptions{
		Prefix:       name,
		EncodingType: "url",
	})
	if err != nil {
		return "", err
	}
	for i := len(listing.Upload) - 1; i >= 0; i-- {
		key, _ := decodeURIComponent(listing.Upload[i].Key)
		if key != name {
			continue
		}
		return decodeURIComponent(listing.Upload[i].UploadID)
	}
	return "", nil
}
// checkUploadedParts verifies the parts already uploaded for UploadID against
// the local file and marks the matching entries of chunks as done.
//
// It lists all uploaded parts (following truncated listings), then for each
// part reads the corresponding chunk from filepath and compares the quoted
// hex MD5 of the data with the part's ETag. On a match the chunk gets
// Done = true and the part's ETag.
//
// If any part has a number outside 1..partNum (or outside chunks), cannot be
// read, or fails the checksum comparison, every chunk is reset to not-done
// and an error is returned. Errors from listing parts or opening the file are
// returned without touching chunks.
func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
	var uploadedParts []Object
	isTruncated := true
	opt := &ObjectListPartsOptions{
		EncodingType: "url",
	}
	for isTruncated {
		res, _, err := s.ListParts(ctx, name, UploadID, opt)
		if err != nil {
			return err
		}
		if len(res.Parts) > 0 {
			uploadedParts = append(uploadedParts, res.Parts...)
		}
		isTruncated = res.IsTruncated
		opt.PartNumberMarker = res.NextPartNumberMarker
	}

	fd, err := os.Open(filepath)
	if err != nil {
		return err
	}
	defer fd.Close()

	// When one part fails, reset all chunks so nothing is treated as uploaded.
	ret := func(e error) error {
		for i := range chunks {
			chunks[i].Done = false
			chunks[i].ETag = ""
		}
		return e
	}

	for _, part := range uploadedParts {
		// Part numbers are 1-based; anything outside the local layout would
		// index chunks out of range.
		if part.PartNumber < 1 || part.PartNumber > partNum || part.PartNumber > len(chunks) {
			return ret(errors.New("Part Number is not consistent"))
		}
		idx := part.PartNumber - 1
		if _, err := fd.Seek(chunks[idx].OffSet, io.SeekStart); err != nil {
			return ret(err)
		}
		bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[idx].Size))
		if err != nil {
			return ret(err)
		}
		// The ETag is compared in its quoted form.
		localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
		if localMD5 != part.ETag {
			return ret(fmt.Errorf("CheckSum Failed in Part[%d]", part.PartNumber))
		}
		chunks[idx].Done = true
		chunks[idx].ETag = part.ETag
	}
	return nil
}
// MultiUpload is the high-level upload interface that uploads a file in
// parts concurrently; it simply forwards to Upload.
// Note: this interface is currently provided for reference only.
//
// When opt.PartSize > 0 the caller chooses the part size, in MB; otherwise
// the SDK splits the file automatically.
// When choosing the part size yourself, make sure the number of parts does
// not exceed 10000.
func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
	result, resp, err := s.Upload(ctx, name, filepath, opt)
	return result, resp, err
}
// Upload uploads the local file at filepath as the object name using a
// concurrent multipart upload.
//
// The file is split into chunks by SplitFileIntoChunks (opt.PartSize in MB,
// or chosen automatically when <= 0). An empty file is uploaded with a plain
// PutFromFile instead. When opt.CheckPoint is set, an existing multipart
// upload of the same object is reused if all of its uploaded parts match the
// local file; otherwise a new upload is initiated with opt.OptIni.
// opt.ThreadPoolSize workers (one by default) upload the remaining chunks,
// each with up to three attempts, and the upload is then completed.
//
// Progress events are delivered to the Listener of opt.OptIni's header
// options, if one is set. The first failed part aborts the call with an
// error naming the upload ID and part number; the multipart upload itself is
// left in place. A nil opt is treated as an empty MultiUploadOptions.
func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
	if opt == nil {
		opt = &MultiUploadOptions{}
	}
	// 1.Get the file chunk
	totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
	if err != nil {
		return nil, nil, err
	}
	// filesize=0 , use simple upload
	if partNum == 0 {
		var opt0 *ObjectPutOptions
		if opt.OptIni != nil {
			opt0 = &ObjectPutOptions{
				opt.OptIni.ACLHeaderOptions,
				opt.OptIni.ObjectPutHeaderOptions,
			}
		}
		rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
		if err != nil {
			return nil, rsp, err
		}
		result := &CompleteMultipartUploadResult{
			Key:  name,
			ETag: rsp.Header.Get("ETag"),
		}
		return result, rsp, nil
	}

	// Try to resume a previous upload; any failure here just means a fresh
	// upload is started below.
	var uploadID string
	resumableFlag := false
	if opt.CheckPoint {
		var err error
		uploadID, err = s.getResumableUploadID(ctx, name)
		if err == nil && uploadID != "" {
			err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
			resumableFlag = (err == nil)
		}
	}

	// 2.Init
	optini := opt.OptIni
	if !resumableFlag {
		res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
		if err != nil {
			return nil, nil, err
		}
		uploadID = res.UploadID
	}

	var poolSize int
	if opt.ThreadPoolSize > 0 {
		poolSize = opt.ThreadPoolSize
	} else {
		// Default is one
		poolSize = 1
	}

	chjobs := make(chan *Jobs, 100)
	chresults := make(chan *Results, 10000)
	optcom := &CompleteMultipartUploadOptions{}

	// 3.Start worker
	for w := 1; w <= poolSize; w++ {
		go worker(s, chjobs, chresults)
	}

	// progress started event
	var listener ProgressListener
	var consumedBytes int64
	// Listener lives on the embedded header options, which may be nil even
	// when OptIni itself is set (e.g. only ACL options were supplied).
	if opt.OptIni != nil && opt.OptIni.ObjectPutHeaderOptions != nil {
		listener = opt.OptIni.Listener
	}
	event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
	progressCallback(listener, event)

	// 4.Push jobs
	for _, chunk := range chunks {
		if chunk.Done {
			continue
		}
		partOpt := &ObjectUploadPartOptions{}
		if optini != nil && optini.ObjectPutHeaderOptions != nil {
			partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
			partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
			partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
			partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
		}
		job := &Jobs{
			Name:       name,
			RetryTimes: 3,
			FilePath:   filepath,
			UploadId:   uploadID,
			Chunk:      chunk,
			Opt:        partOpt,
		}
		chjobs <- job
	}
	close(chjobs)

	// 5.Recv the resp etag to complete
	for i := 0; i < partNum; i++ {
		// Chunks verified as already uploaded produce no job, so they are
		// accounted for directly instead of waiting on a result.
		if chunks[i].Done {
			optcom.Parts = append(optcom.Parts, Object{
				PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
			)
			consumedBytes += chunks[i].Size
			event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
			progressCallback(listener, event)
			continue
		}
		res := <-chresults
		// Notice one part fail can not get the etag according.
		if res.Resp == nil || res.err != nil {
			// A missing response without an error must not be dereferenced as
			// an error value; substitute a descriptive cause.
			cause := res.err
			if cause == nil {
				cause = errors.New("empty response")
			}
			err := fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, cause.Error())
			event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
			progressCallback(listener, event)
			return nil, nil, err
		}
		etag := res.Resp.Header.Get("ETag")
		optcom.Parts = append(optcom.Parts, Object{
			PartNumber: res.PartNumber, ETag: etag},
		)
		consumedBytes += chunks[res.PartNumber-1].Size
		event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
		progressCallback(listener, event)
	}
	// Results arrive in completion order; the parts are sorted before completing.
	sort.Sort(ObjectList(optcom.Parts))

	event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
	progressCallback(listener, event)

	v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
	return v, resp, err
}
// ObjectPutTaggingOptions is the option of PutTagging. It is sent as the
// request body, serialized as a Tagging XML element with the tags nested
// under TagSet.
type ObjectPutTaggingOptions struct {
XMLName xml.Name `xml:"Tagging"`
TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
}
// ObjectTaggingTag is a single object tag; it has the same underlying type
// as BucketTaggingTag.
type ObjectTaggingTag BucketTaggingTag
// ObjectGetTaggingResult is the result of GetTagging; it has the same
// underlying type as ObjectPutTaggingOptions.
type ObjectGetTaggingResult ObjectPutTaggingOptions
// PutTagging sends opt as the body of a PUT to the "?tagging" sub-resource of
// the object name. At most one version ID may be passed through id; passing
// more than one returns a "wrong params" error.
func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
	var uri string
	switch len(id) {
	case 0:
		uri = "/" + encodeURIComponent(name) + "?tagging"
	case 1:
		uri = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
	default:
		return nil, errors.New("wrong params")
	}
	return s.client.send(ctx, &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     uri,
		method:  http.MethodPut,
		body:    opt,
	})
}
// GetTagging issues a GET on the "?tagging" sub-resource of the object name
// and decodes the response into an ObjectGetTaggingResult. At most one
// version ID may be passed through id; passing more than one returns a
// "wrong params" error with nil result and response.
func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
	var uri string
	switch len(id) {
	case 0:
		uri = "/" + encodeURIComponent(name) + "?tagging"
	case 1:
		uri = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
	default:
		return nil, nil, errors.New("wrong params")
	}
	result := new(ObjectGetTaggingResult)
	resp, err := s.client.send(ctx, &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     uri,
		method:  http.MethodGet,
		result:  result,
	})
	return result, resp, err
}
// DeleteTagging issues a DELETE on the "?tagging" sub-resource of the object
// name. At most one version ID may be passed through id; passing more than
// one returns a "wrong params" error.
func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
	var uri string
	switch len(id) {
	case 0:
		uri = "/" + encodeURIComponent(name) + "?tagging"
	case 1:
		uri = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
	default:
		return nil, errors.New("wrong params")
	}
	return s.client.send(ctx, &sendOptions{
		baseURL: s.client.BaseURL.BucketURL,
		uri:     uri,
		method:  http.MethodDelete,
	})
}