You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

886 lines
29 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/xml"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "io/ioutil"
  10. "net/http"
  11. "net/url"
  12. "os"
  13. "sort"
  14. "strings"
  15. "time"
  16. )
  17. // ObjectService 相关 API
  18. type ObjectService service
  19. // ObjectGetOptions is the option of GetObject
  20. type ObjectGetOptions struct {
  21. ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
  22. ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
  23. ResponseExpires string `url:"response-expires,omitempty" header:"-"`
  24. ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
  25. ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
  26. ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
  27. Range string `url:"-" header:"Range,omitempty"`
  28. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  29. // SSE-C
  30. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  31. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  32. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  33. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  34. }
  35. // presignedURLTestingOptions is the opt of presigned url
  36. type presignedURLTestingOptions struct {
  37. authTime *AuthTime
  38. }
  39. // Get Object 请求可以将一个文件(Object)下载至本地。
  40. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  41. //
  42. // https://www.qcloud.com/document/product/436/7753
  43. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  44. var u string
  45. if len(id) == 1 {
  46. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  47. } else if len(id) == 0 {
  48. u = "/" + encodeURIComponent(name)
  49. } else {
  50. return nil, errors.New("wrong params")
  51. }
  52. sendOpt := sendOptions{
  53. baseURL: s.client.BaseURL.BucketURL,
  54. uri: u,
  55. method: http.MethodGet,
  56. optQuery: opt,
  57. optHeader: opt,
  58. disableCloseBody: true,
  59. }
  60. resp, err := s.client.send(ctx, &sendOpt)
  61. return resp, err
  62. }
  63. // GetToFile download the object to local file
  64. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  65. resp, err := s.Get(ctx, name, opt, id...)
  66. if err != nil {
  67. return resp, err
  68. }
  69. defer resp.Body.Close()
  70. // If file exist, overwrite it
  71. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  72. if err != nil {
  73. return resp, err
  74. }
  75. _, err = io.Copy(fd, resp.Body)
  76. fd.Close()
  77. if err != nil {
  78. return resp, err
  79. }
  80. return resp, nil
  81. }
  82. // GetPresignedURL get the object presigned to down or upload file by url
  83. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  84. sendOpt := sendOptions{
  85. baseURL: s.client.BaseURL.BucketURL,
  86. uri: "/" + encodeURIComponent(name),
  87. method: httpMethod,
  88. optQuery: opt,
  89. optHeader: opt,
  90. }
  91. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  92. if err != nil {
  93. return nil, err
  94. }
  95. var authTime *AuthTime
  96. if opt != nil {
  97. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  98. authTime = opt.authTime
  99. }
  100. }
  101. if authTime == nil {
  102. authTime = NewAuthTime(expired)
  103. }
  104. authorization := newAuthorization(ak, sk, req, authTime)
  105. sign := encodeURIComponent(authorization, []byte{'&','='})
  106. if req.URL.RawQuery == "" {
  107. req.URL.RawQuery = fmt.Sprintf("%s", sign)
  108. } else {
  109. req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
  110. }
  111. return req.URL, nil
  112. }
  113. // ObjectPutHeaderOptions the options of header of the put object
  114. type ObjectPutHeaderOptions struct {
  115. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  116. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  117. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  118. ContentType string `header:"Content-Type,omitempty" url:"-"`
  119. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  120. ContentLength int `header:"Content-Length,omitempty" url:"-"`
  121. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  122. Expect string `header:"Expect,omitempty" url:"-"`
  123. Expires string `header:"Expires,omitempty" url:"-"`
  124. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  125. // 自定义的 x-cos-meta-* header
  126. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  127. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
  128. // 可选值: Normal, Appendable
  129. //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
  130. // Enable Server Side Encryption, Only supported: AES256
  131. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  132. // SSE-C
  133. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  134. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  135. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  136. //兼容其他自定义头部
  137. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  138. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  139. }
  140. // ObjectPutOptions the options of put object
  141. type ObjectPutOptions struct {
  142. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  143. *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  144. }
  145. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  146. //
  147. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  148. //
  149. // https://www.qcloud.com/document/product/436/7749
  150. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  151. sendOpt := sendOptions{
  152. baseURL: s.client.BaseURL.BucketURL,
  153. uri: "/" + encodeURIComponent(name),
  154. method: http.MethodPut,
  155. body: r,
  156. optHeader: opt,
  157. }
  158. resp, err := s.client.send(ctx, &sendOpt)
  159. return resp, err
  160. }
  161. // PutFromFile put object from local file
  162. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  163. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  164. fd, err := os.Open(filePath)
  165. if err != nil {
  166. return nil, err
  167. }
  168. defer fd.Close()
  169. return s.Put(ctx, name, fd, opt)
  170. }
  171. // ObjectCopyHeaderOptions is the head option of the Copy
  172. type ObjectCopyHeaderOptions struct {
  173. // When use replace directive to update meta infos
  174. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  175. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  176. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  177. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  178. ContentType string `header:"Content-Type,omitempty" url:"-"`
  179. Expires string `header:"Expires,omitempty" url:"-"`
  180. Expect string `header:"Expect,omitempty" url:"-"`
  181. XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
  182. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
  183. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
  184. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
  185. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
  186. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
  187. // 自定义的 x-cos-meta-* header
  188. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  189. XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
  190. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  191. // SSE-C
  192. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  193. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  194. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  195. XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  196. XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  197. XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  198. //兼容其他自定义头部
  199. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  200. }
  201. // ObjectCopyOptions is the option of Copy, choose header or body
  202. type ObjectCopyOptions struct {
  203. *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  204. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  205. }
  206. // ObjectCopyResult is the result of Copy
  207. type ObjectCopyResult struct {
  208. XMLName xml.Name `xml:"CopyObjectResult"`
  209. ETag string `xml:"ETag,omitempty"`
  210. LastModified string `xml:"LastModified,omitempty"`
  211. }
  212. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  213. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  214. //
  215. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  216. //
  217. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  218. //
  219. // https://cloud.tencent.com/document/product/436/10881
  220. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  221. surl := strings.SplitN(sourceURL, "/", 2)
  222. if len(surl) < 2 {
  223. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  224. }
  225. var u string
  226. if len(id) == 1 {
  227. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  228. } else if len(id) == 0 {
  229. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  230. } else {
  231. return nil, nil, errors.New("wrong params")
  232. }
  233. var res ObjectCopyResult
  234. copyOpt := &ObjectCopyOptions{
  235. &ObjectCopyHeaderOptions{},
  236. &ACLHeaderOptions{},
  237. }
  238. if opt != nil {
  239. if opt.ObjectCopyHeaderOptions != nil {
  240. *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
  241. }
  242. if opt.ACLHeaderOptions != nil {
  243. *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
  244. }
  245. }
  246. copyOpt.XCosCopySource = u
  247. sendOpt := sendOptions{
  248. baseURL: s.client.BaseURL.BucketURL,
  249. uri: "/" + encodeURIComponent(name),
  250. method: http.MethodPut,
  251. body: nil,
  252. optHeader: copyOpt,
  253. result: &res,
  254. }
  255. resp, err := s.client.send(ctx, &sendOpt)
  256. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  257. if err == nil && resp.StatusCode == 200 {
  258. if res.ETag == "" {
  259. return &res, resp, errors.New("response 200 OK, but body contains an error")
  260. }
  261. }
  262. return &res, resp, err
  263. }
  264. type ObjectDeleteOptions struct {
  265. // SSE-C
  266. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  267. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  268. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  269. //兼容其他自定义头部
  270. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  271. VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
  272. }
  273. // Delete Object请求可以将一个文件(Object)删除。
  274. //
  275. // https://www.qcloud.com/document/product/436/7743
  276. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  277. var optHeader *ObjectDeleteOptions
  278. // When use "" string might call the delete bucket interface
  279. if len(name) == 0 {
  280. return nil, errors.New("empty object name")
  281. }
  282. if len(opt) > 0 {
  283. optHeader = opt[0]
  284. }
  285. sendOpt := sendOptions{
  286. baseURL: s.client.BaseURL.BucketURL,
  287. uri: "/" + encodeURIComponent(name),
  288. method: http.MethodDelete,
  289. optHeader: optHeader,
  290. optQuery: optHeader,
  291. }
  292. resp, err := s.client.send(ctx, &sendOpt)
  293. return resp, err
  294. }
  295. // ObjectHeadOptions is the option of HeadObject
  296. type ObjectHeadOptions struct {
  297. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  298. // SSE-C
  299. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  300. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  301. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  302. }
  303. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  304. //
  305. // https://www.qcloud.com/document/product/436/7745
  306. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  307. var u string
  308. if len(id) == 1 {
  309. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  310. } else if len(id) == 0 {
  311. u = "/" + encodeURIComponent(name)
  312. } else {
  313. return nil, errors.New("wrong params")
  314. }
  315. sendOpt := sendOptions{
  316. baseURL: s.client.BaseURL.BucketURL,
  317. uri: u,
  318. method: http.MethodHead,
  319. optHeader: opt,
  320. }
  321. resp, err := s.client.send(ctx, &sendOpt)
  322. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  323. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  324. }
  325. return resp, err
  326. }
  327. // ObjectOptionsOptions is the option of object options
  328. type ObjectOptionsOptions struct {
  329. Origin string `url:"-" header:"Origin"`
  330. AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
  331. AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
  332. }
  333. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  334. //
  335. // 当CORS配置不存在时,请求返回403 Forbidden。
  336. //
  337. // https://www.qcloud.com/document/product/436/8288
  338. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  339. sendOpt := sendOptions{
  340. baseURL: s.client.BaseURL.BucketURL,
  341. uri: "/" + encodeURIComponent(name),
  342. method: http.MethodOptions,
  343. optHeader: opt,
  344. }
  345. resp, err := s.client.send(ctx, &sendOpt)
  346. return resp, err
  347. }
  348. // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
  349. type CASJobParameters struct {
  350. Tier string `xml:"Tier"`
  351. }
  352. // ObjectRestoreOptions is the option of object restore
  353. type ObjectRestoreOptions struct {
  354. XMLName xml.Name `xml:"RestoreRequest"`
  355. Days int `xml:"Days"`
  356. Tier *CASJobParameters `xml:"CASJobParameters"`
  357. }
  358. // PutRestore API can recover an object of type archived by COS archive.
  359. //
  360. // https://cloud.tencent.com/document/product/436/12633
  361. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  362. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  363. sendOpt := sendOptions{
  364. baseURL: s.client.BaseURL.BucketURL,
  365. uri: u,
  366. method: http.MethodPost,
  367. body: opt,
  368. }
  369. resp, err := s.client.send(ctx, &sendOpt)
  370. return resp, err
  371. }
  372. // TODO Append 接口在优化未开放使用
  373. //
  374. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  375. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  376. //
  377. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  378. //
  379. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  380. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  381. //
  382. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  383. //
  384. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  385. //
  386. // https://www.qcloud.com/document/product/436/7741
  387. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  388. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  389. // if position != 0{
  390. // opt = nil
  391. // }
  392. // sendOpt := sendOptions{
  393. // baseURL: s.client.BaseURL.BucketURL,
  394. // uri: u,
  395. // method: http.MethodPost,
  396. // optHeader: opt,
  397. // body: r,
  398. // }
  399. // resp, err := s.client.send(ctx, &sendOpt)
  400. // return resp, err
  401. // }
  402. // ObjectDeleteMultiOptions is the option of DeleteMulti
  403. type ObjectDeleteMultiOptions struct {
  404. XMLName xml.Name `xml:"Delete" header:"-"`
  405. Quiet bool `xml:"Quiet" header:"-"`
  406. Objects []Object `xml:"Object" header:"-"`
  407. //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
  408. }
  409. // ObjectDeleteMultiResult is the result of DeleteMulti
  410. type ObjectDeleteMultiResult struct {
  411. XMLName xml.Name `xml:"DeleteResult"`
  412. DeletedObjects []Object `xml:"Deleted,omitempty"`
  413. Errors []struct {
  414. Key string `xml:",omitempty"`
  415. Code string `xml:",omitempty"`
  416. Message string `xml:",omitempty"`
  417. VersionId string `xml:",omitempty"`
  418. } `xml:"Error,omitempty"`
  419. }
  420. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  421. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  422. // Quiet模式只返回报错的Object信息。
  423. // https://www.qcloud.com/document/product/436/8289
  424. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  425. var res ObjectDeleteMultiResult
  426. sendOpt := sendOptions{
  427. baseURL: s.client.BaseURL.BucketURL,
  428. uri: "/?delete",
  429. method: http.MethodPost,
  430. body: opt,
  431. result: &res,
  432. }
  433. resp, err := s.client.send(ctx, &sendOpt)
  434. return &res, resp, err
  435. }
  436. // Object is the meta info of the object
  437. type Object struct {
  438. Key string `xml:",omitempty"`
  439. ETag string `xml:",omitempty"`
  440. Size int `xml:",omitempty"`
  441. PartNumber int `xml:",omitempty"`
  442. LastModified string `xml:",omitempty"`
  443. StorageClass string `xml:",omitempty"`
  444. Owner *Owner `xml:",omitempty"`
  445. VersionId string `xml:",omitempty"`
  446. }
  447. // MultiUploadOptions is the option of the multiupload,
  448. // ThreadPoolSize default is one
  449. type MultiUploadOptions struct {
  450. OptIni *InitiateMultipartUploadOptions
  451. PartSize int64
  452. ThreadPoolSize int
  453. CheckPoint bool
  454. }
  455. type Chunk struct {
  456. Number int
  457. OffSet int64
  458. Size int64
  459. Done bool
  460. ETag string
  461. }
  462. // jobs
  463. type Jobs struct {
  464. Name string
  465. UploadId string
  466. FilePath string
  467. RetryTimes int
  468. Chunk Chunk
  469. Data io.Reader
  470. Opt *ObjectUploadPartOptions
  471. }
  472. type Results struct {
  473. PartNumber int
  474. Resp *Response
  475. err error
  476. }
  477. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  478. for j := range jobs {
  479. fd, err := os.Open(j.FilePath)
  480. var res Results
  481. if err != nil {
  482. res.err = err
  483. res.PartNumber = j.Chunk.Number
  484. res.Resp = nil
  485. results <- &res
  486. }
  487. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  488. // UploadPart do not support the chunk trsf, so need to add the content-length
  489. j.Opt.ContentLength = int(j.Chunk.Size)
  490. rt := j.RetryTimes
  491. for {
  492. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  493. &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
  494. res.PartNumber = j.Chunk.Number
  495. res.Resp = resp
  496. res.err = err
  497. if err != nil {
  498. rt--
  499. if rt == 0 {
  500. fd.Close()
  501. results <- &res
  502. break
  503. }
  504. continue
  505. }
  506. fd.Close()
  507. results <- &res
  508. break
  509. }
  510. }
  511. }
  512. func DividePart(fileSize int64) (int64, int64) {
  513. partSize := int64(1 * 1024 * 1024)
  514. partNum := fileSize / partSize
  515. for partNum >= 10000 {
  516. partSize = partSize * 2
  517. partNum = fileSize / partSize
  518. }
  519. return partNum, partSize
  520. }
  521. func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
  522. if filePath == "" {
  523. return nil, 0, errors.New("filePath invalid")
  524. }
  525. file, err := os.Open(filePath)
  526. if err != nil {
  527. return nil, 0, err
  528. }
  529. defer file.Close()
  530. stat, err := file.Stat()
  531. if err != nil {
  532. return nil, 0, err
  533. }
  534. var partNum int64
  535. if partSize > 0 {
  536. partSize = partSize * 1024 * 1024
  537. partNum = stat.Size() / partSize
  538. if partNum >= 10000 {
  539. return nil, 0, errors.New("Too many parts, out of 10000")
  540. }
  541. } else {
  542. partNum, partSize = DividePart(stat.Size())
  543. }
  544. var chunks []Chunk
  545. var chunk = Chunk{}
  546. for i := int64(0); i < partNum; i++ {
  547. chunk.Number = int(i + 1)
  548. chunk.OffSet = i * partSize
  549. chunk.Size = partSize
  550. chunks = append(chunks, chunk)
  551. }
  552. if stat.Size()%partSize > 0 {
  553. chunk.Number = len(chunks) + 1
  554. chunk.OffSet = int64(len(chunks)) * partSize
  555. chunk.Size = stat.Size() % partSize
  556. chunks = append(chunks, chunk)
  557. partNum++
  558. }
  559. return chunks, int(partNum), nil
  560. }
  561. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  562. opt := &ObjectListUploadsOptions{
  563. Prefix: name,
  564. EncodingType: "url",
  565. }
  566. res, _, err := s.ListUploads(ctx, opt)
  567. if err != nil {
  568. return "", err
  569. }
  570. if len(res.Upload) == 0 {
  571. return "", nil
  572. }
  573. last := len(res.Upload) - 1
  574. for last >= 0 {
  575. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  576. if decodeKey == name {
  577. return decodeURIComponent(res.Upload[last].UploadID)
  578. }
  579. last = last - 1
  580. }
  581. return "", nil
  582. }
  583. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  584. var uploadedParts []Object
  585. isTruncated := true
  586. opt := &ObjectListPartsOptions{
  587. EncodingType: "url",
  588. }
  589. for isTruncated {
  590. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  591. if err != nil {
  592. return err
  593. }
  594. if len(res.Parts) > 0 {
  595. uploadedParts = append(uploadedParts, res.Parts...)
  596. }
  597. isTruncated = res.IsTruncated
  598. opt.PartNumberMarker = res.NextPartNumberMarker
  599. }
  600. fd, err := os.Open(filepath)
  601. if err != nil {
  602. return err
  603. }
  604. defer fd.Close()
  605. // 某个分块出错, 重置chunks
  606. ret := func(e error) error {
  607. for i, _ := range chunks {
  608. chunks[i].Done = false
  609. chunks[i].ETag = ""
  610. }
  611. return e
  612. }
  613. for _, part := range uploadedParts {
  614. partNumber := part.PartNumber
  615. if partNumber > partNum {
  616. return ret(errors.New("Part Number is not consistent"))
  617. }
  618. partNumber = partNumber - 1
  619. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  620. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  621. if err != nil {
  622. return ret(err)
  623. }
  624. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  625. if localMD5 != part.ETag {
  626. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  627. }
  628. chunks[partNumber].Done = true
  629. chunks[partNumber].ETag = part.ETag
  630. }
  631. return nil
  632. }
  633. // MultiUpload/Upload 为高级upload接口,并发分块上传
  634. // 注意该接口目前只供参考
  635. //
  636. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  637. // 由调用者指定分块大小时,请确认分块数量不超过10000
  638. //
  639. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  640. return s.Upload(ctx, name, filepath, opt)
  641. }
  642. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  643. if opt == nil {
  644. opt = &MultiUploadOptions{}
  645. }
  646. // 1.Get the file chunk
  647. chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
  648. if err != nil {
  649. return nil, nil, err
  650. }
  651. // filesize=0 , use simple upload
  652. if partNum == 0 {
  653. var opt0 *ObjectPutOptions
  654. if opt.OptIni != nil {
  655. opt0 = &ObjectPutOptions{
  656. opt.OptIni.ACLHeaderOptions,
  657. opt.OptIni.ObjectPutHeaderOptions,
  658. }
  659. }
  660. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  661. if err != nil {
  662. return nil, rsp, err
  663. }
  664. result := &CompleteMultipartUploadResult{
  665. Key: name,
  666. ETag: rsp.Header.Get("ETag"),
  667. }
  668. return result, rsp, nil
  669. }
  670. var uploadID string
  671. resumableFlag := false
  672. if opt.CheckPoint {
  673. var err error
  674. uploadID, err = s.getResumableUploadID(ctx, name)
  675. if err == nil && uploadID != "" {
  676. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  677. resumableFlag = (err == nil)
  678. }
  679. }
  680. // 2.Init
  681. optini := opt.OptIni
  682. if !resumableFlag {
  683. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  684. if err != nil {
  685. return nil, nil, err
  686. }
  687. uploadID = res.UploadID
  688. }
  689. var poolSize int
  690. if opt.ThreadPoolSize > 0 {
  691. poolSize = opt.ThreadPoolSize
  692. } else {
  693. // Default is one
  694. poolSize = 1
  695. }
  696. chjobs := make(chan *Jobs, 100)
  697. chresults := make(chan *Results, 10000)
  698. optcom := &CompleteMultipartUploadOptions{}
  699. // 3.Start worker
  700. for w := 1; w <= poolSize; w++ {
  701. go worker(s, chjobs, chresults)
  702. }
  703. // 4.Push jobs
  704. for _, chunk := range chunks {
  705. if chunk.Done {
  706. continue
  707. }
  708. partOpt := &ObjectUploadPartOptions{}
  709. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  710. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  711. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  712. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  713. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  714. }
  715. job := &Jobs{
  716. Name: name,
  717. RetryTimes: 3,
  718. FilePath: filepath,
  719. UploadId: uploadID,
  720. Chunk: chunk,
  721. Opt: partOpt,
  722. }
  723. chjobs <- job
  724. }
  725. close(chjobs)
  726. // 5.Recv the resp etag to complete
  727. for i := 0; i < partNum; i++ {
  728. if chunks[i].Done {
  729. optcom.Parts = append(optcom.Parts, Object{
  730. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  731. )
  732. continue
  733. }
  734. res := <-chresults
  735. // Notice one part fail can not get the etag according.
  736. if res.Resp == nil || res.err != nil {
  737. // Some part already fail, can not to get the header inside.
  738. return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  739. }
  740. // Notice one part fail can not get the etag according.
  741. etag := res.Resp.Header.Get("ETag")
  742. optcom.Parts = append(optcom.Parts, Object{
  743. PartNumber: res.PartNumber, ETag: etag},
  744. )
  745. }
  746. sort.Sort(ObjectList(optcom.Parts))
  747. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  748. return v, resp, err
  749. }
  750. type ObjectPutTaggingOptions struct {
  751. XMLName xml.Name `xml:"Tagging"`
  752. TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
  753. }
  754. type ObjectTaggingTag BucketTaggingTag
  755. type ObjectGetTaggingResult ObjectPutTaggingOptions
  756. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  757. var u string
  758. if len(id) == 1 {
  759. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  760. } else if len(id) == 0 {
  761. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  762. } else {
  763. return nil, errors.New("wrong params")
  764. }
  765. sendOpt := &sendOptions{
  766. baseURL: s.client.BaseURL.BucketURL,
  767. uri: u,
  768. method: http.MethodPut,
  769. body: opt,
  770. }
  771. resp, err := s.client.send(ctx, sendOpt)
  772. return resp, err
  773. }
  774. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  775. var u string
  776. if len(id) == 1 {
  777. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  778. } else if len(id) == 0 {
  779. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  780. } else {
  781. return nil, nil, errors.New("wrong params")
  782. }
  783. var res ObjectGetTaggingResult
  784. sendOpt := &sendOptions{
  785. baseURL: s.client.BaseURL.BucketURL,
  786. uri: u,
  787. method: http.MethodGet,
  788. result: &res,
  789. }
  790. resp, err := s.client.send(ctx, sendOpt)
  791. return &res, resp, err
  792. }
  793. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  794. var u string
  795. if len(id) == 1 {
  796. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  797. } else if len(id) == 0 {
  798. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  799. } else {
  800. return nil, errors.New("wrong params")
  801. }
  802. sendOpt := &sendOptions{
  803. baseURL: s.client.BaseURL.BucketURL,
  804. uri: u,
  805. method: http.MethodDelete,
  806. }
  807. resp, err := s.client.send(ctx, sendOpt)
  808. return resp, err
  809. }