You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

614 lines
20 KiB

  1. package cos
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "os"
  11. "sort"
  12. "time"
  13. )
  14. // ObjectService 相关 API
  15. type ObjectService service
  16. // ObjectGetOptions is the option of GetObject
  17. type ObjectGetOptions struct {
  18. ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
  19. ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
  20. ResponseExpires string `url:"response-expires,omitempty" header:"-"`
  21. ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
  22. ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
  23. ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
  24. Range string `url:"-" header:"Range,omitempty"`
  25. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  26. }
  27. // presignedURLTestingOptions is the opt of presigned url
  28. type presignedURLTestingOptions struct {
  29. authTime *AuthTime
  30. }
  31. // Get Object 请求可以将一个文件(Object)下载至本地。
  32. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  33. //
  34. // https://www.qcloud.com/document/product/436/7753
  35. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  36. var u string
  37. if len(id) == 1 {
  38. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  39. } else if len(id) == 0 {
  40. u = "/" + encodeURIComponent(name)
  41. } else {
  42. return nil, errors.New("wrong params")
  43. }
  44. sendOpt := sendOptions{
  45. baseURL: s.client.BaseURL.BucketURL,
  46. uri: u,
  47. method: http.MethodGet,
  48. optQuery: opt,
  49. optHeader: opt,
  50. disableCloseBody: true,
  51. }
  52. resp, err := s.client.send(ctx, &sendOpt)
  53. return resp, err
  54. }
  55. // GetToFile download the object to local file
  56. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  57. resp, err := s.Get(ctx, name, opt, id...)
  58. if err != nil {
  59. return resp, err
  60. }
  61. defer resp.Body.Close()
  62. // If file exist, overwrite it
  63. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  64. if err != nil {
  65. return resp, err
  66. }
  67. _, err = io.Copy(fd, resp.Body)
  68. fd.Close()
  69. if err != nil {
  70. return resp, err
  71. }
  72. return resp, nil
  73. }
  74. // GetPresignedURL get the object presigned to down or upload file by url
  75. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  76. sendOpt := sendOptions{
  77. baseURL: s.client.BaseURL.BucketURL,
  78. uri: "/" + encodeURIComponent(name),
  79. method: httpMethod,
  80. optQuery: opt,
  81. optHeader: opt,
  82. }
  83. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  84. if err != nil {
  85. return nil, err
  86. }
  87. var authTime *AuthTime
  88. if opt != nil {
  89. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  90. authTime = opt.authTime
  91. }
  92. }
  93. if authTime == nil {
  94. authTime = NewAuthTime(expired)
  95. }
  96. authorization := newAuthorization(ak, sk, req, authTime)
  97. sign := encodeURIComponent(authorization)
  98. if req.URL.RawQuery == "" {
  99. req.URL.RawQuery = fmt.Sprintf("sign=%s", sign)
  100. } else {
  101. req.URL.RawQuery = fmt.Sprintf("%s&sign=%s", req.URL.RawQuery, sign)
  102. }
  103. return req.URL, nil
  104. }
  105. // ObjectPutHeaderOptions the options of header of the put object
  106. type ObjectPutHeaderOptions struct {
  107. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  108. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  109. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  110. ContentType string `header:"Content-Type,omitempty" url:"-"`
  111. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  112. ContentLength int `header:"Content-Length,omitempty" url:"-"`
  113. Expect string `header:"Expect,omitempty" url:"-"`
  114. Expires string `header:"Expires,omitempty" url:"-"`
  115. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  116. // 自定义的 x-cos-meta-* header
  117. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  118. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
  119. // 可选值: Normal, Appendable
  120. //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
  121. // Enable Server Side Encryption, Only supported: AES256
  122. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  123. }
  124. // ObjectPutOptions the options of put object
  125. type ObjectPutOptions struct {
  126. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  127. *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  128. }
  129. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  130. //
  131. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  132. //
  133. // https://www.qcloud.com/document/product/436/7749
  134. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  135. sendOpt := sendOptions{
  136. baseURL: s.client.BaseURL.BucketURL,
  137. uri: "/" + encodeURIComponent(name),
  138. method: http.MethodPut,
  139. body: r,
  140. optHeader: opt,
  141. }
  142. resp, err := s.client.send(ctx, &sendOpt)
  143. return resp, err
  144. }
  145. // PutFromFile put object from local file
  146. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  147. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (*Response, error) {
  148. fd, err := os.Open(filePath)
  149. if err != nil {
  150. return nil, err
  151. }
  152. defer fd.Close()
  153. return s.Put(ctx, name, fd, opt)
  154. }
  155. // ObjectCopyHeaderOptions is the head option of the Copy
  156. type ObjectCopyHeaderOptions struct {
  157. // When use replace directive to update meta infos
  158. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  159. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  160. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  161. ContentType string `header:"Content-Type,omitempty" url:"-"`
  162. Expires string `header:"Expires,omitempty" url:"-"`
  163. Expect string `header:"Expect,omitempty" url:"-"`
  164. XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
  165. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
  166. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
  167. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
  168. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
  169. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
  170. // 自定义的 x-cos-meta-* header
  171. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  172. XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
  173. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  174. }
  175. // ObjectCopyOptions is the option of Copy, choose header or body
  176. type ObjectCopyOptions struct {
  177. *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  178. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  179. }
  180. // ObjectCopyResult is the result of Copy
  181. type ObjectCopyResult struct {
  182. XMLName xml.Name `xml:"CopyObjectResult"`
  183. ETag string `xml:"ETag,omitempty"`
  184. LastModified string `xml:"LastModified,omitempty"`
  185. }
  186. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  187. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  188. //
  189. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  190. //
  191. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  192. //
  193. // https://cloud.tencent.com/document/product/436/10881
  194. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  195. var u string
  196. if len(id) == 1 {
  197. u = fmt.Sprintf("%s?versionId=%s", encodeURIComponent(sourceURL), id[0])
  198. } else if len(id) == 0 {
  199. u = encodeURIComponent(sourceURL)
  200. } else {
  201. return nil, nil, errors.New("wrong params")
  202. }
  203. var res ObjectCopyResult
  204. if opt == nil {
  205. opt = new(ObjectCopyOptions)
  206. }
  207. if opt.ObjectCopyHeaderOptions == nil {
  208. opt.ObjectCopyHeaderOptions = new(ObjectCopyHeaderOptions)
  209. }
  210. opt.XCosCopySource = u
  211. sendOpt := sendOptions{
  212. baseURL: s.client.BaseURL.BucketURL,
  213. uri: "/" + encodeURIComponent(name),
  214. method: http.MethodPut,
  215. body: nil,
  216. optHeader: opt,
  217. result: &res,
  218. }
  219. resp, err := s.client.send(ctx, &sendOpt)
  220. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  221. if err == nil && resp.StatusCode == 200 {
  222. if res.ETag == "" {
  223. return &res, resp, errors.New("response 200 OK, but body contains an error")
  224. }
  225. }
  226. return &res, resp, err
  227. }
  228. // Delete Object请求可以将一个文件(Object)删除。
  229. //
  230. // https://www.qcloud.com/document/product/436/7743
  231. func (s *ObjectService) Delete(ctx context.Context, name string) (*Response, error) {
  232. // When use "" string might call the delete bucket interface
  233. if len(name) == 0 {
  234. return nil, errors.New("empty object name")
  235. }
  236. sendOpt := sendOptions{
  237. baseURL: s.client.BaseURL.BucketURL,
  238. uri: "/" + encodeURIComponent(name),
  239. method: http.MethodDelete,
  240. }
  241. resp, err := s.client.send(ctx, &sendOpt)
  242. return resp, err
  243. }
  244. // ObjectHeadOptions is the option of HeadObject
  245. type ObjectHeadOptions struct {
  246. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  247. }
  248. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  249. //
  250. // https://www.qcloud.com/document/product/436/7745
  251. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  252. var u string
  253. if len(id) == 1 {
  254. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  255. } else if len(id) == 0 {
  256. u = "/" + encodeURIComponent(name)
  257. } else {
  258. return nil, errors.New("wrong params")
  259. }
  260. sendOpt := sendOptions{
  261. baseURL: s.client.BaseURL.BucketURL,
  262. uri: u,
  263. method: http.MethodHead,
  264. optHeader: opt,
  265. }
  266. resp, err := s.client.send(ctx, &sendOpt)
  267. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  268. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  269. }
  270. return resp, err
  271. }
  272. // ObjectOptionsOptions is the option of object options
  273. type ObjectOptionsOptions struct {
  274. Origin string `url:"-" header:"Origin"`
  275. AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
  276. AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
  277. }
  278. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  279. //
  280. // 当CORS配置不存在时,请求返回403 Forbidden。
  281. //
  282. // https://www.qcloud.com/document/product/436/8288
  283. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  284. sendOpt := sendOptions{
  285. baseURL: s.client.BaseURL.BucketURL,
  286. uri: "/" + encodeURIComponent(name),
  287. method: http.MethodOptions,
  288. optHeader: opt,
  289. }
  290. resp, err := s.client.send(ctx, &sendOpt)
  291. return resp, err
  292. }
  293. // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
  294. type CASJobParameters struct {
  295. Tier string `xml:"Tier"`
  296. }
  297. // ObjectRestoreOptions is the option of object restore
  298. type ObjectRestoreOptions struct {
  299. XMLName xml.Name `xml:"RestoreRequest"`
  300. Days int `xml:"Days"`
  301. Tier *CASJobParameters `xml:"CASJobParameters"`
  302. }
  303. // PutRestore API can recover an object of type archived by COS archive.
  304. //
  305. // https://cloud.tencent.com/document/product/436/12633
  306. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  307. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  308. sendOpt := sendOptions{
  309. baseURL: s.client.BaseURL.BucketURL,
  310. uri: u,
  311. method: http.MethodPost,
  312. body: opt,
  313. }
  314. resp, err := s.client.send(ctx, &sendOpt)
  315. return resp, err
  316. }
  317. // TODO Append 接口在优化未开放使用
  318. //
  319. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  320. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  321. //
  322. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  323. //
  324. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  325. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  326. //
  327. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  328. //
  329. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  330. //
  331. // https://www.qcloud.com/document/product/436/7741
  332. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  333. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  334. // if position != 0{
  335. // opt = nil
  336. // }
  337. // sendOpt := sendOptions{
  338. // baseURL: s.client.BaseURL.BucketURL,
  339. // uri: u,
  340. // method: http.MethodPost,
  341. // optHeader: opt,
  342. // body: r,
  343. // }
  344. // resp, err := s.client.send(ctx, &sendOpt)
  345. // return resp, err
  346. // }
  347. // ObjectDeleteMultiOptions is the option of DeleteMulti
  348. type ObjectDeleteMultiOptions struct {
  349. XMLName xml.Name `xml:"Delete" header:"-"`
  350. Quiet bool `xml:"Quiet" header:"-"`
  351. Objects []Object `xml:"Object" header:"-"`
  352. //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
  353. }
  354. // ObjectDeleteMultiResult is the result of DeleteMulti
  355. type ObjectDeleteMultiResult struct {
  356. XMLName xml.Name `xml:"DeleteResult"`
  357. DeletedObjects []Object `xml:"Deleted,omitempty"`
  358. Errors []struct {
  359. Key string
  360. Code string
  361. Message string
  362. } `xml:"Error,omitempty"`
  363. }
  364. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  365. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  366. // Quiet模式只返回报错的Object信息。
  367. // https://www.qcloud.com/document/product/436/8289
  368. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  369. var res ObjectDeleteMultiResult
  370. sendOpt := sendOptions{
  371. baseURL: s.client.BaseURL.BucketURL,
  372. uri: "/?delete",
  373. method: http.MethodPost,
  374. body: opt,
  375. result: &res,
  376. }
  377. resp, err := s.client.send(ctx, &sendOpt)
  378. return &res, resp, err
  379. }
  380. // Object is the meta info of the object
  381. type Object struct {
  382. Key string `xml:",omitempty"`
  383. ETag string `xml:",omitempty"`
  384. Size int `xml:",omitempty"`
  385. PartNumber int `xml:",omitempty"`
  386. LastModified string `xml:",omitempty"`
  387. StorageClass string `xml:",omitempty"`
  388. Owner *Owner `xml:",omitempty"`
  389. }
  390. // MultiUploadOptions is the option of the multiupload,
  391. // ThreadPoolSize default is one
  392. type MultiUploadOptions struct {
  393. OptIni *InitiateMultipartUploadOptions
  394. PartSize int64
  395. ThreadPoolSize int
  396. }
  397. type Chunk struct {
  398. Number int
  399. OffSet int64
  400. Size int64
  401. }
  402. // jobs
  403. type Jobs struct {
  404. Name string
  405. UploadId string
  406. FilePath string
  407. RetryTimes int
  408. Chunk Chunk
  409. Data io.Reader
  410. Opt *ObjectUploadPartOptions
  411. }
  412. type Results struct {
  413. PartNumber int
  414. Resp *Response
  415. }
  416. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  417. for j := range jobs {
  418. fd, err := os.Open(j.FilePath)
  419. var res Results
  420. if err != nil {
  421. res.PartNumber = j.Chunk.Number
  422. res.Resp = nil
  423. results <- &res
  424. }
  425. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  426. // UploadPart do not support the chunk trsf, so need to add the content-length
  427. opt := &ObjectUploadPartOptions{
  428. ContentLength: int(j.Chunk.Size),
  429. }
  430. rt := j.RetryTimes
  431. for {
  432. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  433. &io.LimitedReader{R: fd, N: j.Chunk.Size}, opt)
  434. res.PartNumber = j.Chunk.Number
  435. res.Resp = resp
  436. if err != nil {
  437. rt--
  438. if rt == 0 {
  439. fd.Close()
  440. results <- &res
  441. break
  442. }
  443. continue
  444. }
  445. fd.Close()
  446. results <- &res
  447. break
  448. }
  449. }
  450. }
  451. func SplitFileIntoChunks(filePath string, partSize int64) ([]Chunk, int, error) {
  452. if filePath == "" || partSize <= 0 {
  453. return nil, 0, errors.New("chunkSize invalid")
  454. }
  455. file, err := os.Open(filePath)
  456. if err != nil {
  457. return nil, 0, err
  458. }
  459. defer file.Close()
  460. stat, err := file.Stat()
  461. if err != nil {
  462. return nil, 0, err
  463. }
  464. var partNum = stat.Size() / partSize
  465. // 10000 max part size
  466. if partNum >= 10000 {
  467. return nil, 0, errors.New("Too many parts, out of 10000")
  468. }
  469. var chunks []Chunk
  470. var chunk = Chunk{}
  471. for i := int64(0); i < partNum; i++ {
  472. chunk.Number = int(i + 1)
  473. chunk.OffSet = i * partSize
  474. chunk.Size = partSize
  475. chunks = append(chunks, chunk)
  476. }
  477. if stat.Size()%partSize > 0 {
  478. chunk.Number = len(chunks) + 1
  479. chunk.OffSet = int64(len(chunks)) * partSize
  480. chunk.Size = stat.Size() % partSize
  481. chunks = append(chunks, chunk)
  482. partNum++
  483. }
  484. return chunks, int(partNum), nil
  485. }
  486. // MultiUpload 为高级upload接口,并发分块上传
  487. // 注意该接口目前只供参考
  488. //
  489. // 需要指定分块大小 partSize >= 1 ,单位为MB
  490. // 同时请确认分块数量不超过10000
  491. //
  492. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  493. // 1.Get the file chunk
  494. bufSize := opt.PartSize * 1024 * 1024
  495. chunks, partNum, err := SplitFileIntoChunks(filepath, bufSize)
  496. if err != nil {
  497. return nil, nil, err
  498. }
  499. // 2.Init
  500. optini := opt.OptIni
  501. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  502. if err != nil {
  503. return nil, nil, err
  504. }
  505. uploadID := res.UploadID
  506. var poolSize int
  507. if opt.ThreadPoolSize > 0 {
  508. poolSize = opt.ThreadPoolSize
  509. } else {
  510. // Default is one
  511. poolSize = 1
  512. }
  513. chjobs := make(chan *Jobs, 100)
  514. chresults := make(chan *Results, 10000)
  515. optcom := &CompleteMultipartUploadOptions{}
  516. // 3.Start worker
  517. for w := 1; w <= poolSize; w++ {
  518. go worker(s, chjobs, chresults)
  519. }
  520. // 4.Push jobs
  521. for _, chunk := range chunks {
  522. job := &Jobs{
  523. Name: name,
  524. RetryTimes: 3,
  525. FilePath: filepath,
  526. UploadId: uploadID,
  527. Chunk: chunk,
  528. }
  529. chjobs <- job
  530. }
  531. close(chjobs)
  532. // 5.Recv the resp etag to complete
  533. for i := 1; i <= partNum; i++ {
  534. res := <-chresults
  535. // Notice one part fail can not get the etag according.
  536. if res.Resp == nil {
  537. // Some part already fail, can not to get the header inside.
  538. return nil, nil, fmt.Errorf("UploadID %s, part %d failed to get resp content.", uploadID, res.PartNumber)
  539. }
  540. // Notice one part fail can not get the etag according.
  541. etag := res.Resp.Header.Get("ETag")
  542. optcom.Parts = append(optcom.Parts, Object{
  543. PartNumber: res.PartNumber, ETag: etag},
  544. )
  545. }
  546. sort.Sort(ObjectList(optcom.Parts))
  547. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  548. return v, resp, err
  549. }