You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

749 lines
26 KiB

  1. package cos
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. // ObjectService 相关 API
  16. type ObjectService service
  17. // ObjectGetOptions is the option of GetObject
  18. type ObjectGetOptions struct {
  19. ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
  20. ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
  21. ResponseExpires string `url:"response-expires,omitempty" header:"-"`
  22. ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
  23. ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
  24. ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
  25. Range string `url:"-" header:"Range,omitempty"`
  26. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  27. // SSE-C
  28. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  29. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  30. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  31. }
  32. // presignedURLTestingOptions is the opt of presigned url
  33. type presignedURLTestingOptions struct {
  34. authTime *AuthTime
  35. }
  36. // Get Object 请求可以将一个文件(Object)下载至本地。
  37. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  38. //
  39. // https://www.qcloud.com/document/product/436/7753
  40. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  41. var u string
  42. if len(id) == 1 {
  43. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  44. } else if len(id) == 0 {
  45. u = "/" + encodeURIComponent(name)
  46. } else {
  47. return nil, errors.New("wrong params")
  48. }
  49. sendOpt := sendOptions{
  50. baseURL: s.client.BaseURL.BucketURL,
  51. uri: u,
  52. method: http.MethodGet,
  53. optQuery: opt,
  54. optHeader: opt,
  55. disableCloseBody: true,
  56. }
  57. resp, err := s.client.send(ctx, &sendOpt)
  58. return resp, err
  59. }
  60. // GetToFile download the object to local file
  61. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  62. resp, err := s.Get(ctx, name, opt, id...)
  63. if err != nil {
  64. return resp, err
  65. }
  66. defer resp.Body.Close()
  67. // If file exist, overwrite it
  68. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  69. if err != nil {
  70. return resp, err
  71. }
  72. _, err = io.Copy(fd, resp.Body)
  73. fd.Close()
  74. if err != nil {
  75. return resp, err
  76. }
  77. return resp, nil
  78. }
  79. // GetPresignedURL get the object presigned to down or upload file by url
  80. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  81. sendOpt := sendOptions{
  82. baseURL: s.client.BaseURL.BucketURL,
  83. uri: "/" + encodeURIComponent(name),
  84. method: httpMethod,
  85. optQuery: opt,
  86. optHeader: opt,
  87. }
  88. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  89. if err != nil {
  90. return nil, err
  91. }
  92. var authTime *AuthTime
  93. if opt != nil {
  94. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  95. authTime = opt.authTime
  96. }
  97. }
  98. if authTime == nil {
  99. authTime = NewAuthTime(expired)
  100. }
  101. authorization := newAuthorization(ak, sk, req, authTime)
  102. sign := encodeURIComponent(authorization)
  103. if req.URL.RawQuery == "" {
  104. req.URL.RawQuery = fmt.Sprintf("sign=%s", sign)
  105. } else {
  106. req.URL.RawQuery = fmt.Sprintf("%s&sign=%s", req.URL.RawQuery, sign)
  107. }
  108. return req.URL, nil
  109. }
  110. // ObjectPutHeaderOptions the options of header of the put object
  111. type ObjectPutHeaderOptions struct {
  112. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  113. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  114. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  115. ContentType string `header:"Content-Type,omitempty" url:"-"`
  116. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  117. ContentLength int `header:"Content-Length,omitempty" url:"-"`
  118. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  119. Expect string `header:"Expect,omitempty" url:"-"`
  120. Expires string `header:"Expires,omitempty" url:"-"`
  121. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  122. // 自定义的 x-cos-meta-* header
  123. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  124. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
  125. // 可选值: Normal, Appendable
  126. //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
  127. // Enable Server Side Encryption, Only supported: AES256
  128. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  129. // SSE-C
  130. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  131. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  132. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  133. //兼容其他自定义头部
  134. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  135. }
  136. // ObjectPutOptions the options of put object
  137. type ObjectPutOptions struct {
  138. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  139. *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  140. }
  141. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  142. //
  143. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  144. //
  145. // https://www.qcloud.com/document/product/436/7749
  146. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  147. sendOpt := sendOptions{
  148. baseURL: s.client.BaseURL.BucketURL,
  149. uri: "/" + encodeURIComponent(name),
  150. method: http.MethodPut,
  151. body: r,
  152. optHeader: opt,
  153. }
  154. resp, err := s.client.send(ctx, &sendOpt)
  155. return resp, err
  156. }
  157. // PutFromFile put object from local file
  158. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  159. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  160. fd, err := os.Open(filePath)
  161. if err != nil {
  162. return nil, err
  163. }
  164. defer fd.Close()
  165. return s.Put(ctx, name, fd, opt)
  166. }
  167. // ObjectCopyHeaderOptions is the head option of the Copy
  168. type ObjectCopyHeaderOptions struct {
  169. // When use replace directive to update meta infos
  170. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  171. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  172. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  173. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  174. ContentType string `header:"Content-Type,omitempty" url:"-"`
  175. Expires string `header:"Expires,omitempty" url:"-"`
  176. Expect string `header:"Expect,omitempty" url:"-"`
  177. XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
  178. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
  179. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
  180. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
  181. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
  182. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
  183. // 自定义的 x-cos-meta-* header
  184. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  185. XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
  186. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  187. // SSE-C
  188. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  189. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  190. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  191. XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  192. XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  193. XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  194. //兼容其他自定义头部
  195. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  196. }
  197. // ObjectCopyOptions is the option of Copy, choose header or body
  198. type ObjectCopyOptions struct {
  199. *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  200. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  201. }
  202. // ObjectCopyResult is the result of Copy
  203. type ObjectCopyResult struct {
  204. XMLName xml.Name `xml:"CopyObjectResult"`
  205. ETag string `xml:"ETag,omitempty"`
  206. LastModified string `xml:"LastModified,omitempty"`
  207. }
  208. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  209. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  210. //
  211. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  212. //
  213. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  214. //
  215. // https://cloud.tencent.com/document/product/436/10881
  216. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  217. surl := strings.SplitN(sourceURL, "/", 2)
  218. if len(surl) < 2 {
  219. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  220. }
  221. var u string
  222. if len(id) == 1 {
  223. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  224. } else if len(id) == 0 {
  225. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  226. } else {
  227. return nil, nil, errors.New("wrong params")
  228. }
  229. var res ObjectCopyResult
  230. if opt == nil {
  231. opt = new(ObjectCopyOptions)
  232. }
  233. if opt.ObjectCopyHeaderOptions == nil {
  234. opt.ObjectCopyHeaderOptions = new(ObjectCopyHeaderOptions)
  235. }
  236. opt.XCosCopySource = u
  237. sendOpt := sendOptions{
  238. baseURL: s.client.BaseURL.BucketURL,
  239. uri: "/" + encodeURIComponent(name),
  240. method: http.MethodPut,
  241. body: nil,
  242. optHeader: opt,
  243. result: &res,
  244. }
  245. resp, err := s.client.send(ctx, &sendOpt)
  246. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  247. if err == nil && resp.StatusCode == 200 {
  248. if res.ETag == "" {
  249. return &res, resp, errors.New("response 200 OK, but body contains an error")
  250. }
  251. }
  252. return &res, resp, err
  253. }
  254. type ObjectDeleteOptions struct {
  255. // SSE-C
  256. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  257. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  258. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  259. //兼容其他自定义头部
  260. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  261. }
  262. // Delete Object请求可以将一个文件(Object)删除。
  263. //
  264. // https://www.qcloud.com/document/product/436/7743
  265. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  266. var optHeader *ObjectDeleteOptions
  267. // When use "" string might call the delete bucket interface
  268. if len(name) == 0 {
  269. return nil, errors.New("empty object name")
  270. }
  271. if len(opt) > 0 {
  272. optHeader = opt[0]
  273. }
  274. sendOpt := sendOptions{
  275. baseURL: s.client.BaseURL.BucketURL,
  276. uri: "/" + encodeURIComponent(name),
  277. method: http.MethodDelete,
  278. optHeader: optHeader,
  279. }
  280. resp, err := s.client.send(ctx, &sendOpt)
  281. return resp, err
  282. }
  283. // ObjectHeadOptions is the option of HeadObject
  284. type ObjectHeadOptions struct {
  285. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  286. // SSE-C
  287. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  288. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  289. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  290. }
  291. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  292. //
  293. // https://www.qcloud.com/document/product/436/7745
  294. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  295. var u string
  296. if len(id) == 1 {
  297. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  298. } else if len(id) == 0 {
  299. u = "/" + encodeURIComponent(name)
  300. } else {
  301. return nil, errors.New("wrong params")
  302. }
  303. sendOpt := sendOptions{
  304. baseURL: s.client.BaseURL.BucketURL,
  305. uri: u,
  306. method: http.MethodHead,
  307. optHeader: opt,
  308. }
  309. resp, err := s.client.send(ctx, &sendOpt)
  310. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  311. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  312. }
  313. return resp, err
  314. }
  315. // ObjectOptionsOptions is the option of object options
  316. type ObjectOptionsOptions struct {
  317. Origin string `url:"-" header:"Origin"`
  318. AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
  319. AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
  320. }
  321. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  322. //
  323. // 当CORS配置不存在时,请求返回403 Forbidden。
  324. //
  325. // https://www.qcloud.com/document/product/436/8288
  326. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  327. sendOpt := sendOptions{
  328. baseURL: s.client.BaseURL.BucketURL,
  329. uri: "/" + encodeURIComponent(name),
  330. method: http.MethodOptions,
  331. optHeader: opt,
  332. }
  333. resp, err := s.client.send(ctx, &sendOpt)
  334. return resp, err
  335. }
  336. // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
  337. type CASJobParameters struct {
  338. Tier string `xml:"Tier"`
  339. }
  340. // ObjectRestoreOptions is the option of object restore
  341. type ObjectRestoreOptions struct {
  342. XMLName xml.Name `xml:"RestoreRequest"`
  343. Days int `xml:"Days"`
  344. Tier *CASJobParameters `xml:"CASJobParameters"`
  345. }
  346. // PutRestore API can recover an object of type archived by COS archive.
  347. //
  348. // https://cloud.tencent.com/document/product/436/12633
  349. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  350. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  351. sendOpt := sendOptions{
  352. baseURL: s.client.BaseURL.BucketURL,
  353. uri: u,
  354. method: http.MethodPost,
  355. body: opt,
  356. }
  357. resp, err := s.client.send(ctx, &sendOpt)
  358. return resp, err
  359. }
  360. // TODO Append 接口在优化未开放使用
  361. //
  362. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  363. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  364. //
  365. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  366. //
  367. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  368. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  369. //
  370. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  371. //
  372. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  373. //
  374. // https://www.qcloud.com/document/product/436/7741
  375. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  376. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  377. // if position != 0{
  378. // opt = nil
  379. // }
  380. // sendOpt := sendOptions{
  381. // baseURL: s.client.BaseURL.BucketURL,
  382. // uri: u,
  383. // method: http.MethodPost,
  384. // optHeader: opt,
  385. // body: r,
  386. // }
  387. // resp, err := s.client.send(ctx, &sendOpt)
  388. // return resp, err
  389. // }
  390. // ObjectDeleteMultiOptions is the option of DeleteMulti
  391. type ObjectDeleteMultiOptions struct {
  392. XMLName xml.Name `xml:"Delete" header:"-"`
  393. Quiet bool `xml:"Quiet" header:"-"`
  394. Objects []Object `xml:"Object" header:"-"`
  395. //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
  396. }
  397. // ObjectDeleteMultiResult is the result of DeleteMulti
  398. type ObjectDeleteMultiResult struct {
  399. XMLName xml.Name `xml:"DeleteResult"`
  400. DeletedObjects []Object `xml:"Deleted,omitempty"`
  401. Errors []struct {
  402. Key string
  403. Code string
  404. Message string
  405. } `xml:"Error,omitempty"`
  406. }
  407. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  408. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  409. // Quiet模式只返回报错的Object信息。
  410. // https://www.qcloud.com/document/product/436/8289
  411. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  412. var res ObjectDeleteMultiResult
  413. sendOpt := sendOptions{
  414. baseURL: s.client.BaseURL.BucketURL,
  415. uri: "/?delete",
  416. method: http.MethodPost,
  417. body: opt,
  418. result: &res,
  419. }
  420. resp, err := s.client.send(ctx, &sendOpt)
  421. return &res, resp, err
  422. }
  423. // Object is the meta info of the object
  424. type Object struct {
  425. Key string `xml:",omitempty"`
  426. ETag string `xml:",omitempty"`
  427. Size int `xml:",omitempty"`
  428. PartNumber int `xml:",omitempty"`
  429. LastModified string `xml:",omitempty"`
  430. StorageClass string `xml:",omitempty"`
  431. Owner *Owner `xml:",omitempty"`
  432. }
  433. // MultiUploadOptions is the option of the multiupload,
  434. // ThreadPoolSize default is one
  435. type MultiUploadOptions struct {
  436. OptIni *InitiateMultipartUploadOptions
  437. PartSize int64
  438. ThreadPoolSize int
  439. }
  440. type Chunk struct {
  441. Number int
  442. OffSet int64
  443. Size int64
  444. }
  445. // jobs
  446. type Jobs struct {
  447. Name string
  448. UploadId string
  449. FilePath string
  450. RetryTimes int
  451. Chunk Chunk
  452. Data io.Reader
  453. Opt *ObjectUploadPartOptions
  454. }
  455. type Results struct {
  456. PartNumber int
  457. Resp *Response
  458. }
  459. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  460. for j := range jobs {
  461. fd, err := os.Open(j.FilePath)
  462. var res Results
  463. if err != nil {
  464. res.PartNumber = j.Chunk.Number
  465. res.Resp = nil
  466. results <- &res
  467. }
  468. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  469. // UploadPart do not support the chunk trsf, so need to add the content-length
  470. j.Opt.ContentLength = int(j.Chunk.Size)
  471. rt := j.RetryTimes
  472. for {
  473. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  474. &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
  475. res.PartNumber = j.Chunk.Number
  476. res.Resp = resp
  477. if err != nil {
  478. rt--
  479. if rt == 0 {
  480. fd.Close()
  481. results <- &res
  482. break
  483. }
  484. continue
  485. }
  486. fd.Close()
  487. results <- &res
  488. break
  489. }
  490. }
  491. }
  492. func DividePart(fileSize int64) (int64, int64) {
  493. partSize := int64(1 * 1024 * 1024)
  494. partNum := fileSize / partSize
  495. for partNum >= 10000 {
  496. partSize = partSize * 2
  497. partNum = fileSize / partSize
  498. }
  499. return partNum, partSize
  500. }
  501. func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
  502. if filePath == "" {
  503. return nil, 0, errors.New("filePath invalid")
  504. }
  505. file, err := os.Open(filePath)
  506. if err != nil {
  507. return nil, 0, err
  508. }
  509. defer file.Close()
  510. stat, err := file.Stat()
  511. if err != nil {
  512. return nil, 0, err
  513. }
  514. var partNum int64
  515. if partSize > 0 {
  516. partSize = partSize * 1024 * 1024
  517. partNum = stat.Size() / partSize
  518. if partNum >= 10000 {
  519. return nil, 0, errors.New("Too many parts, out of 10000")
  520. }
  521. } else {
  522. partNum, partSize = DividePart(stat.Size())
  523. }
  524. var chunks []Chunk
  525. var chunk = Chunk{}
  526. for i := int64(0); i < partNum; i++ {
  527. chunk.Number = int(i + 1)
  528. chunk.OffSet = i * partSize
  529. chunk.Size = partSize
  530. chunks = append(chunks, chunk)
  531. }
  532. if stat.Size()%partSize > 0 {
  533. chunk.Number = len(chunks) + 1
  534. chunk.OffSet = int64(len(chunks)) * partSize
  535. chunk.Size = stat.Size() % partSize
  536. chunks = append(chunks, chunk)
  537. partNum++
  538. }
  539. return chunks, int(partNum), nil
  540. }
  541. // MultiUpload/Upload 为高级upload接口,并发分块上传
  542. // 注意该接口目前只供参考
  543. //
  544. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  545. // 由调用者指定分块大小时,请确认分块数量不超过10000
  546. //
  547. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  548. return s.Upload(ctx, name, filepath, opt)
  549. }
  550. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  551. if opt == nil {
  552. opt = &MultiUploadOptions{}
  553. }
  554. // 1.Get the file chunk
  555. chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
  556. if err != nil {
  557. return nil, nil, err
  558. }
  559. // 2.Init
  560. optini := opt.OptIni
  561. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  562. if err != nil {
  563. return nil, nil, err
  564. }
  565. uploadID := res.UploadID
  566. var poolSize int
  567. if opt.ThreadPoolSize > 0 {
  568. poolSize = opt.ThreadPoolSize
  569. } else {
  570. // Default is one
  571. poolSize = 1
  572. }
  573. chjobs := make(chan *Jobs, 100)
  574. chresults := make(chan *Results, 10000)
  575. optcom := &CompleteMultipartUploadOptions{}
  576. // 3.Start worker
  577. for w := 1; w <= poolSize; w++ {
  578. go worker(s, chjobs, chresults)
  579. }
  580. // 4.Push jobs
  581. for _, chunk := range chunks {
  582. partOpt := &ObjectUploadPartOptions{}
  583. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  584. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  585. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  586. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  587. }
  588. job := &Jobs{
  589. Name: name,
  590. RetryTimes: 3,
  591. FilePath: filepath,
  592. UploadId: uploadID,
  593. Chunk: chunk,
  594. Opt: partOpt,
  595. }
  596. chjobs <- job
  597. }
  598. close(chjobs)
  599. // 5.Recv the resp etag to complete
  600. for i := 1; i <= partNum; i++ {
  601. res := <-chresults
  602. // Notice one part fail can not get the etag according.
  603. if res.Resp == nil {
  604. // Some part already fail, can not to get the header inside.
  605. return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
  606. }
  607. // Notice one part fail can not get the etag according.
  608. etag := res.Resp.Header.Get("ETag")
  609. optcom.Parts = append(optcom.Parts, Object{
  610. PartNumber: res.PartNumber, ETag: etag},
  611. )
  612. }
  613. sort.Sort(ObjectList(optcom.Parts))
  614. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  615. return v, resp, err
  616. }
  617. type ObjectPutTaggingOptions struct {
  618. XMLName xml.Name `xml:"Tagging"`
  619. TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
  620. }
  621. type ObjectTaggingTag BucketTaggingTag
  622. type ObjectGetTaggingResult ObjectPutTaggingOptions
  623. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  624. var u string
  625. if len(id) == 1 {
  626. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  627. } else if len(id) == 0 {
  628. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  629. } else {
  630. return nil, errors.New("wrong params")
  631. }
  632. sendOpt := &sendOptions{
  633. baseURL: s.client.BaseURL.BucketURL,
  634. uri: u,
  635. method: http.MethodPut,
  636. body: opt,
  637. }
  638. resp, err := s.client.send(ctx, sendOpt)
  639. return resp, err
  640. }
  641. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  642. var u string
  643. if len(id) == 1 {
  644. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  645. } else if len(id) == 0 {
  646. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  647. } else {
  648. return nil, nil, errors.New("wrong params")
  649. }
  650. var res ObjectGetTaggingResult
  651. sendOpt := &sendOptions{
  652. baseURL: s.client.BaseURL.BucketURL,
  653. uri: u,
  654. method: http.MethodGet,
  655. result: &res,
  656. }
  657. resp, err := s.client.send(ctx, sendOpt)
  658. return &res, resp, err
  659. }
  660. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  661. var u string
  662. if len(id) == 1 {
  663. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  664. } else if len(id) == 0 {
  665. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  666. } else {
  667. return nil, errors.New("wrong params")
  668. }
  669. sendOpt := &sendOptions{
  670. baseURL: s.client.BaseURL.BucketURL,
  671. uri: u,
  672. method: http.MethodDelete,
  673. }
  674. resp, err := s.client.send(ctx, sendOpt)
  675. return resp, err
  676. }