You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

979 lines
32 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/xml"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. // ObjectService 相关 API
  19. type ObjectService service
  20. // ObjectGetOptions is the option of GetObject
  21. type ObjectGetOptions struct {
  22. ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
  23. ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
  24. ResponseExpires string `url:"response-expires,omitempty" header:"-"`
  25. ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
  26. ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
  27. ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
  28. Range string `url:"-" header:"Range,omitempty"`
  29. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  30. // SSE-C
  31. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  32. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  33. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  34. //兼容其他自定义头部
  35. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  36. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  37. // 下载进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  38. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  39. }
  40. // presignedURLTestingOptions is the opt of presigned url
  41. type presignedURLTestingOptions struct {
  42. authTime *AuthTime
  43. }
  44. // Get Object 请求可以将一个文件(Object)下载至本地。
  45. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  46. //
  47. // https://www.qcloud.com/document/product/436/7753
  48. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  49. var u string
  50. if len(id) == 1 {
  51. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  52. } else if len(id) == 0 {
  53. u = "/" + encodeURIComponent(name)
  54. } else {
  55. return nil, errors.New("wrong params")
  56. }
  57. sendOpt := sendOptions{
  58. baseURL: s.client.BaseURL.BucketURL,
  59. uri: u,
  60. method: http.MethodGet,
  61. optQuery: opt,
  62. optHeader: opt,
  63. disableCloseBody: true,
  64. }
  65. resp, err := s.client.send(ctx, &sendOpt)
  66. if opt != nil && opt.Listener != nil {
  67. if err == nil && resp != nil {
  68. if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
  69. resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
  70. }
  71. }
  72. }
  73. return resp, err
  74. }
  75. // GetToFile download the object to local file
  76. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  77. resp, err := s.Get(ctx, name, opt, id...)
  78. if err != nil {
  79. return resp, err
  80. }
  81. defer resp.Body.Close()
  82. // If file exist, overwrite it
  83. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  84. if err != nil {
  85. return resp, err
  86. }
  87. _, err = io.Copy(fd, resp.Body)
  88. fd.Close()
  89. if err != nil {
  90. return resp, err
  91. }
  92. return resp, nil
  93. }
  94. // GetPresignedURL get the object presigned to down or upload file by url
  95. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  96. sendOpt := sendOptions{
  97. baseURL: s.client.BaseURL.BucketURL,
  98. uri: "/" + encodeURIComponent(name),
  99. method: httpMethod,
  100. optQuery: opt,
  101. optHeader: opt,
  102. }
  103. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  104. if err != nil {
  105. return nil, err
  106. }
  107. var authTime *AuthTime
  108. if opt != nil {
  109. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  110. authTime = opt.authTime
  111. }
  112. }
  113. if authTime == nil {
  114. authTime = NewAuthTime(expired)
  115. }
  116. authorization := newAuthorization(ak, sk, req, authTime)
  117. sign := encodeURIComponent(authorization, []byte{'&', '='})
  118. if req.URL.RawQuery == "" {
  119. req.URL.RawQuery = fmt.Sprintf("%s", sign)
  120. } else {
  121. req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
  122. }
  123. return req.URL, nil
  124. }
  125. // ObjectPutHeaderOptions the options of header of the put object
  126. type ObjectPutHeaderOptions struct {
  127. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  128. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  129. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  130. ContentType string `header:"Content-Type,omitempty" url:"-"`
  131. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  132. ContentLength int `header:"Content-Length,omitempty" url:"-"`
  133. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  134. Expect string `header:"Expect,omitempty" url:"-"`
  135. Expires string `header:"Expires,omitempty" url:"-"`
  136. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  137. // 自定义的 x-cos-meta-* header
  138. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  139. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
  140. // 可选值: Normal, Appendable
  141. //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
  142. // Enable Server Side Encryption, Only supported: AES256
  143. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  144. // SSE-C
  145. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  146. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  147. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  148. //兼容其他自定义头部
  149. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  150. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  151. // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  152. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  153. }
  154. // ObjectPutOptions the options of put object
  155. type ObjectPutOptions struct {
  156. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  157. *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  158. }
  159. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  160. //
  161. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  162. //
  163. // https://www.qcloud.com/document/product/436/7749
  164. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  165. if err := CheckReaderLen(r); err != nil {
  166. return nil, err
  167. }
  168. if opt != nil && opt.Listener != nil {
  169. totalBytes, err := GetReaderLen(r)
  170. if err != nil {
  171. return nil, err
  172. }
  173. r = TeeReader(r, nil, totalBytes, opt.Listener)
  174. }
  175. sendOpt := sendOptions{
  176. baseURL: s.client.BaseURL.BucketURL,
  177. uri: "/" + encodeURIComponent(name),
  178. method: http.MethodPut,
  179. body: r,
  180. optHeader: opt,
  181. }
  182. resp, err := s.client.send(ctx, &sendOpt)
  183. return resp, err
  184. }
  185. // PutFromFile put object from local file
  186. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  187. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  188. fd, err := os.Open(filePath)
  189. if err != nil {
  190. return nil, err
  191. }
  192. defer fd.Close()
  193. return s.Put(ctx, name, fd, opt)
  194. }
  195. // ObjectCopyHeaderOptions is the head option of the Copy
  196. type ObjectCopyHeaderOptions struct {
  197. // When use replace directive to update meta infos
  198. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  199. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  200. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  201. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  202. ContentType string `header:"Content-Type,omitempty" url:"-"`
  203. Expires string `header:"Expires,omitempty" url:"-"`
  204. Expect string `header:"Expect,omitempty" url:"-"`
  205. XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
  206. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
  207. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
  208. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
  209. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
  210. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
  211. // 自定义的 x-cos-meta-* header
  212. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  213. XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
  214. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  215. // SSE-C
  216. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  217. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  218. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  219. XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  220. XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  221. XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  222. //兼容其他自定义头部
  223. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  224. }
  225. // ObjectCopyOptions is the option of Copy, choose header or body
  226. type ObjectCopyOptions struct {
  227. *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  228. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  229. }
  230. // ObjectCopyResult is the result of Copy
  231. type ObjectCopyResult struct {
  232. XMLName xml.Name `xml:"CopyObjectResult"`
  233. ETag string `xml:"ETag,omitempty"`
  234. LastModified string `xml:"LastModified,omitempty"`
  235. CRC64 string `xml:"CRC64,omitempty"`
  236. VersionId string `xml:"VersionId,omitempty"`
  237. }
  238. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  239. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  240. //
  241. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  242. //
  243. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  244. //
  245. // https://cloud.tencent.com/document/product/436/10881
  246. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  247. surl := strings.SplitN(sourceURL, "/", 2)
  248. if len(surl) < 2 {
  249. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  250. }
  251. var u string
  252. if len(id) == 1 {
  253. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  254. } else if len(id) == 0 {
  255. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  256. } else {
  257. return nil, nil, errors.New("wrong params")
  258. }
  259. var res ObjectCopyResult
  260. copyOpt := &ObjectCopyOptions{
  261. &ObjectCopyHeaderOptions{},
  262. &ACLHeaderOptions{},
  263. }
  264. if opt != nil {
  265. if opt.ObjectCopyHeaderOptions != nil {
  266. *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
  267. }
  268. if opt.ACLHeaderOptions != nil {
  269. *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
  270. }
  271. }
  272. copyOpt.XCosCopySource = u
  273. sendOpt := sendOptions{
  274. baseURL: s.client.BaseURL.BucketURL,
  275. uri: "/" + encodeURIComponent(name),
  276. method: http.MethodPut,
  277. body: nil,
  278. optHeader: copyOpt,
  279. result: &res,
  280. }
  281. resp, err := s.client.send(ctx, &sendOpt)
  282. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  283. if err == nil && resp.StatusCode == 200 {
  284. if res.ETag == "" {
  285. return &res, resp, errors.New("response 200 OK, but body contains an error")
  286. }
  287. }
  288. return &res, resp, err
  289. }
  290. type ObjectDeleteOptions struct {
  291. // SSE-C
  292. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  293. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  294. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  295. //兼容其他自定义头部
  296. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  297. VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
  298. }
  299. // Delete Object请求可以将一个文件(Object)删除。
  300. //
  301. // https://www.qcloud.com/document/product/436/7743
  302. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  303. var optHeader *ObjectDeleteOptions
  304. // When use "" string might call the delete bucket interface
  305. if len(name) == 0 {
  306. return nil, errors.New("empty object name")
  307. }
  308. if len(opt) > 0 {
  309. optHeader = opt[0]
  310. }
  311. sendOpt := sendOptions{
  312. baseURL: s.client.BaseURL.BucketURL,
  313. uri: "/" + encodeURIComponent(name),
  314. method: http.MethodDelete,
  315. optHeader: optHeader,
  316. optQuery: optHeader,
  317. }
  318. resp, err := s.client.send(ctx, &sendOpt)
  319. return resp, err
  320. }
  321. // ObjectHeadOptions is the option of HeadObject
  322. type ObjectHeadOptions struct {
  323. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  324. // SSE-C
  325. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  326. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  327. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  328. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  329. }
  330. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  331. //
  332. // https://www.qcloud.com/document/product/436/7745
  333. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  334. var u string
  335. if len(id) == 1 {
  336. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  337. } else if len(id) == 0 {
  338. u = "/" + encodeURIComponent(name)
  339. } else {
  340. return nil, errors.New("wrong params")
  341. }
  342. sendOpt := sendOptions{
  343. baseURL: s.client.BaseURL.BucketURL,
  344. uri: u,
  345. method: http.MethodHead,
  346. optHeader: opt,
  347. }
  348. resp, err := s.client.send(ctx, &sendOpt)
  349. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  350. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  351. }
  352. return resp, err
  353. }
  354. // ObjectOptionsOptions is the option of object options
  355. type ObjectOptionsOptions struct {
  356. Origin string `url:"-" header:"Origin"`
  357. AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
  358. AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
  359. }
  360. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  361. //
  362. // 当CORS配置不存在时,请求返回403 Forbidden。
  363. //
  364. // https://www.qcloud.com/document/product/436/8288
  365. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  366. sendOpt := sendOptions{
  367. baseURL: s.client.BaseURL.BucketURL,
  368. uri: "/" + encodeURIComponent(name),
  369. method: http.MethodOptions,
  370. optHeader: opt,
  371. }
  372. resp, err := s.client.send(ctx, &sendOpt)
  373. return resp, err
  374. }
  375. // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
  376. type CASJobParameters struct {
  377. Tier string `xml:"Tier"`
  378. }
  379. // ObjectRestoreOptions is the option of object restore
  380. type ObjectRestoreOptions struct {
  381. XMLName xml.Name `xml:"RestoreRequest"`
  382. Days int `xml:"Days"`
  383. Tier *CASJobParameters `xml:"CASJobParameters"`
  384. }
  385. // PutRestore API can recover an object of type archived by COS archive.
  386. //
  387. // https://cloud.tencent.com/document/product/436/12633
  388. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  389. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  390. sendOpt := sendOptions{
  391. baseURL: s.client.BaseURL.BucketURL,
  392. uri: u,
  393. method: http.MethodPost,
  394. body: opt,
  395. }
  396. resp, err := s.client.send(ctx, &sendOpt)
  397. return resp, err
  398. }
  399. // TODO Append 接口在优化未开放使用
  400. //
  401. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  402. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  403. //
  404. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  405. //
  406. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  407. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  408. //
  409. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  410. //
  411. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  412. //
  413. // https://www.qcloud.com/document/product/436/7741
  414. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  415. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  416. // if position != 0{
  417. // opt = nil
  418. // }
  419. // sendOpt := sendOptions{
  420. // baseURL: s.client.BaseURL.BucketURL,
  421. // uri: u,
  422. // method: http.MethodPost,
  423. // optHeader: opt,
  424. // body: r,
  425. // }
  426. // resp, err := s.client.send(ctx, &sendOpt)
  427. // return resp, err
  428. // }
  429. // ObjectDeleteMultiOptions is the option of DeleteMulti
  430. type ObjectDeleteMultiOptions struct {
  431. XMLName xml.Name `xml:"Delete" header:"-"`
  432. Quiet bool `xml:"Quiet" header:"-"`
  433. Objects []Object `xml:"Object" header:"-"`
  434. //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
  435. }
  436. // ObjectDeleteMultiResult is the result of DeleteMulti
  437. type ObjectDeleteMultiResult struct {
  438. XMLName xml.Name `xml:"DeleteResult"`
  439. DeletedObjects []Object `xml:"Deleted,omitempty"`
  440. Errors []struct {
  441. Key string `xml:",omitempty"`
  442. Code string `xml:",omitempty"`
  443. Message string `xml:",omitempty"`
  444. VersionId string `xml:",omitempty"`
  445. } `xml:"Error,omitempty"`
  446. }
  447. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  448. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  449. // Quiet模式只返回报错的Object信息。
  450. // https://www.qcloud.com/document/product/436/8289
  451. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  452. var res ObjectDeleteMultiResult
  453. sendOpt := sendOptions{
  454. baseURL: s.client.BaseURL.BucketURL,
  455. uri: "/?delete",
  456. method: http.MethodPost,
  457. body: opt,
  458. result: &res,
  459. }
  460. resp, err := s.client.send(ctx, &sendOpt)
  461. return &res, resp, err
  462. }
  463. // Object is the meta info of the object
  464. type Object struct {
  465. Key string `xml:",omitempty"`
  466. ETag string `xml:",omitempty"`
  467. Size int64 `xml:",omitempty"`
  468. PartNumber int `xml:",omitempty"`
  469. LastModified string `xml:",omitempty"`
  470. StorageClass string `xml:",omitempty"`
  471. Owner *Owner `xml:",omitempty"`
  472. VersionId string `xml:",omitempty"`
  473. }
  474. // MultiUploadOptions is the option of the multiupload,
  475. // ThreadPoolSize default is one
  476. type MultiUploadOptions struct {
  477. OptIni *InitiateMultipartUploadOptions
  478. PartSize int64
  479. ThreadPoolSize int
  480. CheckPoint bool
  481. EnableVerification bool
  482. }
  483. type Chunk struct {
  484. Number int
  485. OffSet int64
  486. Size int64
  487. Done bool
  488. ETag string
  489. }
  490. // jobs
  491. type Jobs struct {
  492. Name string
  493. UploadId string
  494. FilePath string
  495. RetryTimes int
  496. Chunk Chunk
  497. Data io.Reader
  498. Opt *ObjectUploadPartOptions
  499. }
  500. type Results struct {
  501. PartNumber int
  502. Resp *Response
  503. err error
  504. }
  505. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  506. for j := range jobs {
  507. fd, err := os.Open(j.FilePath)
  508. var res Results
  509. if err != nil {
  510. res.err = err
  511. res.PartNumber = j.Chunk.Number
  512. res.Resp = nil
  513. results <- &res
  514. }
  515. // UploadPart do not support the chunk trsf, so need to add the content-length
  516. j.Opt.ContentLength = int(j.Chunk.Size)
  517. rt := j.RetryTimes
  518. for {
  519. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  520. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  521. &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
  522. res.PartNumber = j.Chunk.Number
  523. res.Resp = resp
  524. res.err = err
  525. if err != nil {
  526. rt--
  527. if rt == 0 {
  528. fd.Close()
  529. results <- &res
  530. break
  531. }
  532. continue
  533. }
  534. fd.Close()
  535. results <- &res
  536. break
  537. }
  538. }
  539. }
  540. func DividePart(fileSize int64, last int) (int64, int64) {
  541. partSize := int64(last * 1024 * 1024)
  542. partNum := fileSize / partSize
  543. for partNum >= 10000 {
  544. partSize = partSize * 2
  545. partNum = fileSize / partSize
  546. }
  547. return partNum, partSize
  548. }
  549. func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
  550. if filePath == "" {
  551. return 0, nil, 0, errors.New("filePath invalid")
  552. }
  553. file, err := os.Open(filePath)
  554. if err != nil {
  555. return 0, nil, 0, err
  556. }
  557. defer file.Close()
  558. stat, err := file.Stat()
  559. if err != nil {
  560. return 0, nil, 0, err
  561. }
  562. var partNum int64
  563. if partSize > 0 {
  564. partSize = partSize * 1024 * 1024
  565. partNum = stat.Size() / partSize
  566. if partNum >= 10000 {
  567. return 0, nil, 0, errors.New("Too many parts, out of 10000")
  568. }
  569. } else {
  570. partNum, partSize = DividePart(stat.Size(), 64)
  571. }
  572. var chunks []Chunk
  573. var chunk = Chunk{}
  574. for i := int64(0); i < partNum; i++ {
  575. chunk.Number = int(i + 1)
  576. chunk.OffSet = i * partSize
  577. chunk.Size = partSize
  578. chunks = append(chunks, chunk)
  579. }
  580. if stat.Size()%partSize > 0 {
  581. chunk.Number = len(chunks) + 1
  582. chunk.OffSet = int64(len(chunks)) * partSize
  583. chunk.Size = stat.Size() % partSize
  584. chunks = append(chunks, chunk)
  585. partNum++
  586. }
  587. return int64(stat.Size()), chunks, int(partNum), nil
  588. }
  589. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  590. opt := &ObjectListUploadsOptions{
  591. Prefix: name,
  592. EncodingType: "url",
  593. }
  594. res, _, err := s.ListUploads(ctx, opt)
  595. if err != nil {
  596. return "", err
  597. }
  598. if len(res.Upload) == 0 {
  599. return "", nil
  600. }
  601. last := len(res.Upload) - 1
  602. for last >= 0 {
  603. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  604. if decodeKey == name {
  605. return decodeURIComponent(res.Upload[last].UploadID)
  606. }
  607. last = last - 1
  608. }
  609. return "", nil
  610. }
  611. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  612. var uploadedParts []Object
  613. isTruncated := true
  614. opt := &ObjectListPartsOptions{
  615. EncodingType: "url",
  616. }
  617. for isTruncated {
  618. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  619. if err != nil {
  620. return err
  621. }
  622. if len(res.Parts) > 0 {
  623. uploadedParts = append(uploadedParts, res.Parts...)
  624. }
  625. isTruncated = res.IsTruncated
  626. opt.PartNumberMarker = res.NextPartNumberMarker
  627. }
  628. fd, err := os.Open(filepath)
  629. if err != nil {
  630. return err
  631. }
  632. defer fd.Close()
  633. // 某个分块出错, 重置chunks
  634. ret := func(e error) error {
  635. for i, _ := range chunks {
  636. chunks[i].Done = false
  637. chunks[i].ETag = ""
  638. }
  639. return e
  640. }
  641. for _, part := range uploadedParts {
  642. partNumber := part.PartNumber
  643. if partNumber > partNum {
  644. return ret(errors.New("Part Number is not consistent"))
  645. }
  646. partNumber = partNumber - 1
  647. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  648. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  649. if err != nil {
  650. return ret(err)
  651. }
  652. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  653. if localMD5 != part.ETag {
  654. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  655. }
  656. chunks[partNumber].Done = true
  657. chunks[partNumber].ETag = part.ETag
  658. }
  659. return nil
  660. }
  661. // MultiUpload/Upload 为高级upload接口,并发分块上传
  662. // 注意该接口目前只供参考
  663. //
  664. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  665. // 由调用者指定分块大小时,请确认分块数量不超过10000
  666. //
  667. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  668. return s.Upload(ctx, name, filepath, opt)
  669. }
  670. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  671. if opt == nil {
  672. opt = &MultiUploadOptions{}
  673. }
  674. var localcrc uint64
  675. // 1.Get the file chunk
  676. totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
  677. if err != nil {
  678. return nil, nil, err
  679. }
  680. // 校验
  681. if s.client.Conf.EnableCRC {
  682. fd, err := os.Open(filepath)
  683. if err != nil {
  684. return nil, nil, err
  685. }
  686. defer fd.Close()
  687. localcrc, err = calCRC64(fd)
  688. }
  689. // filesize=0 , use simple upload
  690. if partNum == 0 {
  691. var opt0 *ObjectPutOptions
  692. if opt.OptIni != nil {
  693. opt0 = &ObjectPutOptions{
  694. opt.OptIni.ACLHeaderOptions,
  695. opt.OptIni.ObjectPutHeaderOptions,
  696. }
  697. }
  698. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  699. if err != nil {
  700. return nil, rsp, err
  701. }
  702. result := &CompleteMultipartUploadResult{
  703. Key: name,
  704. ETag: rsp.Header.Get("ETag"),
  705. }
  706. if rsp != nil && s.client.Conf.EnableCRC {
  707. scoscrc := rsp.Header.Get("x-cos-hash-crc64ecma")
  708. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  709. if icoscrc != localcrc {
  710. return result, rsp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  711. }
  712. }
  713. return result, rsp, nil
  714. }
  715. var uploadID string
  716. resumableFlag := false
  717. if opt.CheckPoint {
  718. var err error
  719. uploadID, err = s.getResumableUploadID(ctx, name)
  720. if err == nil && uploadID != "" {
  721. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  722. resumableFlag = (err == nil)
  723. }
  724. }
  725. // 2.Init
  726. optini := opt.OptIni
  727. if !resumableFlag {
  728. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  729. if err != nil {
  730. return nil, nil, err
  731. }
  732. uploadID = res.UploadID
  733. }
  734. var poolSize int
  735. if opt.ThreadPoolSize > 0 {
  736. poolSize = opt.ThreadPoolSize
  737. } else {
  738. // Default is one
  739. poolSize = 1
  740. }
  741. chjobs := make(chan *Jobs, 100)
  742. chresults := make(chan *Results, 10000)
  743. optcom := &CompleteMultipartUploadOptions{}
  744. // 3.Start worker
  745. for w := 1; w <= poolSize; w++ {
  746. go worker(s, chjobs, chresults)
  747. }
  748. // progress started event
  749. var listener ProgressListener
  750. var consumedBytes int64
  751. if opt.OptIni != nil {
  752. listener = opt.OptIni.Listener
  753. }
  754. event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
  755. progressCallback(listener, event)
  756. // 4.Push jobs
  757. go func() {
  758. for _, chunk := range chunks {
  759. if chunk.Done {
  760. continue
  761. }
  762. partOpt := &ObjectUploadPartOptions{}
  763. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  764. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  765. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  766. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  767. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  768. }
  769. job := &Jobs{
  770. Name: name,
  771. RetryTimes: 3,
  772. FilePath: filepath,
  773. UploadId: uploadID,
  774. Chunk: chunk,
  775. Opt: partOpt,
  776. }
  777. chjobs <- job
  778. }
  779. close(chjobs)
  780. }()
  781. // 5.Recv the resp etag to complete
  782. err = nil
  783. for i := 0; i < partNum; i++ {
  784. if chunks[i].Done {
  785. optcom.Parts = append(optcom.Parts, Object{
  786. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  787. )
  788. if err == nil {
  789. consumedBytes += chunks[i].Size
  790. event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
  791. progressCallback(listener, event)
  792. }
  793. continue
  794. }
  795. res := <-chresults
  796. // Notice one part fail can not get the etag according.
  797. if res.Resp == nil || res.err != nil {
  798. // Some part already fail, can not to get the header inside.
  799. err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  800. continue
  801. }
  802. // Notice one part fail can not get the etag according.
  803. etag := res.Resp.Header.Get("ETag")
  804. optcom.Parts = append(optcom.Parts, Object{
  805. PartNumber: res.PartNumber, ETag: etag},
  806. )
  807. if err == nil {
  808. consumedBytes += chunks[res.PartNumber-1].Size
  809. event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
  810. progressCallback(listener, event)
  811. }
  812. }
  813. close(chresults)
  814. if err != nil {
  815. event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
  816. progressCallback(listener, event)
  817. return nil, nil, err
  818. }
  819. sort.Sort(ObjectList(optcom.Parts))
  820. event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
  821. progressCallback(listener, event)
  822. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  823. if err != nil {
  824. s.AbortMultipartUpload(ctx, name, uploadID)
  825. return v, resp, err
  826. }
  827. if resp != nil && s.client.Conf.EnableCRC {
  828. scoscrc := resp.Header.Get("x-cos-hash-crc64ecma")
  829. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  830. if icoscrc != localcrc {
  831. return v, resp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  832. }
  833. }
  834. return v, resp, err
  835. }
  836. type ObjectPutTaggingOptions struct {
  837. XMLName xml.Name `xml:"Tagging"`
  838. TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
  839. }
  840. type ObjectTaggingTag BucketTaggingTag
  841. type ObjectGetTaggingResult ObjectPutTaggingOptions
  842. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  843. var u string
  844. if len(id) == 1 {
  845. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  846. } else if len(id) == 0 {
  847. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  848. } else {
  849. return nil, errors.New("wrong params")
  850. }
  851. sendOpt := &sendOptions{
  852. baseURL: s.client.BaseURL.BucketURL,
  853. uri: u,
  854. method: http.MethodPut,
  855. body: opt,
  856. }
  857. resp, err := s.client.send(ctx, sendOpt)
  858. return resp, err
  859. }
  860. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  861. var u string
  862. if len(id) == 1 {
  863. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  864. } else if len(id) == 0 {
  865. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  866. } else {
  867. return nil, nil, errors.New("wrong params")
  868. }
  869. var res ObjectGetTaggingResult
  870. sendOpt := &sendOptions{
  871. baseURL: s.client.BaseURL.BucketURL,
  872. uri: u,
  873. method: http.MethodGet,
  874. result: &res,
  875. }
  876. resp, err := s.client.send(ctx, sendOpt)
  877. return &res, resp, err
  878. }
  879. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  880. var u string
  881. if len(id) == 1 {
  882. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  883. } else if len(id) == 0 {
  884. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  885. } else {
  886. return nil, errors.New("wrong params")
  887. }
  888. sendOpt := &sendOptions{
  889. baseURL: s.client.BaseURL.BucketURL,
  890. uri: u,
  891. method: http.MethodDelete,
  892. }
  893. resp, err := s.client.send(ctx, sendOpt)
  894. return resp, err
  895. }