You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

937 lines
31 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/xml"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
// ObjectService groups the object-related APIs (the methods in this file are declared on it).
type ObjectService service
// ObjectGetOptions is the option of GetObject.
//
// Get passes this struct as both the query options and the header options;
// the `url` and `header` tags presumably select where each field is sent.
type ObjectGetOptions struct {
	ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
	ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
	ResponseExpires string `url:"response-expires,omitempty" header:"-"`
	ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
	ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
	ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
	Range string `url:"-" header:"Range,omitempty"`
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
	// Download progress listener. A ProgressCompleteEvent does not mean the API call
	// succeeded; the call succeeded only if the returned err is nil.
	Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// presignedURLTestingOptions is the opt of presigned url.
// When GetPresignedURL receives a value of this type as opt, it uses authTime
// instead of deriving the signing time window from the expiry duration.
type presignedURLTestingOptions struct {
	authTime *AuthTime
}
  42. // Get Object 请求可以将一个文件(Object)下载至本地。
  43. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  44. //
  45. // https://www.qcloud.com/document/product/436/7753
  46. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  47. var u string
  48. if len(id) == 1 {
  49. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  50. } else if len(id) == 0 {
  51. u = "/" + encodeURIComponent(name)
  52. } else {
  53. return nil, errors.New("wrong params")
  54. }
  55. sendOpt := sendOptions{
  56. baseURL: s.client.BaseURL.BucketURL,
  57. uri: u,
  58. method: http.MethodGet,
  59. optQuery: opt,
  60. optHeader: opt,
  61. disableCloseBody: true,
  62. }
  63. resp, err := s.client.send(ctx, &sendOpt)
  64. if opt != nil && opt.Listener != nil {
  65. if err == nil && resp != nil {
  66. if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
  67. resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
  68. }
  69. }
  70. }
  71. return resp, err
  72. }
  73. // GetToFile download the object to local file
  74. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  75. resp, err := s.Get(ctx, name, opt, id...)
  76. if err != nil {
  77. return resp, err
  78. }
  79. defer resp.Body.Close()
  80. // If file exist, overwrite it
  81. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  82. if err != nil {
  83. return resp, err
  84. }
  85. _, err = io.Copy(fd, resp.Body)
  86. fd.Close()
  87. if err != nil {
  88. return resp, err
  89. }
  90. return resp, nil
  91. }
  92. // GetPresignedURL get the object presigned to down or upload file by url
  93. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  94. sendOpt := sendOptions{
  95. baseURL: s.client.BaseURL.BucketURL,
  96. uri: "/" + encodeURIComponent(name),
  97. method: httpMethod,
  98. optQuery: opt,
  99. optHeader: opt,
  100. }
  101. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  102. if err != nil {
  103. return nil, err
  104. }
  105. var authTime *AuthTime
  106. if opt != nil {
  107. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  108. authTime = opt.authTime
  109. }
  110. }
  111. if authTime == nil {
  112. authTime = NewAuthTime(expired)
  113. }
  114. authorization := newAuthorization(ak, sk, req, authTime)
  115. sign := encodeURIComponent(authorization, []byte{'&', '='})
  116. if req.URL.RawQuery == "" {
  117. req.URL.RawQuery = fmt.Sprintf("%s", sign)
  118. } else {
  119. req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
  120. }
  121. return req.URL, nil
  122. }
// ObjectPutHeaderOptions the options of header of the put object.
// Every field is excluded from the URL (`url:"-"`); the `header` tag names the
// request header each field maps to.
type ObjectPutHeaderOptions struct {
	CacheControl string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
	ContentType string `header:"Content-Type,omitempty" url:"-"`
	ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
	ContentLength int `header:"Content-Length,omitempty" url:"-"`
	ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
	Expect string `header:"Expect,omitempty" url:"-"`
	Expires string `header:"Expires,omitempty" url:"-"`
	XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
	// Custom x-cos-meta-* headers
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
	// Allowed values: Normal, Appendable
	//XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
	// Enable Server Side Encryption, Only supported: AES256
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
	// Upload progress listener. A ProgressCompleteEvent does not mean the API call
	// succeeded; the call succeeded only if the returned err is nil.
	Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// ObjectPutOptions the options of put object.
// It embeds the ACL header options and the put header options by pointer, so
// either embedded value may be nil.
type ObjectPutOptions struct {
	*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
  157. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  158. //
  159. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  160. //
  161. // https://www.qcloud.com/document/product/436/7749
  162. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  163. if err := CheckReaderLen(r); err != nil {
  164. return nil, err
  165. }
  166. if opt != nil && opt.Listener != nil {
  167. totalBytes, err := GetReaderLen(r)
  168. if err != nil {
  169. return nil, err
  170. }
  171. r = TeeReader(r, nil, totalBytes, opt.Listener)
  172. }
  173. sendOpt := sendOptions{
  174. baseURL: s.client.BaseURL.BucketURL,
  175. uri: "/" + encodeURIComponent(name),
  176. method: http.MethodPut,
  177. body: r,
  178. optHeader: opt,
  179. }
  180. resp, err := s.client.send(ctx, &sendOpt)
  181. return resp, err
  182. }
  183. // PutFromFile put object from local file
  184. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  185. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  186. fd, err := os.Open(filePath)
  187. if err != nil {
  188. return nil, err
  189. }
  190. defer fd.Close()
  191. return s.Put(ctx, name, fd, opt)
  192. }
// ObjectCopyHeaderOptions is the head option of the Copy.
// XCosCopySource is overwritten by Copy with the source location it builds
// from its sourceURL argument.
type ObjectCopyHeaderOptions struct {
	// When use replace directive to update meta infos
	CacheControl string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
	ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
	ContentType string `header:"Content-Type,omitempty" url:"-"`
	Expires string `header:"Expires,omitempty" url:"-"`
	Expect string `header:"Expect,omitempty" url:"-"`
	XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
	XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
	// Custom x-cos-meta-* headers
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
// ObjectCopyOptions is the option of Copy, choose header or body.
// Both embedded option sets are pointers and may be nil; Copy copies whichever
// ones are set into its own private values before sending.
type ObjectCopyOptions struct {
	*ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
// ObjectCopyResult is the result of Copy, decoded from the CopyObjectResult
// XML element. Copy treats an empty ETag in a 200 response as an error.
type ObjectCopyResult struct {
	XMLName xml.Name `xml:"CopyObjectResult"`
	ETag string `xml:"ETag,omitempty"`
	LastModified string `xml:"LastModified,omitempty"`
}
  234. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  235. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  236. //
  237. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  238. //
  239. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  240. //
  241. // https://cloud.tencent.com/document/product/436/10881
  242. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  243. surl := strings.SplitN(sourceURL, "/", 2)
  244. if len(surl) < 2 {
  245. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  246. }
  247. var u string
  248. if len(id) == 1 {
  249. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  250. } else if len(id) == 0 {
  251. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  252. } else {
  253. return nil, nil, errors.New("wrong params")
  254. }
  255. var res ObjectCopyResult
  256. copyOpt := &ObjectCopyOptions{
  257. &ObjectCopyHeaderOptions{},
  258. &ACLHeaderOptions{},
  259. }
  260. if opt != nil {
  261. if opt.ObjectCopyHeaderOptions != nil {
  262. *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
  263. }
  264. if opt.ACLHeaderOptions != nil {
  265. *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
  266. }
  267. }
  268. copyOpt.XCosCopySource = u
  269. sendOpt := sendOptions{
  270. baseURL: s.client.BaseURL.BucketURL,
  271. uri: "/" + encodeURIComponent(name),
  272. method: http.MethodPut,
  273. body: nil,
  274. optHeader: copyOpt,
  275. result: &res,
  276. }
  277. resp, err := s.client.send(ctx, &sendOpt)
  278. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  279. if err == nil && resp.StatusCode == 200 {
  280. if res.ETag == "" {
  281. return &res, resp, errors.New("response 200 OK, but body contains an error")
  282. }
  283. }
  284. return &res, resp, err
  285. }
// ObjectDeleteOptions is the option of Delete. Delete passes it as both the
// header options and the query options of the request.
type ObjectDeleteOptions struct {
	// SSE-C
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	// NOTE(review): the query key here is "VersionId", while the other methods in
	// this file build "versionId" — confirm the service accepts this casing.
	VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
}
  295. // Delete Object请求可以将一个文件(Object)删除。
  296. //
  297. // https://www.qcloud.com/document/product/436/7743
  298. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  299. var optHeader *ObjectDeleteOptions
  300. // When use "" string might call the delete bucket interface
  301. if len(name) == 0 {
  302. return nil, errors.New("empty object name")
  303. }
  304. if len(opt) > 0 {
  305. optHeader = opt[0]
  306. }
  307. sendOpt := sendOptions{
  308. baseURL: s.client.BaseURL.BucketURL,
  309. uri: "/" + encodeURIComponent(name),
  310. method: http.MethodDelete,
  311. optHeader: optHeader,
  312. optQuery: optHeader,
  313. }
  314. resp, err := s.client.send(ctx, &sendOpt)
  315. return resp, err
  316. }
// ObjectHeadOptions is the option of HeadObject.
// All fields are sent as request headers and excluded from the URL.
type ObjectHeadOptions struct {
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
}
  325. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  326. //
  327. // https://www.qcloud.com/document/product/436/7745
  328. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  329. var u string
  330. if len(id) == 1 {
  331. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  332. } else if len(id) == 0 {
  333. u = "/" + encodeURIComponent(name)
  334. } else {
  335. return nil, errors.New("wrong params")
  336. }
  337. sendOpt := sendOptions{
  338. baseURL: s.client.BaseURL.BucketURL,
  339. uri: u,
  340. method: http.MethodHead,
  341. optHeader: opt,
  342. }
  343. resp, err := s.client.send(ctx, &sendOpt)
  344. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  345. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  346. }
  347. return resp, err
  348. }
// ObjectOptionsOptions is the option of object options.
// Origin and Access-Control-Request-Method have no omitempty in their header
// tags; only Access-Control-Request-Headers is marked omitempty.
type ObjectOptionsOptions struct {
	Origin string `url:"-" header:"Origin"`
	AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
	AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
}
  355. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  356. //
  357. // 当CORS配置不存在时,请求返回403 Forbidden。
  358. //
  359. // https://www.qcloud.com/document/product/436/8288
  360. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  361. sendOpt := sendOptions{
  362. baseURL: s.client.BaseURL.BucketURL,
  363. uri: "/" + encodeURIComponent(name),
  364. method: http.MethodOptions,
  365. optHeader: opt,
  366. }
  367. resp, err := s.client.send(ctx, &sendOpt)
  368. return resp, err
  369. }
// CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours).
// NOTE(review): "35 hours" is kept from the original comment and may have meant
// "3-5 hours" — confirm against the COS documentation.
type CASJobParameters struct {
	Tier string `xml:"Tier"`
}
// ObjectRestoreOptions is the option of object restore.
// It is serialized as the RestoreRequest XML element and sent as the request
// body by PostRestore.
type ObjectRestoreOptions struct {
	XMLName xml.Name `xml:"RestoreRequest"`
	Days int `xml:"Days"`
	Tier *CASJobParameters `xml:"CASJobParameters"`
}
  380. // PutRestore API can recover an object of type archived by COS archive.
  381. //
  382. // https://cloud.tencent.com/document/product/436/12633
  383. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  384. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  385. sendOpt := sendOptions{
  386. baseURL: s.client.BaseURL.BucketURL,
  387. uri: u,
  388. method: http.MethodPost,
  389. body: opt,
  390. }
  391. resp, err := s.client.send(ctx, &sendOpt)
  392. return resp, err
  393. }
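// TODO: the Append API is still being optimized and is not yet open for use.
//
// An Append request uploads an object to a bucket by appending chunks. An object uploaded with Append Upload must have been set to Appendable beforehand.
// When Put Object is performed on an Appendable object, the object is overwritten and its type changes to Normal.
//
// The object type can be queried with Head Object: the response carries the custom header "x-cos-object-type", which has only two values: Normal or Appendable.
//
// The recommended object size for an append upload is 1M - 5G. If position does not match the current length of the object, COS returns a 409 error.
// Appending to a Normal object makes COS return 409 ObjectNotAppendable.
//
// Appendable objects cannot be copied, do not take part in versioning or lifecycle management, and cannot be replicated across regions.
//
// When r is not a bytes.Buffer/bytes.Reader/strings.Reader, opt.ObjectPutHeaderOptions.ContentLength must be specified.
//
// https://www.qcloud.com/document/product/436/7741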
  409. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  410. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  411. // if position != 0{
  412. // opt = nil
  413. // }
  414. // sendOpt := sendOptions{
  415. // baseURL: s.client.BaseURL.BucketURL,
  416. // uri: u,
  417. // method: http.MethodPost,
  418. // optHeader: opt,
  419. // body: r,
  420. // }
  421. // resp, err := s.client.send(ctx, &sendOpt)
  422. // return resp, err
  423. // }
// ObjectDeleteMultiOptions is the option of DeleteMulti.
// It is serialized as the Delete XML element and sent as the request body;
// none of its fields are sent as headers.
type ObjectDeleteMultiOptions struct {
	XMLName xml.Name `xml:"Delete" header:"-"`
	Quiet bool `xml:"Quiet" header:"-"`
	Objects []Object `xml:"Object" header:"-"`
	//XCosSha1 string `xml:"-" header:"x-cos-sha1"`
}
// ObjectDeleteMultiResult is the result of DeleteMulti, decoded from the
// DeleteResult XML element: the Deleted entries plus one Error entry per
// object that could not be deleted.
type ObjectDeleteMultiResult struct {
	XMLName xml.Name `xml:"DeleteResult"`
	DeletedObjects []Object `xml:"Deleted,omitempty"`
	Errors []struct {
		Key string `xml:",omitempty"`
		Code string `xml:",omitempty"`
		Message string `xml:",omitempty"`
		VersionId string `xml:",omitempty"`
	} `xml:"Error,omitempty"`
}
  442. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  443. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  444. // Quiet模式只返回报错的Object信息。
  445. // https://www.qcloud.com/document/product/436/8289
  446. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  447. var res ObjectDeleteMultiResult
  448. sendOpt := sendOptions{
  449. baseURL: s.client.BaseURL.BucketURL,
  450. uri: "/?delete",
  451. method: http.MethodPost,
  452. body: opt,
  453. result: &res,
  454. }
  455. resp, err := s.client.send(ctx, &sendOpt)
  456. return &res, resp, err
  457. }
// Object is the meta info of the object.
// The same type is also used in this file for multipart parts (PartNumber and
// ETag) and for batch-delete entries.
type Object struct {
	Key string `xml:",omitempty"`
	ETag string `xml:",omitempty"`
	Size int `xml:",omitempty"`
	PartNumber int `xml:",omitempty"`
	LastModified string `xml:",omitempty"`
	StorageClass string `xml:",omitempty"`
	Owner *Owner `xml:",omitempty"`
	VersionId string `xml:",omitempty"`
}
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one
type MultiUploadOptions struct {
	// OptIni is passed to InitiateMultipartUpload; may be nil.
	OptIni *InitiateMultipartUploadOptions
	// PartSize is the part size in MB; when <= 0 the size is chosen by DividePart.
	PartSize int64
	// ThreadPoolSize is the number of worker goroutines; values <= 0 mean one.
	ThreadPoolSize int
	// CheckPoint makes Upload look for an existing multipart upload to resume.
	CheckPoint bool
}
// Chunk describes one part of a file that is uploaded in pieces.
type Chunk struct {
	// Number is the 1-based part number.
	Number int
	// OffSet is the byte offset of the part within the file.
	OffSet int64
	// Size is the length of the part in bytes.
	Size int64
	// Done and ETag are set by checkUploadedParts for parts found already uploaded.
	Done bool
	ETag string
}
// Jobs is one unit of work for worker: upload a single Chunk of the file at
// FilePath as a part of the multipart upload UploadId of object Name.
type Jobs struct {
	Name string
	UploadId string
	FilePath string
	// RetryTimes is the number of upload attempts made before giving up.
	RetryTimes int
	Chunk Chunk
	// Data is not read by the worker in this file.
	Data io.Reader
	// Opt is the part upload option; the worker sets its ContentLength.
	Opt *ObjectUploadPartOptions
}
// Results is what a worker reports back for one Jobs entry: the part number,
// the response of the last upload attempt, and the error if the part failed.
type Results struct {
	PartNumber int
	Resp *Response
	err error
}
  499. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  500. for j := range jobs {
  501. fd, err := os.Open(j.FilePath)
  502. var res Results
  503. if err != nil {
  504. res.err = err
  505. res.PartNumber = j.Chunk.Number
  506. res.Resp = nil
  507. results <- &res
  508. }
  509. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  510. // UploadPart do not support the chunk trsf, so need to add the content-length
  511. j.Opt.ContentLength = int(j.Chunk.Size)
  512. rt := j.RetryTimes
  513. for {
  514. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  515. &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
  516. res.PartNumber = j.Chunk.Number
  517. res.Resp = resp
  518. res.err = err
  519. if err != nil {
  520. rt--
  521. if rt == 0 {
  522. fd.Close()
  523. results <- &res
  524. break
  525. }
  526. continue
  527. }
  528. fd.Close()
  529. results <- &res
  530. break
  531. }
  532. }
  533. }
  534. func DividePart(fileSize int64) (int64, int64) {
  535. partSize := int64(1 * 1024 * 1024)
  536. partNum := fileSize / partSize
  537. for partNum >= 10000 {
  538. partSize = partSize * 2
  539. partNum = fileSize / partSize
  540. }
  541. return partNum, partSize
  542. }
  543. func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
  544. if filePath == "" {
  545. return 0, nil, 0, errors.New("filePath invalid")
  546. }
  547. file, err := os.Open(filePath)
  548. if err != nil {
  549. return 0, nil, 0, err
  550. }
  551. defer file.Close()
  552. stat, err := file.Stat()
  553. if err != nil {
  554. return 0, nil, 0, err
  555. }
  556. var partNum int64
  557. if partSize > 0 {
  558. partSize = partSize * 1024 * 1024
  559. partNum = stat.Size() / partSize
  560. if partNum >= 10000 {
  561. return 0, nil, 0, errors.New("Too many parts, out of 10000")
  562. }
  563. } else {
  564. partNum, partSize = DividePart(stat.Size())
  565. }
  566. var chunks []Chunk
  567. var chunk = Chunk{}
  568. for i := int64(0); i < partNum; i++ {
  569. chunk.Number = int(i + 1)
  570. chunk.OffSet = i * partSize
  571. chunk.Size = partSize
  572. chunks = append(chunks, chunk)
  573. }
  574. if stat.Size()%partSize > 0 {
  575. chunk.Number = len(chunks) + 1
  576. chunk.OffSet = int64(len(chunks)) * partSize
  577. chunk.Size = stat.Size() % partSize
  578. chunks = append(chunks, chunk)
  579. partNum++
  580. }
  581. return int64(stat.Size()), chunks, int(partNum), nil
  582. }
  583. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  584. opt := &ObjectListUploadsOptions{
  585. Prefix: name,
  586. EncodingType: "url",
  587. }
  588. res, _, err := s.ListUploads(ctx, opt)
  589. if err != nil {
  590. return "", err
  591. }
  592. if len(res.Upload) == 0 {
  593. return "", nil
  594. }
  595. last := len(res.Upload) - 1
  596. for last >= 0 {
  597. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  598. if decodeKey == name {
  599. return decodeURIComponent(res.Upload[last].UploadID)
  600. }
  601. last = last - 1
  602. }
  603. return "", nil
  604. }
  605. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  606. var uploadedParts []Object
  607. isTruncated := true
  608. opt := &ObjectListPartsOptions{
  609. EncodingType: "url",
  610. }
  611. for isTruncated {
  612. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  613. if err != nil {
  614. return err
  615. }
  616. if len(res.Parts) > 0 {
  617. uploadedParts = append(uploadedParts, res.Parts...)
  618. }
  619. isTruncated = res.IsTruncated
  620. opt.PartNumberMarker = res.NextPartNumberMarker
  621. }
  622. fd, err := os.Open(filepath)
  623. if err != nil {
  624. return err
  625. }
  626. defer fd.Close()
  627. // 某个分块出错, 重置chunks
  628. ret := func(e error) error {
  629. for i, _ := range chunks {
  630. chunks[i].Done = false
  631. chunks[i].ETag = ""
  632. }
  633. return e
  634. }
  635. for _, part := range uploadedParts {
  636. partNumber := part.PartNumber
  637. if partNumber > partNum {
  638. return ret(errors.New("Part Number is not consistent"))
  639. }
  640. partNumber = partNumber - 1
  641. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  642. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  643. if err != nil {
  644. return ret(err)
  645. }
  646. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  647. if localMD5 != part.ETag {
  648. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  649. }
  650. chunks[partNumber].Done = true
  651. chunks[partNumber].ETag = part.ETag
  652. }
  653. return nil
  654. }
// MultiUpload/Upload is the high-level upload API; it uploads the parts of a
// file concurrently.
// Note: this API is currently provided for reference only.
//
// When partSize > 0 the caller specifies the part size (in MB); otherwise the
// SDK splits the file automatically.
// When specifying the part size, make sure the number of parts does not exceed 10000.
//
// MultiUpload simply delegates to Upload with the same arguments.
func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
	return s.Upload(ctx, name, filepath, opt)
}
  664. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  665. if opt == nil {
  666. opt = &MultiUploadOptions{}
  667. }
  668. // 1.Get the file chunk
  669. totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
  670. if err != nil {
  671. return nil, nil, err
  672. }
  673. // filesize=0 , use simple upload
  674. if partNum == 0 {
  675. var opt0 *ObjectPutOptions
  676. if opt.OptIni != nil {
  677. opt0 = &ObjectPutOptions{
  678. opt.OptIni.ACLHeaderOptions,
  679. opt.OptIni.ObjectPutHeaderOptions,
  680. }
  681. }
  682. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  683. if err != nil {
  684. return nil, rsp, err
  685. }
  686. result := &CompleteMultipartUploadResult{
  687. Key: name,
  688. ETag: rsp.Header.Get("ETag"),
  689. }
  690. return result, rsp, nil
  691. }
  692. var uploadID string
  693. resumableFlag := false
  694. if opt.CheckPoint {
  695. var err error
  696. uploadID, err = s.getResumableUploadID(ctx, name)
  697. if err == nil && uploadID != "" {
  698. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  699. resumableFlag = (err == nil)
  700. }
  701. }
  702. // 2.Init
  703. optini := opt.OptIni
  704. if !resumableFlag {
  705. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  706. if err != nil {
  707. return nil, nil, err
  708. }
  709. uploadID = res.UploadID
  710. }
  711. var poolSize int
  712. if opt.ThreadPoolSize > 0 {
  713. poolSize = opt.ThreadPoolSize
  714. } else {
  715. // Default is one
  716. poolSize = 1
  717. }
  718. chjobs := make(chan *Jobs, 100)
  719. chresults := make(chan *Results, 10000)
  720. optcom := &CompleteMultipartUploadOptions{}
  721. // 3.Start worker
  722. for w := 1; w <= poolSize; w++ {
  723. go worker(s, chjobs, chresults)
  724. }
  725. // progress started event
  726. var listener ProgressListener
  727. var consumedBytes int64
  728. if opt.OptIni != nil {
  729. listener = opt.OptIni.Listener
  730. }
  731. event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
  732. progressCallback(listener, event)
  733. // 4.Push jobs
  734. go func() {
  735. for _, chunk := range chunks {
  736. if chunk.Done {
  737. continue
  738. }
  739. partOpt := &ObjectUploadPartOptions{}
  740. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  741. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  742. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  743. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  744. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  745. }
  746. job := &Jobs{
  747. Name: name,
  748. RetryTimes: 3,
  749. FilePath: filepath,
  750. UploadId: uploadID,
  751. Chunk: chunk,
  752. Opt: partOpt,
  753. }
  754. chjobs <- job
  755. }
  756. close(chjobs)
  757. }()
  758. // 5.Recv the resp etag to complete
  759. for i := 0; i < partNum; i++ {
  760. if chunks[i].Done {
  761. optcom.Parts = append(optcom.Parts, Object{
  762. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  763. )
  764. consumedBytes += chunks[i].Size
  765. event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
  766. progressCallback(listener, event)
  767. continue
  768. }
  769. res := <-chresults
  770. // Notice one part fail can not get the etag according.
  771. if res.Resp == nil || res.err != nil {
  772. // Some part already fail, can not to get the header inside.
  773. err := fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  774. event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
  775. progressCallback(listener, event)
  776. return nil, nil, err
  777. }
  778. // Notice one part fail can not get the etag according.
  779. etag := res.Resp.Header.Get("ETag")
  780. optcom.Parts = append(optcom.Parts, Object{
  781. PartNumber: res.PartNumber, ETag: etag},
  782. )
  783. consumedBytes += chunks[res.PartNumber-1].Size
  784. event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
  785. progressCallback(listener, event)
  786. }
  787. close(chresults)
  788. sort.Sort(ObjectList(optcom.Parts))
  789. event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
  790. progressCallback(listener, event)
  791. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  792. return v, resp, err
  793. }
// ObjectPutTaggingOptions is the request body of PutTagging, serialized as the
// Tagging XML element with the tags nested under TagSet>Tag.
type ObjectPutTaggingOptions struct {
	XMLName xml.Name `xml:"Tagging"`
	TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
}

// ObjectTaggingTag is a single object tag; it has the same layout as BucketTaggingTag.
type ObjectTaggingTag BucketTaggingTag

// ObjectGetTaggingResult is the result of GetTagging; it has the same layout
// as ObjectPutTaggingOptions.
type ObjectGetTaggingResult ObjectPutTaggingOptions
  800. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  801. var u string
  802. if len(id) == 1 {
  803. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  804. } else if len(id) == 0 {
  805. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  806. } else {
  807. return nil, errors.New("wrong params")
  808. }
  809. sendOpt := &sendOptions{
  810. baseURL: s.client.BaseURL.BucketURL,
  811. uri: u,
  812. method: http.MethodPut,
  813. body: opt,
  814. }
  815. resp, err := s.client.send(ctx, sendOpt)
  816. return resp, err
  817. }
  818. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  819. var u string
  820. if len(id) == 1 {
  821. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  822. } else if len(id) == 0 {
  823. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  824. } else {
  825. return nil, nil, errors.New("wrong params")
  826. }
  827. var res ObjectGetTaggingResult
  828. sendOpt := &sendOptions{
  829. baseURL: s.client.BaseURL.BucketURL,
  830. uri: u,
  831. method: http.MethodGet,
  832. result: &res,
  833. }
  834. resp, err := s.client.send(ctx, sendOpt)
  835. return &res, resp, err
  836. }
  837. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  838. var u string
  839. if len(id) == 1 {
  840. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  841. } else if len(id) == 0 {
  842. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  843. } else {
  844. return nil, errors.New("wrong params")
  845. }
  846. sendOpt := &sendOptions{
  847. baseURL: s.client.BaseURL.BucketURL,
  848. uri: u,
  849. method: http.MethodDelete,
  850. }
  851. resp, err := s.client.send(ctx, sendOpt)
  852. return resp, err
  853. }