You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

744 lines
26 KiB

  1. package cos
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "sort"
  12. "time"
  13. )
// ObjectService groups the object-related API operations; the methods
// declared on it in this file issue their requests through s.client.
type ObjectService service
// ObjectGetOptions is the option of GetObject.
//
// Get passes this struct as both the query options and the header options of
// the request. The `url` tag of a field names a query parameter and the
// `header` tag names a request header (a "-" tag presumably excludes the
// field from that encoding; confirm against the encoder).
type ObjectGetOptions struct {
	// Query parameters (response-* overrides).
	ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
	ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
	ResponseExpires string `url:"response-expires,omitempty" header:"-"`
	ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
	ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
	ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
	// Request headers.
	Range string `url:"-" header:"Range,omitempty"`
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C: x-cos-server-side-encryption-customer-* headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
}
// presignedURLTestingOptions is the opt of presigned url.
//
// When GetPresignedURL is given a *presignedURLTestingOptions, its authTime
// (if non-nil) is used for signing instead of one derived from the expiry.
type presignedURLTestingOptions struct {
	authTime *AuthTime
}
  35. // Get Object 请求可以将一个文件(Object)下载至本地。
  36. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  37. //
  38. // https://www.qcloud.com/document/product/436/7753
  39. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  40. var u string
  41. if len(id) == 1 {
  42. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  43. } else if len(id) == 0 {
  44. u = "/" + encodeURIComponent(name)
  45. } else {
  46. return nil, errors.New("wrong params")
  47. }
  48. sendOpt := sendOptions{
  49. baseURL: s.client.BaseURL.BucketURL,
  50. uri: u,
  51. method: http.MethodGet,
  52. optQuery: opt,
  53. optHeader: opt,
  54. disableCloseBody: true,
  55. }
  56. resp, err := s.client.send(ctx, &sendOpt)
  57. return resp, err
  58. }
  59. // GetToFile download the object to local file
  60. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  61. resp, err := s.Get(ctx, name, opt, id...)
  62. if err != nil {
  63. return resp, err
  64. }
  65. defer resp.Body.Close()
  66. // If file exist, overwrite it
  67. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  68. if err != nil {
  69. return resp, err
  70. }
  71. _, err = io.Copy(fd, resp.Body)
  72. fd.Close()
  73. if err != nil {
  74. return resp, err
  75. }
  76. return resp, nil
  77. }
  78. // GetPresignedURL get the object presigned to down or upload file by url
  79. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  80. sendOpt := sendOptions{
  81. baseURL: s.client.BaseURL.BucketURL,
  82. uri: "/" + encodeURIComponent(name),
  83. method: httpMethod,
  84. optQuery: opt,
  85. optHeader: opt,
  86. }
  87. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  88. if err != nil {
  89. return nil, err
  90. }
  91. var authTime *AuthTime
  92. if opt != nil {
  93. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  94. authTime = opt.authTime
  95. }
  96. }
  97. if authTime == nil {
  98. authTime = NewAuthTime(expired)
  99. }
  100. authorization := newAuthorization(ak, sk, req, authTime)
  101. sign := encodeURIComponent(authorization)
  102. if req.URL.RawQuery == "" {
  103. req.URL.RawQuery = fmt.Sprintf("sign=%s", sign)
  104. } else {
  105. req.URL.RawQuery = fmt.Sprintf("%s&sign=%s", req.URL.RawQuery, sign)
  106. }
  107. return req.URL, nil
  108. }
// ObjectPutHeaderOptions the options of header of the put object.
// Every field maps to the request header named in its `header` tag.
type ObjectPutHeaderOptions struct {
	CacheControl string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
	ContentType string `header:"Content-Type,omitempty" url:"-"`
	ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
	ContentLength int `header:"Content-Length,omitempty" url:"-"`
	ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
	Expect string `header:"Expect,omitempty" url:"-"`
	Expires string `header:"Expires,omitempty" url:"-"`
	XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
	// Custom x-cos-meta-* headers.
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
	// Possible values: Normal, Appendable
	//XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
	// Enable Server Side Encryption, Only supported: AES256
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C: x-cos-server-side-encryption-customer-* headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// For compatibility with other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
// ObjectPutOptions the options of put object.
// It embeds the ACL header options and the put header options; Put passes the
// whole struct as the request's header options.
type ObjectPutOptions struct {
	*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
  140. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  141. //
  142. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  143. //
  144. // https://www.qcloud.com/document/product/436/7749
  145. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  146. sendOpt := sendOptions{
  147. baseURL: s.client.BaseURL.BucketURL,
  148. uri: "/" + encodeURIComponent(name),
  149. method: http.MethodPut,
  150. body: r,
  151. optHeader: opt,
  152. }
  153. resp, err := s.client.send(ctx, &sendOpt)
  154. return resp, err
  155. }
  156. // PutFromFile put object from local file
  157. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  158. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  159. fd, err := os.Open(filePath)
  160. if err != nil {
  161. return nil, err
  162. }
  163. defer fd.Close()
  164. return s.Put(ctx, name, fd, opt)
  165. }
// ObjectCopyHeaderOptions is the head option of the Copy.
// Every field maps to the request header named in its `header` tag.
type ObjectCopyHeaderOptions struct {
	// When use replace directive to update meta infos
	CacheControl string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
	ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
	ContentType string `header:"Content-Type,omitempty" url:"-"`
	Expires string `header:"Expires,omitempty" url:"-"`
	Expect string `header:"Expect,omitempty" url:"-"`
	XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
	XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
	// Custom x-cos-meta-* headers.
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	// XCosCopySource is set by Copy from its sourceURL (and optional version id).
	XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C: customer-key headers for the destination and for the copy source.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// For compatibility with other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
// ObjectCopyOptions is the option of Copy, choose header or body.
// It embeds the copy header options and the ACL header options; Copy passes
// the whole struct as the request's header options.
type ObjectCopyOptions struct {
	*ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
// ObjectCopyResult is the result of Copy, decoded from the CopyObjectResult
// XML element. Copy treats an empty ETag in a 200 response as an error.
type ObjectCopyResult struct {
	XMLName xml.Name `xml:"CopyObjectResult"`
	ETag string `xml:"ETag,omitempty"`
	LastModified string `xml:"LastModified,omitempty"`
}
  207. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  208. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  209. //
  210. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  211. //
  212. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  213. //
  214. // https://cloud.tencent.com/document/product/436/10881
  215. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  216. var u string
  217. if len(id) == 1 {
  218. u = fmt.Sprintf("%s?versionId=%s", encodeURIComponent(sourceURL), id[0])
  219. } else if len(id) == 0 {
  220. u = encodeURIComponent(sourceURL)
  221. } else {
  222. return nil, nil, errors.New("wrong params")
  223. }
  224. var res ObjectCopyResult
  225. if opt == nil {
  226. opt = new(ObjectCopyOptions)
  227. }
  228. if opt.ObjectCopyHeaderOptions == nil {
  229. opt.ObjectCopyHeaderOptions = new(ObjectCopyHeaderOptions)
  230. }
  231. opt.XCosCopySource = u
  232. sendOpt := sendOptions{
  233. baseURL: s.client.BaseURL.BucketURL,
  234. uri: "/" + encodeURIComponent(name),
  235. method: http.MethodPut,
  236. body: nil,
  237. optHeader: opt,
  238. result: &res,
  239. }
  240. resp, err := s.client.send(ctx, &sendOpt)
  241. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  242. if err == nil && resp.StatusCode == 200 {
  243. if res.ETag == "" {
  244. return &res, resp, errors.New("response 200 OK, but body contains an error")
  245. }
  246. }
  247. return &res, resp, err
  248. }
// ObjectDeleteOptions holds the optional request headers of Delete.
type ObjectDeleteOptions struct {
	// SSE-C: x-cos-server-side-encryption-customer-* headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// For compatibility with other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
  257. // Delete Object请求可以将一个文件(Object)删除。
  258. //
  259. // https://www.qcloud.com/document/product/436/7743
  260. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  261. var optHeader *ObjectDeleteOptions
  262. // When use "" string might call the delete bucket interface
  263. if len(name) == 0 {
  264. return nil, errors.New("empty object name")
  265. }
  266. if len(opt) > 0 {
  267. optHeader = opt[0]
  268. }
  269. sendOpt := sendOptions{
  270. baseURL: s.client.BaseURL.BucketURL,
  271. uri: "/" + encodeURIComponent(name),
  272. method: http.MethodDelete,
  273. optHeader: optHeader,
  274. }
  275. resp, err := s.client.send(ctx, &sendOpt)
  276. return resp, err
  277. }
// ObjectHeadOptions is the option of HeadObject.
// Every field maps to the request header named in its `header` tag.
type ObjectHeadOptions struct {
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C: x-cos-server-side-encryption-customer-* headers.
	XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
}
  286. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  287. //
  288. // https://www.qcloud.com/document/product/436/7745
  289. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  290. var u string
  291. if len(id) == 1 {
  292. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  293. } else if len(id) == 0 {
  294. u = "/" + encodeURIComponent(name)
  295. } else {
  296. return nil, errors.New("wrong params")
  297. }
  298. sendOpt := sendOptions{
  299. baseURL: s.client.BaseURL.BucketURL,
  300. uri: u,
  301. method: http.MethodHead,
  302. optHeader: opt,
  303. }
  304. resp, err := s.client.send(ctx, &sendOpt)
  305. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  306. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  307. }
  308. return resp, err
  309. }
// ObjectOptionsOptions is the option of object options.
// Its fields are the request headers of the OPTIONS (CORS preflight) request
// sent by Options.
type ObjectOptionsOptions struct {
	Origin string `url:"-" header:"Origin"`
	AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
	AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
}
  316. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  317. //
  318. // 当CORS配置不存在时,请求返回403 Forbidden。
  319. //
  320. // https://www.qcloud.com/document/product/436/8288
  321. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  322. sendOpt := sendOptions{
  323. baseURL: s.client.BaseURL.BucketURL,
  324. uri: "/" + encodeURIComponent(name),
  325. method: http.MethodOptions,
  326. optHeader: opt,
  327. }
  328. resp, err := s.client.send(ctx, &sendOpt)
  329. return resp, err
  330. }
// CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours)
type CASJobParameters struct {
	Tier string `xml:"Tier"`
}
// ObjectRestoreOptions is the option of object restore.
// PostRestore sends it as the request body; it is encoded as a RestoreRequest
// XML element.
type ObjectRestoreOptions struct {
	XMLName xml.Name `xml:"RestoreRequest"`
	Days int `xml:"Days"`
	Tier *CASJobParameters `xml:"CASJobParameters"`
}
  341. // PutRestore API can recover an object of type archived by COS archive.
  342. //
  343. // https://cloud.tencent.com/document/product/436/12633
  344. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  345. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  346. sendOpt := sendOptions{
  347. baseURL: s.client.BaseURL.BucketURL,
  348. uri: u,
  349. method: http.MethodPost,
  350. body: opt,
  351. }
  352. resp, err := s.client.send(ctx, &sendOpt)
  353. return resp, err
  354. }
  355. // TODO Append 接口在优化未开放使用
  356. //
  357. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  358. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  359. //
  360. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  361. //
// 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不一致,COS会返回409错误。
  363. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  364. //
  365. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  366. //
  367. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  368. //
  369. // https://www.qcloud.com/document/product/436/7741
  370. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  371. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  372. // if position != 0{
  373. // opt = nil
  374. // }
  375. // sendOpt := sendOptions{
  376. // baseURL: s.client.BaseURL.BucketURL,
  377. // uri: u,
  378. // method: http.MethodPost,
  379. // optHeader: opt,
  380. // body: r,
  381. // }
  382. // resp, err := s.client.send(ctx, &sendOpt)
  383. // return resp, err
  384. // }
// ObjectDeleteMultiOptions is the option of DeleteMulti.
// DeleteMulti sends it as the request body; it is encoded as a Delete XML
// element with one Object element per entry of Objects.
type ObjectDeleteMultiOptions struct {
	XMLName xml.Name `xml:"Delete" header:"-"`
	Quiet bool `xml:"Quiet" header:"-"`
	Objects []Object `xml:"Object" header:"-"`
	//XCosSha1 string `xml:"-" header:"x-cos-sha1"`
}
// ObjectDeleteMultiResult is the result of DeleteMulti, decoded from the
// DeleteResult XML element.
type ObjectDeleteMultiResult struct {
	XMLName xml.Name `xml:"DeleteResult"`
	// DeletedObjects is filled from the Deleted elements of the response.
	DeletedObjects []Object `xml:"Deleted,omitempty"`
	// Errors is filled from the Error elements of the response.
	Errors []struct {
		Key string
		Code string
		Message string
	} `xml:"Error,omitempty"`
}
  402. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  403. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  404. // Quiet模式只返回报错的Object信息。
  405. // https://www.qcloud.com/document/product/436/8289
  406. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  407. var res ObjectDeleteMultiResult
  408. sendOpt := sendOptions{
  409. baseURL: s.client.BaseURL.BucketURL,
  410. uri: "/?delete",
  411. method: http.MethodPost,
  412. body: opt,
  413. result: &res,
  414. }
  415. resp, err := s.client.send(ctx, &sendOpt)
  416. return &res, resp, err
  417. }
// Object is the meta info of the object.
// In this file it is used for the entries of a DeleteMulti request/result and,
// with PartNumber and ETag set, for the parts passed to
// CompleteMultipartUpload by Upload.
type Object struct {
	Key string `xml:",omitempty"`
	ETag string `xml:",omitempty"`
	Size int `xml:",omitempty"`
	PartNumber int `xml:",omitempty"`
	LastModified string `xml:",omitempty"`
	StorageClass string `xml:",omitempty"`
	Owner *Owner `xml:",omitempty"`
}
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one
type MultiUploadOptions struct {
	// OptIni is passed to InitiateMultipartUpload by Upload.
	OptIni *InitiateMultipartUploadOptions
	// PartSize is the part size in MB when > 0; otherwise the part size is
	// chosen automatically (see SplitFileIntoChunks).
	PartSize int64
	// ThreadPoolSize is the number of worker goroutines; values <= 0 mean one.
	ThreadPoolSize int
}
// Chunk describes one part of a file as produced by SplitFileIntoChunks.
type Chunk struct {
	// Number is the 1-based part number.
	Number int
	// OffSet is the byte offset of the part within the file.
	OffSet int64
	// Size is the length of the part in bytes.
	Size int64
}
// Jobs describes one part-upload task handed to a worker by Upload.
type Jobs struct {
	// Name is the object name and UploadId the multipart upload it belongs to.
	Name string
	UploadId string
	// FilePath is the local file the worker opens to read the chunk from.
	FilePath string
	// RetryTimes is the retry budget the worker uses for UploadPart.
	RetryTimes int
	// Chunk selects the byte range of the file to upload.
	Chunk Chunk
	// Data is not read by the worker in this file.
	Data io.Reader
	// Opt holds the UploadPart options; the worker sets its ContentLength.
	Opt *ObjectUploadPartOptions
}
// Results is what a worker reports back for one part.
type Results struct {
	// PartNumber is the number of the chunk the result belongs to.
	PartNumber int
	// Resp is the UploadPart response; Upload treats a nil Resp as a failed
	// part and otherwise reads the part's ETag from its headers.
	Resp *Response
}
  454. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  455. for j := range jobs {
  456. fd, err := os.Open(j.FilePath)
  457. var res Results
  458. if err != nil {
  459. res.PartNumber = j.Chunk.Number
  460. res.Resp = nil
  461. results <- &res
  462. }
  463. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  464. // UploadPart do not support the chunk trsf, so need to add the content-length
  465. j.Opt.ContentLength = int(j.Chunk.Size)
  466. rt := j.RetryTimes
  467. for {
  468. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  469. &io.LimitedReader{R: fd, N: j.Chunk.Size}, j.Opt)
  470. res.PartNumber = j.Chunk.Number
  471. res.Resp = resp
  472. if err != nil {
  473. rt--
  474. if rt == 0 {
  475. fd.Close()
  476. results <- &res
  477. break
  478. }
  479. continue
  480. }
  481. fd.Close()
  482. results <- &res
  483. break
  484. }
  485. }
  486. }
  487. func DividePart(fileSize int64) (int64, int64) {
  488. partSize := int64(1 * 1024 * 1024)
  489. partNum := fileSize / partSize
  490. for partNum >= 10000 {
  491. partSize = partSize * 2
  492. partNum = fileSize / partSize
  493. }
  494. return partNum, partSize
  495. }
  496. func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
  497. if filePath == "" {
  498. return nil, 0, errors.New("filePath invalid")
  499. }
  500. file, err := os.Open(filePath)
  501. if err != nil {
  502. return nil, 0, err
  503. }
  504. defer file.Close()
  505. stat, err := file.Stat()
  506. if err != nil {
  507. return nil, 0, err
  508. }
  509. var partNum int64
  510. if partSize > 0 {
  511. partSize = partSize * 1024 * 1024
  512. partNum = stat.Size() / partSize
  513. if partNum >= 10000 {
  514. return nil, 0, errors.New("Too many parts, out of 10000")
  515. }
  516. } else {
  517. partNum, partSize = DividePart(stat.Size())
  518. }
  519. var chunks []Chunk
  520. var chunk = Chunk{}
  521. for i := int64(0); i < partNum; i++ {
  522. chunk.Number = int(i + 1)
  523. chunk.OffSet = i * partSize
  524. chunk.Size = partSize
  525. chunks = append(chunks, chunk)
  526. }
  527. if stat.Size()%partSize > 0 {
  528. chunk.Number = len(chunks) + 1
  529. chunk.OffSet = int64(len(chunks)) * partSize
  530. chunk.Size = stat.Size() % partSize
  531. chunks = append(chunks, chunk)
  532. partNum++
  533. }
  534. return chunks, int(partNum), nil
  535. }
  536. // MultiUpload/Upload 为高级upload接口,并发分块上传
  537. // 注意该接口目前只供参考
  538. //
  539. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  540. // 由调用者指定分块大小时,请确认分块数量不超过10000
  541. //
  542. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  543. return s.Upload(ctx, name, filepath, opt)
  544. }
  545. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  546. if opt == nil {
  547. opt = &MultiUploadOptions{}
  548. }
  549. // 1.Get the file chunk
  550. chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize)
  551. if err != nil {
  552. return nil, nil, err
  553. }
  554. // 2.Init
  555. optini := opt.OptIni
  556. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  557. if err != nil {
  558. return nil, nil, err
  559. }
  560. uploadID := res.UploadID
  561. var poolSize int
  562. if opt.ThreadPoolSize > 0 {
  563. poolSize = opt.ThreadPoolSize
  564. } else {
  565. // Default is one
  566. poolSize = 1
  567. }
  568. chjobs := make(chan *Jobs, 100)
  569. chresults := make(chan *Results, 10000)
  570. optcom := &CompleteMultipartUploadOptions{}
  571. // 3.Start worker
  572. for w := 1; w <= poolSize; w++ {
  573. go worker(s, chjobs, chresults)
  574. }
  575. // 4.Push jobs
  576. for _, chunk := range chunks {
  577. partOpt := &ObjectUploadPartOptions{}
  578. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  579. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  580. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  581. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  582. }
  583. job := &Jobs{
  584. Name: name,
  585. RetryTimes: 3,
  586. FilePath: filepath,
  587. UploadId: uploadID,
  588. Chunk: chunk,
  589. Opt: partOpt,
  590. }
  591. chjobs <- job
  592. }
  593. close(chjobs)
  594. // 5.Recv the resp etag to complete
  595. for i := 1; i <= partNum; i++ {
  596. res := <-chresults
  597. // Notice one part fail can not get the etag according.
  598. if res.Resp == nil {
  599. // Some part already fail, can not to get the header inside.
  600. return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
  601. }
  602. // Notice one part fail can not get the etag according.
  603. etag := res.Resp.Header.Get("ETag")
  604. optcom.Parts = append(optcom.Parts, Object{
  605. PartNumber: res.PartNumber, ETag: etag},
  606. )
  607. }
  608. sort.Sort(ObjectList(optcom.Parts))
  609. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  610. return v, resp, err
  611. }
// ObjectPutTaggingOptions is the request body of PutTagging; it is encoded as
// a Tagging XML element whose tags are nested under TagSet/Tag.
type ObjectPutTaggingOptions struct {
	XMLName xml.Name `xml:"Tagging"`
	TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
}

// ObjectTaggingTag is a single object tag; it has the same underlying type as
// BucketTaggingTag.
type ObjectTaggingTag BucketTaggingTag

// ObjectGetTaggingResult is the result of GetTagging; it has the same
// underlying type as ObjectPutTaggingOptions.
type ObjectGetTaggingResult ObjectPutTaggingOptions
  618. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  619. var u string
  620. if len(id) == 1 {
  621. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  622. } else if len(id) == 0 {
  623. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  624. } else {
  625. return nil, errors.New("wrong params")
  626. }
  627. sendOpt := &sendOptions{
  628. baseURL: s.client.BaseURL.BucketURL,
  629. uri: u,
  630. method: http.MethodPut,
  631. body: opt,
  632. }
  633. resp, err := s.client.send(ctx, sendOpt)
  634. return resp, err
  635. }
  636. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  637. var u string
  638. if len(id) == 1 {
  639. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  640. } else if len(id) == 0 {
  641. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  642. } else {
  643. return nil, nil, errors.New("wrong params")
  644. }
  645. var res ObjectGetTaggingResult
  646. sendOpt := &sendOptions{
  647. baseURL: s.client.BaseURL.BucketURL,
  648. uri: u,
  649. method: http.MethodGet,
  650. result: &res,
  651. }
  652. resp, err := s.client.send(ctx, sendOpt)
  653. return &res, resp, err
  654. }
  655. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  656. var u string
  657. if len(id) == 1 {
  658. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  659. } else if len(id) == 0 {
  660. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  661. } else {
  662. return nil, errors.New("wrong params")
  663. }
  664. sendOpt := &sendOptions{
  665. baseURL: s.client.BaseURL.BucketURL,
  666. uri: u,
  667. method: http.MethodDelete,
  668. }
  669. resp, err := s.client.send(ctx, sendOpt)
  670. return resp, err
  671. }