You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

880 lines
29 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/xml"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sort"
  14. "strings"
  15. "time"
  16. )
// ObjectService groups the object-related API calls.
type ObjectService service
// ObjectGetOptions is the option of GetObject.
//
// Get passes this struct as both the query options and the header options of
// the request. Each field carries a `url` and a `header` tag; presumably a tag
// of "-" excludes the field from that encoding (the encoder itself is defined
// outside this file section — confirm there).
type ObjectGetOptions struct {
	// Query-string overrides for the headers of the response.
	ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
	ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
	ResponseExpires string `url:"response-expires,omitempty" header:"-"`
	ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
	ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
	ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
	// Request headers.
	Range string `url:"-" header:"Range,omitempty"`
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C (server-side encryption with a customer-provided key) headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Sent as the x-cos-traffic-limit header.
	XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
}
// presignedURLTestingOptions is the opt of presigned url.
//
// When GetPresignedURL receives a value of this type as opt and authTime is
// non-nil, that AuthTime is used for signing instead of one derived from the
// expiry duration.
type presignedURLTestingOptions struct {
	authTime *AuthTime
}
  39. // Get Object 请求可以将一个文件(Object)下载至本地。
  40. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  41. //
  42. // https://www.qcloud.com/document/product/436/7753
  43. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  44. var u string
  45. if len(id) == 1 {
  46. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  47. } else if len(id) == 0 {
  48. u = "/" + encodeURIComponent(name)
  49. } else {
  50. return nil, errors.New("wrong params")
  51. }
  52. sendOpt := sendOptions{
  53. baseURL: s.client.BaseURL.BucketURL,
  54. uri: u,
  55. method: http.MethodGet,
  56. optQuery: opt,
  57. optHeader: opt,
  58. disableCloseBody: true,
  59. }
  60. resp, err := s.client.send(ctx, &sendOpt)
  61. return resp, err
  62. }
  63. // GetToFile download the object to local file
  64. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  65. resp, err := s.Get(ctx, name, opt, id...)
  66. if err != nil {
  67. return resp, err
  68. }
  69. defer resp.Body.Close()
  70. // If file exist, overwrite it
  71. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  72. if err != nil {
  73. return resp, err
  74. }
  75. _, err = io.Copy(fd, resp.Body)
  76. fd.Close()
  77. if err != nil {
  78. return resp, err
  79. }
  80. return resp, nil
  81. }
  82. // GetPresignedURL get the object presigned to down or upload file by url
  83. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  84. sendOpt := sendOptions{
  85. baseURL: s.client.BaseURL.BucketURL,
  86. uri: "/" + encodeURIComponent(name),
  87. method: httpMethod,
  88. optQuery: opt,
  89. optHeader: opt,
  90. }
  91. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  92. if err != nil {
  93. return nil, err
  94. }
  95. var authTime *AuthTime
  96. if opt != nil {
  97. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  98. authTime = opt.authTime
  99. }
  100. }
  101. if authTime == nil {
  102. authTime = NewAuthTime(expired)
  103. }
  104. authorization := newAuthorization(ak, sk, req, authTime)
  105. sign := encodeURIComponent(authorization)
  106. if req.URL.RawQuery == "" {
  107. req.URL.RawQuery = fmt.Sprintf("sign=%s", sign)
  108. } else {
  109. req.URL.RawQuery = fmt.Sprintf("%s&sign=%s", req.URL.RawQuery, sign)
  110. }
  111. return req.URL, nil
  112. }
// ObjectPutHeaderOptions the options of header of the put object.
//
// Every field is tagged for header encoding only (`url:"-"`).
type ObjectPutHeaderOptions struct {
	CacheControl string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
	ContentType string `header:"Content-Type,omitempty" url:"-"`
	ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
	ContentLength int `header:"Content-Length,omitempty" url:"-"`
	ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
	Expect string `header:"Expect,omitempty" url:"-"`
	Expires string `header:"Expires,omitempty" url:"-"`
	XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
	// Custom x-cos-meta-* headers.
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
	// Possible values: Normal, Appendable
	//XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
	// Enable Server Side Encryption, Only supported: AES256
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C (server-side encryption with a customer-provided key) headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
}
// ObjectPutOptions the options of put object.
//
// It embeds the ACL header options and the put header options; either pointer
// may be nil. Upload in this file builds this struct with an unkeyed literal,
// so the order of the embedded fields must not change.
type ObjectPutOptions struct {
	*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
  145. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  146. //
  147. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  148. //
  149. // https://www.qcloud.com/document/product/436/7749
  150. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  151. sendOpt := sendOptions{
  152. baseURL: s.client.BaseURL.BucketURL,
  153. uri: "/" + encodeURIComponent(name),
  154. method: http.MethodPut,
  155. body: r,
  156. optHeader: opt,
  157. }
  158. resp, err := s.client.send(ctx, &sendOpt)
  159. return resp, err
  160. }
  161. // PutFromFile put object from local file
  162. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  163. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  164. fd, err := os.Open(filePath)
  165. if err != nil {
  166. return nil, err
  167. }
  168. defer fd.Close()
  169. return s.Put(ctx, name, fd, opt)
  170. }
// ObjectCopyHeaderOptions is the head option of the Copy.
//
// Every field is tagged for header encoding only (`url:"-"`).
type ObjectCopyHeaderOptions struct {
	// When use replace directive to update meta infos
	CacheControl string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
	ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
	ContentType string `header:"Content-Type,omitempty" url:"-"`
	Expires string `header:"Expires,omitempty" url:"-"`
	Expect string `header:"Expect,omitempty" url:"-"`
	XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
	// Conditions evaluated against the copy source.
	XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
	XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
	// Custom x-cos-meta-* headers.
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	// Copy source; Copy fills this in from its sourceURL argument. The tag has
	// no omitempty.
	XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C (server-side encryption with a customer-provided key) headers for
	// the destination, followed by the matching headers for the copy source.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
// ObjectCopyOptions is the option of Copy, choose header or body.
//
// It embeds the copy header options and the ACL header options; either
// pointer may be nil when passed to Copy.
type ObjectCopyOptions struct {
	*ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
// ObjectCopyResult is the result of Copy, decoded from the CopyObjectResult
// XML element of the response body. Copy treats an empty ETag in a 200
// response as a failed copy.
type ObjectCopyResult struct {
	XMLName xml.Name `xml:"CopyObjectResult"`
	ETag string `xml:"ETag,omitempty"`
	LastModified string `xml:"LastModified,omitempty"`
}
  212. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  213. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  214. //
  215. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  216. //
  217. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  218. //
  219. // https://cloud.tencent.com/document/product/436/10881
  220. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  221. surl := strings.SplitN(sourceURL, "/", 2)
  222. if len(surl) < 2 {
  223. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  224. }
  225. var u string
  226. if len(id) == 1 {
  227. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  228. } else if len(id) == 0 {
  229. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  230. } else {
  231. return nil, nil, errors.New("wrong params")
  232. }
  233. var res ObjectCopyResult
  234. if opt == nil {
  235. opt = new(ObjectCopyOptions)
  236. }
  237. if opt.ObjectCopyHeaderOptions == nil {
  238. opt.ObjectCopyHeaderOptions = new(ObjectCopyHeaderOptions)
  239. }
  240. opt.XCosCopySource = u
  241. sendOpt := sendOptions{
  242. baseURL: s.client.BaseURL.BucketURL,
  243. uri: "/" + encodeURIComponent(name),
  244. method: http.MethodPut,
  245. body: nil,
  246. optHeader: opt,
  247. result: &res,
  248. }
  249. resp, err := s.client.send(ctx, &sendOpt)
  250. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  251. if err == nil && resp.StatusCode == 200 {
  252. if res.ETag == "" {
  253. return &res, resp, errors.New("response 200 OK, but body contains an error")
  254. }
  255. }
  256. return &res, resp, err
  257. }
// ObjectDeleteOptions is the option of Delete. Delete passes it as both the
// header options and the query options of the request.
type ObjectDeleteOptions struct {
	// SSE-C (server-side encryption with a customer-provided key) headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	// Tagged as the "VersionId" query parameter.
	VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
}
  267. // Delete Object请求可以将一个文件(Object)删除。
  268. //
  269. // https://www.qcloud.com/document/product/436/7743
  270. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  271. var optHeader *ObjectDeleteOptions
  272. // When use "" string might call the delete bucket interface
  273. if len(name) == 0 {
  274. return nil, errors.New("empty object name")
  275. }
  276. if len(opt) > 0 {
  277. optHeader = opt[0]
  278. }
  279. sendOpt := sendOptions{
  280. baseURL: s.client.BaseURL.BucketURL,
  281. uri: "/" + encodeURIComponent(name),
  282. method: http.MethodDelete,
  283. optHeader: optHeader,
  284. optQuery: optHeader,
  285. }
  286. resp, err := s.client.send(ctx, &sendOpt)
  287. return resp, err
  288. }
// ObjectHeadOptions is the option of HeadObject. All fields are sent as
// request headers.
type ObjectHeadOptions struct {
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C (server-side encryption with a customer-provided key) headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
}
  297. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  298. //
  299. // https://www.qcloud.com/document/product/436/7745
  300. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  301. var u string
  302. if len(id) == 1 {
  303. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  304. } else if len(id) == 0 {
  305. u = "/" + encodeURIComponent(name)
  306. } else {
  307. return nil, errors.New("wrong params")
  308. }
  309. sendOpt := sendOptions{
  310. baseURL: s.client.BaseURL.BucketURL,
  311. uri: u,
  312. method: http.MethodHead,
  313. optHeader: opt,
  314. }
  315. resp, err := s.client.send(ctx, &sendOpt)
  316. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  317. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  318. }
  319. return resp, err
  320. }
// ObjectOptionsOptions is the option of object options (the preflight request
// sent by Options). All fields are sent as request headers; Origin and
// AccessControlRequestMethod carry no omitempty in their tags.
type ObjectOptionsOptions struct {
	Origin string `url:"-" header:"Origin"`
	AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
	AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
}
  327. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  328. //
  329. // 当CORS配置不存在时,请求返回403 Forbidden。
  330. //
  331. // https://www.qcloud.com/document/product/436/8288
  332. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  333. sendOpt := sendOptions{
  334. baseURL: s.client.BaseURL.BucketURL,
  335. uri: "/" + encodeURIComponent(name),
  336. method: http.MethodOptions,
  337. optHeader: opt,
  338. }
  339. resp, err := s.client.send(ctx, &sendOpt)
  340. return resp, err
  341. }
// CASJobParameters support three way: Standard (in 35 hours), Expedited
// (quick way, in 15 mins), Bulk (in 5-12 hours).
type CASJobParameters struct {
	Tier string `xml:"Tier"`
}
// ObjectRestoreOptions is the option of object restore. It is marshalled as
// the RestoreRequest XML element and sent as the body of PostRestore.
type ObjectRestoreOptions struct {
	XMLName xml.Name `xml:"RestoreRequest"`
	Days int `xml:"Days"`
	Tier *CASJobParameters `xml:"CASJobParameters"`
}
  352. // PutRestore API can recover an object of type archived by COS archive.
  353. //
  354. // https://cloud.tencent.com/document/product/436/12633
  355. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  356. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  357. sendOpt := sendOptions{
  358. baseURL: s.client.BaseURL.BucketURL,
  359. uri: u,
  360. method: http.MethodPost,
  361. body: opt,
  362. }
  363. resp, err := s.client.send(ctx, &sendOpt)
  364. return resp, err
  365. }
  366. // TODO Append 接口在优化未开放使用
  367. //
  368. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  369. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  370. //
  371. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  372. //
  373. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  374. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  375. //
  376. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  377. //
  378. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  379. //
  380. // https://www.qcloud.com/document/product/436/7741
  381. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  382. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  383. // if position != 0{
  384. // opt = nil
  385. // }
  386. // sendOpt := sendOptions{
  387. // baseURL: s.client.BaseURL.BucketURL,
  388. // uri: u,
  389. // method: http.MethodPost,
  390. // optHeader: opt,
  391. // body: r,
  392. // }
  393. // resp, err := s.client.send(ctx, &sendOpt)
  394. // return resp, err
  395. // }
// ObjectDeleteMultiOptions is the option of DeleteMulti. It is marshalled as
// the Delete XML element and sent as the request body; no field is sent as a
// header.
type ObjectDeleteMultiOptions struct {
	XMLName xml.Name `xml:"Delete" header:"-"`
	// Quiet selects the quiet result mode (only failed objects are reported).
	Quiet bool `xml:"Quiet" header:"-"`
	// Objects lists the objects to delete.
	Objects []Object `xml:"Object" header:"-"`
	//XCosSha1 string `xml:"-" header:"x-cos-sha1"`
}
// ObjectDeleteMultiResult is the result of DeleteMulti, decoded from the
// DeleteResult XML element.
type ObjectDeleteMultiResult struct {
	XMLName xml.Name `xml:"DeleteResult"`
	// DeletedObjects holds the Deleted elements of the response.
	DeletedObjects []Object `xml:"Deleted,omitempty"`
	// Errors holds the Error elements of the response, one per failed object.
	Errors []struct {
		Key string `xml:",omitempty"`
		Code string `xml:",omitempty"`
		Message string `xml:",omitempty"`
		VersionId string `xml:",omitempty"`
	} `xml:"Error,omitempty"`
}
  414. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  415. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  416. // Quiet模式只返回报错的Object信息。
  417. // https://www.qcloud.com/document/product/436/8289
  418. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  419. var res ObjectDeleteMultiResult
  420. sendOpt := sendOptions{
  421. baseURL: s.client.BaseURL.BucketURL,
  422. uri: "/?delete",
  423. method: http.MethodPost,
  424. body: opt,
  425. result: &res,
  426. }
  427. resp, err := s.client.send(ctx, &sendOpt)
  428. return &res, resp, err
  429. }
// Object is the meta info of the object. In this file it is also used to
// describe uploaded parts (PartNumber and ETag) and the entries of a batch
// delete request.
type Object struct {
	Key string `xml:",omitempty"`
	ETag string `xml:",omitempty"`
	Size int `xml:",omitempty"`
	PartNumber int `xml:",omitempty"`
	LastModified string `xml:",omitempty"`
	StorageClass string `xml:",omitempty"`
	Owner *Owner `xml:",omitempty"`
	VersionId string `xml:",omitempty"`
}
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one.
type MultiUploadOptions struct {
	// OptIni is passed to InitiateMultipartUpload; may be nil.
	OptIni *InitiateMultipartUploadOptions
	// PartSize is the part size in MB; when it is not positive the part size
	// is chosen automatically by DividePart.
	PartSize int64
	// ThreadPoolSize is the number of upload workers; values <= 0 mean one.
	ThreadPoolSize int
	// CheckPoint makes Upload look for an unfinished upload of the same
	// object and resume it when the already uploaded parts match the file.
	CheckPoint bool
}
// Chunk describes one part of a file that is uploaded in parts.
type Chunk struct {
	// Number is the 1-based part number.
	Number int
	// OffSet is the byte offset of the part within the file.
	OffSet int64
	// Size is the length of the part in bytes.
	Size int64
	// Done marks a part that was already uploaded and verified, together
	// with the ETag reported for it.
	Done bool
	ETag string
}
// Jobs describes a single part-upload task handed to a worker.
type Jobs struct {
	// Name is the object name and UploadId the multipart upload it belongs to.
	Name string
	UploadId string
	// FilePath is the local file the chunk is read from.
	FilePath string
	// RetryTimes is the number of upload attempts made before giving up.
	RetryTimes int
	Chunk Chunk
	// Data is not read by worker in this file.
	Data io.Reader
	// Opt holds the part-upload options; worker sets its ContentLength.
	Opt *ObjectUploadPartOptions
}
// Results carries the outcome of one part-upload job back from a worker.
type Results struct {
	PartNumber int
	// Resp is the response of the last upload attempt; nil when the local
	// file could not be opened.
	Resp *Response
	// err is the error of the last attempt, nil on success.
	err error
}
  471. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  472. for j := range jobs {
  473. fd, err := os.Open(j.FilePath)
  474. var res Results
  475. if err != nil {
  476. res.err = err
  477. res.PartNumber = j.Chunk.Number
  478. res.Resp = nil
  479. results <- &res
  480. }
  481. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  482. // UploadPart do not support the chunk trsf, so need to add the content-length
  483. j.Opt.ContentLength = int(j.Chunk.Size)
  484. rt := j.RetryTimes
  485. for {
  486. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  487. &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
  488. res.PartNumber = j.Chunk.Number
  489. res.Resp = resp
  490. res.err = err
  491. if err != nil {
  492. rt--
  493. if rt == 0 {
  494. fd.Close()
  495. results <- &res
  496. break
  497. }
  498. continue
  499. }
  500. fd.Close()
  501. results <- &res
  502. break
  503. }
  504. }
  505. }
  506. func DividePart(fileSize int64) (int64, int64) {
  507. partSize := int64(1 * 1024 * 1024)
  508. partNum := fileSize / partSize
  509. for partNum >= 10000 {
  510. partSize = partSize * 2
  511. partNum = fileSize / partSize
  512. }
  513. return partNum, partSize
  514. }
  515. func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
  516. if filePath == "" {
  517. return nil, 0, errors.New("filePath invalid")
  518. }
  519. file, err := os.Open(filePath)
  520. if err != nil {
  521. return nil, 0, err
  522. }
  523. defer file.Close()
  524. stat, err := file.Stat()
  525. if err != nil {
  526. return nil, 0, err
  527. }
  528. var partNum int64
  529. if partSize > 0 {
  530. partSize = partSize * 1024 * 1024
  531. partNum = stat.Size() / partSize
  532. if partNum >= 10000 {
  533. return nil, 0, errors.New("Too many parts, out of 10000")
  534. }
  535. } else {
  536. partNum, partSize = DividePart(stat.Size())
  537. }
  538. var chunks []Chunk
  539. var chunk = Chunk{}
  540. for i := int64(0); i < partNum; i++ {
  541. chunk.Number = int(i + 1)
  542. chunk.OffSet = i * partSize
  543. chunk.Size = partSize
  544. chunks = append(chunks, chunk)
  545. }
  546. if stat.Size()%partSize > 0 {
  547. chunk.Number = len(chunks) + 1
  548. chunk.OffSet = int64(len(chunks)) * partSize
  549. chunk.Size = stat.Size() % partSize
  550. chunks = append(chunks, chunk)
  551. partNum++
  552. }
  553. return chunks, int(partNum), nil
  554. }
  555. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  556. opt := &ObjectListUploadsOptions{
  557. Prefix: name,
  558. EncodingType: "url",
  559. }
  560. res, _, err := s.ListUploads(ctx, opt)
  561. if err != nil {
  562. return "", err
  563. }
  564. if len(res.Upload) == 0 {
  565. return "", nil
  566. }
  567. last := len(res.Upload) - 1
  568. for last >= 0 {
  569. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  570. if decodeKey == name {
  571. return decodeURIComponent(res.Upload[last].UploadID)
  572. }
  573. last = last - 1
  574. }
  575. return "", nil
  576. }
  577. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  578. var uploadedParts []Object
  579. isTruncated := true
  580. opt := &ObjectListPartsOptions{
  581. EncodingType: "url",
  582. }
  583. for isTruncated {
  584. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  585. if err != nil {
  586. return err
  587. }
  588. if len(res.Parts) > 0 {
  589. uploadedParts = append(uploadedParts, res.Parts...)
  590. }
  591. isTruncated = res.IsTruncated
  592. opt.PartNumberMarker = res.NextPartNumberMarker
  593. }
  594. fd, err := os.Open(filepath)
  595. if err != nil {
  596. return err
  597. }
  598. defer fd.Close()
  599. // 某个分块出错, 重置chunks
  600. ret := func(e error) error {
  601. for i, _ := range chunks {
  602. chunks[i].Done = false
  603. chunks[i].ETag = ""
  604. }
  605. return e
  606. }
  607. for _, part := range uploadedParts {
  608. partNumber := part.PartNumber
  609. if partNumber > partNum {
  610. return ret(errors.New("Part Number is not consistent"))
  611. }
  612. partNumber = partNumber - 1
  613. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  614. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  615. if err != nil {
  616. return ret(err)
  617. }
  618. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  619. if localMD5 != part.ETag {
  620. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  621. }
  622. chunks[partNumber].Done = true
  623. chunks[partNumber].ETag = part.ETag
  624. }
  625. return nil
  626. }
  627. // MultiUpload/Upload 为高级upload接口,并发分块上传
  628. // 注意该接口目前只供参考
  629. //
  630. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  631. // 由调用者指定分块大小时,请确认分块数量不超过10000
  632. //
  633. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  634. return s.Upload(ctx, name, filepath, opt)
  635. }
  636. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  637. if opt == nil {
  638. opt = &MultiUploadOptions{}
  639. }
  640. // 1.Get the file chunk
  641. chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
  642. if err != nil {
  643. return nil, nil, err
  644. }
  645. // filesize=0 , use simple upload
  646. if partNum == 0 {
  647. var opt0 *ObjectPutOptions
  648. if opt.OptIni != nil {
  649. opt0 = &ObjectPutOptions{
  650. opt.OptIni.ACLHeaderOptions,
  651. opt.OptIni.ObjectPutHeaderOptions,
  652. }
  653. }
  654. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  655. if err != nil {
  656. return nil, rsp, err
  657. }
  658. result := &CompleteMultipartUploadResult{
  659. Key: name,
  660. ETag: rsp.Header.Get("ETag"),
  661. }
  662. return result, rsp, nil
  663. }
  664. var uploadID string
  665. resumableFlag := false
  666. if opt.CheckPoint {
  667. var err error
  668. uploadID, err = s.getResumableUploadID(ctx, name)
  669. if err == nil && uploadID != "" {
  670. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  671. resumableFlag = (err == nil)
  672. }
  673. }
  674. // 2.Init
  675. optini := opt.OptIni
  676. if !resumableFlag {
  677. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  678. if err != nil {
  679. return nil, nil, err
  680. }
  681. uploadID = res.UploadID
  682. }
  683. var poolSize int
  684. if opt.ThreadPoolSize > 0 {
  685. poolSize = opt.ThreadPoolSize
  686. } else {
  687. // Default is one
  688. poolSize = 1
  689. }
  690. chjobs := make(chan *Jobs, 100)
  691. chresults := make(chan *Results, 10000)
  692. optcom := &CompleteMultipartUploadOptions{}
  693. // 3.Start worker
  694. for w := 1; w <= poolSize; w++ {
  695. go worker(s, chjobs, chresults)
  696. }
  697. // 4.Push jobs
  698. for _, chunk := range chunks {
  699. if chunk.Done {
  700. continue
  701. }
  702. partOpt := &ObjectUploadPartOptions{}
  703. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  704. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  705. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  706. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  707. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  708. }
  709. job := &Jobs{
  710. Name: name,
  711. RetryTimes: 3,
  712. FilePath: filepath,
  713. UploadId: uploadID,
  714. Chunk: chunk,
  715. Opt: partOpt,
  716. }
  717. chjobs <- job
  718. }
  719. close(chjobs)
  720. // 5.Recv the resp etag to complete
  721. for i := 0; i < partNum; i++ {
  722. if chunks[i].Done {
  723. optcom.Parts = append(optcom.Parts, Object{
  724. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  725. )
  726. continue
  727. }
  728. res := <-chresults
  729. // Notice one part fail can not get the etag according.
  730. if res.Resp == nil || res.err != nil {
  731. // Some part already fail, can not to get the header inside.
  732. return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  733. }
  734. // Notice one part fail can not get the etag according.
  735. etag := res.Resp.Header.Get("ETag")
  736. optcom.Parts = append(optcom.Parts, Object{
  737. PartNumber: res.PartNumber, ETag: etag},
  738. )
  739. }
  740. sort.Sort(ObjectList(optcom.Parts))
  741. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  742. return v, resp, err
  743. }
// ObjectPutTaggingOptions is the option of PutTagging. It is marshalled as
// the Tagging XML element, with the tags nested under TagSet>Tag.
type ObjectPutTaggingOptions struct {
	XMLName xml.Name `xml:"Tagging"`
	TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
}

// ObjectTaggingTag is a single object tag; it has the same layout as
// BucketTaggingTag.
type ObjectTaggingTag BucketTaggingTag

// ObjectGetTaggingResult is the result of GetTagging; it has the same layout
// as ObjectPutTaggingOptions.
type ObjectGetTaggingResult ObjectPutTaggingOptions
  750. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  751. var u string
  752. if len(id) == 1 {
  753. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  754. } else if len(id) == 0 {
  755. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  756. } else {
  757. return nil, errors.New("wrong params")
  758. }
  759. sendOpt := &sendOptions{
  760. baseURL: s.client.BaseURL.BucketURL,
  761. uri: u,
  762. method: http.MethodPut,
  763. body: opt,
  764. }
  765. resp, err := s.client.send(ctx, sendOpt)
  766. return resp, err
  767. }
  768. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  769. var u string
  770. if len(id) == 1 {
  771. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  772. } else if len(id) == 0 {
  773. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  774. } else {
  775. return nil, nil, errors.New("wrong params")
  776. }
  777. var res ObjectGetTaggingResult
  778. sendOpt := &sendOptions{
  779. baseURL: s.client.BaseURL.BucketURL,
  780. uri: u,
  781. method: http.MethodGet,
  782. result: &res,
  783. }
  784. resp, err := s.client.send(ctx, sendOpt)
  785. return &res, resp, err
  786. }
  787. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  788. var u string
  789. if len(id) == 1 {
  790. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  791. } else if len(id) == 0 {
  792. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  793. } else {
  794. return nil, errors.New("wrong params")
  795. }
  796. sendOpt := &sendOptions{
  797. baseURL: s.client.BaseURL.BucketURL,
  798. uri: u,
  799. method: http.MethodDelete,
  800. }
  801. resp, err := s.client.send(ctx, sendOpt)
  802. return resp, err
  803. }