You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

516 lines
18 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "sort"
  11. "strings"
  12. "time"
  13. )
  14. // InitiateMultipartUploadOptions is the option of InitateMultipartUpload
  15. type InitiateMultipartUploadOptions struct {
  16. *ACLHeaderOptions
  17. *ObjectPutHeaderOptions
  18. }
  19. // InitiateMultipartUploadResult is the result of InitateMultipartUpload
  20. type InitiateMultipartUploadResult struct {
  21. XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
  22. Bucket string
  23. Key string
  24. UploadID string `xml:"UploadId"`
  25. }
  26. // InitiateMultipartUpload 请求实现初始化分片上传,成功执行此请求以后会返回Upload ID用于后续的Upload Part请求。
  27. //
  28. // https://www.qcloud.com/document/product/436/7746
  29. func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string, opt *InitiateMultipartUploadOptions) (*InitiateMultipartUploadResult, *Response, error) {
  30. var res InitiateMultipartUploadResult
  31. sendOpt := sendOptions{
  32. baseURL: s.client.BaseURL.BucketURL,
  33. uri: "/" + encodeURIComponent(name) + "?uploads",
  34. method: http.MethodPost,
  35. optHeader: opt,
  36. result: &res,
  37. }
  38. resp, err := s.client.send(ctx, &sendOpt)
  39. return &res, resp, err
  40. }
  41. // ObjectUploadPartOptions is the options of upload-part
  42. type ObjectUploadPartOptions struct {
  43. Expect string `header:"Expect,omitempty" url:"-"`
  44. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  45. ContentLength int `header:"Content-Length,omitempty" url:"-"`
  46. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  47. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  48. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  49. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  50. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  51. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  52. // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  53. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  54. }
  55. // UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。
  56. // 在每次请求Upload Part时候,需要携带partNumber和uploadID,partNumber为块的编号,支持乱序上传。
  57. //
  58. // 当传入uploadID和partNumber都相同的时候,后传入的块将覆盖之前传入的块。当uploadID不存在时会返回404错误,NoSuchUpload.
  59. //
  60. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ContentLength
  61. //
  62. // https://www.qcloud.com/document/product/436/7750
  63. func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, opt *ObjectUploadPartOptions) (*Response, error) {
  64. if err := CheckReaderLen(r); err != nil {
  65. return nil, err
  66. }
  67. if opt != nil && opt.Listener != nil {
  68. totalBytes, err := GetReaderLen(r)
  69. if err != nil {
  70. return nil, err
  71. }
  72. r = TeeReader(r, nil, totalBytes, opt.Listener)
  73. }
  74. u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
  75. sendOpt := sendOptions{
  76. baseURL: s.client.BaseURL.BucketURL,
  77. uri: u,
  78. method: http.MethodPut,
  79. optHeader: opt,
  80. body: r,
  81. }
  82. resp, err := s.client.send(ctx, &sendOpt)
  83. return resp, err
  84. }
  85. // ObjectListPartsOptions is the option of ListParts
  86. type ObjectListPartsOptions struct {
  87. EncodingType string `url:"Encoding-type,omitempty"`
  88. MaxParts string `url:"max-parts,omitempty"`
  89. PartNumberMarker string `url:"part-number-marker,omitempty"`
  90. }
  91. // ObjectListPartsResult is the result of ListParts
  92. type ObjectListPartsResult struct {
  93. XMLName xml.Name `xml:"ListPartsResult"`
  94. Bucket string
  95. EncodingType string `xml:"Encoding-type,omitempty"`
  96. Key string
  97. UploadID string `xml:"UploadId"`
  98. Initiator *Initiator `xml:"Initiator,omitempty"`
  99. Owner *Owner `xml:"Owner,omitempty"`
  100. StorageClass string
  101. PartNumberMarker string
  102. NextPartNumberMarker string `xml:"NextPartNumberMarker,omitempty"`
  103. MaxParts string
  104. IsTruncated bool
  105. Parts []Object `xml:"Part,omitempty"`
  106. }
  107. // ListParts 用来查询特定分块上传中的已上传的块。
  108. //
  109. // https://www.qcloud.com/document/product/436/7747
  110. func (s *ObjectService) ListParts(ctx context.Context, name, uploadID string, opt *ObjectListPartsOptions) (*ObjectListPartsResult, *Response, error) {
  111. u := fmt.Sprintf("/%s?uploadId=%s", encodeURIComponent(name), uploadID)
  112. var res ObjectListPartsResult
  113. sendOpt := sendOptions{
  114. baseURL: s.client.BaseURL.BucketURL,
  115. uri: u,
  116. method: http.MethodGet,
  117. result: &res,
  118. optQuery: opt,
  119. }
  120. resp, err := s.client.send(ctx, &sendOpt)
  121. return &res, resp, err
  122. }
  123. // CompleteMultipartUploadOptions is the option of CompleteMultipartUpload
  124. type CompleteMultipartUploadOptions struct {
  125. XMLName xml.Name `xml:"CompleteMultipartUpload" header:"-" url:"-"`
  126. Parts []Object `xml:"Part" header:"-" url:"-"`
  127. XOptionHeader *http.Header `header:"-,omitempty" xml:"-" url:"-"`
  128. }
  129. // CompleteMultipartUploadResult is the result CompleteMultipartUpload
  130. type CompleteMultipartUploadResult struct {
  131. XMLName xml.Name `xml:"CompleteMultipartUploadResult"`
  132. Location string
  133. Bucket string
  134. Key string
  135. ETag string
  136. }
  137. // ObjectList can used for sort the parts which needs in complete upload part
  138. // sort.Sort(cos.ObjectList(opt.Parts))
  139. type ObjectList []Object
  140. func (o ObjectList) Len() int {
  141. return len(o)
  142. }
  143. func (o ObjectList) Swap(i, j int) {
  144. o[i], o[j] = o[j], o[i]
  145. }
  146. func (o ObjectList) Less(i, j int) bool { // rewrite the Less method from small to big
  147. return o[i].PartNumber < o[j].PartNumber
  148. }
  149. // CompleteMultipartUpload 用来实现完成整个分块上传。当您已经使用Upload Parts上传所有块以后,你可以用该API完成上传。
  150. // 在使用该API时,您必须在Body中给出每一个块的PartNumber和ETag,用来校验块的准确性。
  151. //
  152. // 由于分块上传的合并需要数分钟时间,因而当合并分块开始的时候,COS就立即返回200的状态码,在合并的过程中,
  153. // COS会周期性的返回空格信息来保持连接活跃,直到合并完成,COS会在Body中返回合并后块的内容。
  154. //
  155. // 当上传块小于1 MB的时候,在调用该请求时,会返回400 EntityTooSmall;
  156. // 当上传块编号不连续的时候,在调用该请求时,会返回400 InvalidPart;
  157. // 当请求Body中的块信息没有按序号从小到大排列的时候,在调用该请求时,会返回400 InvalidPartOrder;
  158. // 当UploadId不存在的时候,在调用该请求时,会返回404 NoSuchUpload。
  159. //
  160. // 建议您及时完成分块上传或者舍弃分块上传,因为已上传但是未终止的块会占用存储空间进而产生存储费用。
  161. //
  162. // https://www.qcloud.com/document/product/436/7742
  163. func (s *ObjectService) CompleteMultipartUpload(ctx context.Context, name, uploadID string, opt *CompleteMultipartUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  164. u := fmt.Sprintf("/%s?uploadId=%s", encodeURIComponent(name), uploadID)
  165. var res CompleteMultipartUploadResult
  166. sendOpt := sendOptions{
  167. baseURL: s.client.BaseURL.BucketURL,
  168. uri: u,
  169. method: http.MethodPost,
  170. optHeader: opt,
  171. body: opt,
  172. result: &res,
  173. }
  174. resp, err := s.client.send(ctx, &sendOpt)
  175. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  176. if err == nil && resp.StatusCode == 200 {
  177. if res.ETag == "" {
  178. return &res, resp, errors.New("response 200 OK, but body contains an error")
  179. }
  180. }
  181. return &res, resp, err
  182. }
  183. // AbortMultipartUpload 用来实现舍弃一个分块上传并删除已上传的块。当您调用Abort Multipart Upload时,
  184. // 如果有正在使用这个Upload Parts上传块的请求,则Upload Parts会返回失败。当该UploadID不存在时,会返回404 NoSuchUpload。
  185. //
  186. // 建议您及时完成分块上传或者舍弃分块上传,因为已上传但是未终止的块会占用存储空间进而产生存储费用。
  187. //
  188. // https://www.qcloud.com/document/product/436/7740
  189. func (s *ObjectService) AbortMultipartUpload(ctx context.Context, name, uploadID string) (*Response, error) {
  190. u := fmt.Sprintf("/%s?uploadId=%s", encodeURIComponent(name), uploadID)
  191. sendOpt := sendOptions{
  192. baseURL: s.client.BaseURL.BucketURL,
  193. uri: u,
  194. method: http.MethodDelete,
  195. }
  196. resp, err := s.client.send(ctx, &sendOpt)
  197. return resp, err
  198. }
  199. // ObjectCopyPartOptions is the options of copy-part
  200. type ObjectCopyPartOptions struct {
  201. XCosCopySource string `header:"x-cos-copy-source" url:"-"`
  202. XCosCopySourceRange string `header:"x-cos-copy-source-range,omitempty" url:"-"`
  203. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-"`
  204. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-"`
  205. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-"`
  206. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-"`
  207. }
  208. // CopyPartResult is the result CopyPart
  209. type CopyPartResult struct {
  210. XMLName xml.Name `xml:"CopyPartResult"`
  211. ETag string
  212. LastModified string
  213. }
  214. // CopyPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。
  215. // 在每次请求Upload Part时候,需要携带partNumber和uploadID,partNumber为块的编号,支持乱序上传。
  216. // ObjectCopyPartOptions的XCosCopySource为必填参数,格式为<bucket-name>-<app-id>.cos.<region-id>.myqcloud.com/<object-key>
  217. // ObjectCopyPartOptions的XCosCopySourceRange指定源的Range,格式为bytes=<start>-<end>
  218. //
  219. // 当传入uploadID和partNumber都相同的时候,后传入的块将覆盖之前传入的块。当uploadID不存在时会返回404错误,NoSuchUpload.
  220. //
  221. // https://www.qcloud.com/document/product/436/7750
  222. func (s *ObjectService) CopyPart(ctx context.Context, name, uploadID string, partNumber int, sourceURL string, opt *ObjectCopyPartOptions) (*CopyPartResult, *Response, error) {
  223. if opt == nil {
  224. opt = &ObjectCopyPartOptions{}
  225. }
  226. opt.XCosCopySource = sourceURL
  227. u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
  228. var res CopyPartResult
  229. sendOpt := sendOptions{
  230. baseURL: s.client.BaseURL.BucketURL,
  231. uri: u,
  232. method: http.MethodPut,
  233. optHeader: opt,
  234. result: &res,
  235. }
  236. resp, err := s.client.send(ctx, &sendOpt)
  237. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  238. if err == nil && resp != nil && resp.StatusCode == 200 {
  239. if res.ETag == "" {
  240. return &res, resp, errors.New("response 200 OK, but body contains an error")
  241. }
  242. }
  243. return &res, resp, err
  244. }
  245. type ObjectListUploadsOptions struct {
  246. Delimiter string `url:"Delimiter,omitempty"`
  247. EncodingType string `url:"EncodingType,omitempty"`
  248. Prefix string `url:"Prefix"`
  249. MaxUploads int `url:"MaxUploads"`
  250. KeyMarker string `url:"KeyMarker"`
  251. UploadIdMarker string `url:"UploadIDMarker"`
  252. }
  253. type ObjectListUploadsResult struct {
  254. XMLName xml.Name `xml:"ListMultipartUploadsResult"`
  255. Bucket string `xml:"Bucket,omitempty"`
  256. EncodingType string `xml:"Encoding-Type,omitempty"`
  257. KeyMarker string `xml:"KeyMarker,omitempty"`
  258. UploadIdMarker string `xml:"UploadIdMarker,omitempty"`
  259. NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
  260. NextUploadIdMarker string `xml:"NextUploadIdMarker,omitempty"`
  261. MaxUploads string `xml:"MaxUploads,omitempty"`
  262. IsTruncated bool `xml:"IsTruncated,omitempty"`
  263. Prefix string `xml:"Prefix,omitempty"`
  264. Delimiter string `xml:"Delimiter,omitempty"`
  265. Upload []ListUploadsResultUpload `xml:"Upload,omitempty"`
  266. CommonPrefixes []string `xml:"CommonPrefixes>Prefix,omitempty"`
  267. }
  268. type ListUploadsResultUpload struct {
  269. Key string `xml:"Key,omitempty"`
  270. UploadID string `xml:"UploadId,omitempty"`
  271. StorageClass string `xml:"StorageClass,omitempty"`
  272. Initiator *Initiator `xml:"Initiator,omitempty"`
  273. Owner *Owner `xml:"Owner,omitempty"`
  274. Initiated string `xml:"Initiated,omitempty"`
  275. }
  276. func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsOptions) (*ObjectListUploadsResult, *Response, error) {
  277. var res ObjectListUploadsResult
  278. sendOpt := &sendOptions{
  279. baseURL: s.client.BaseURL.BucketURL,
  280. uri: "/?uploads",
  281. method: http.MethodGet,
  282. optQuery: opt,
  283. result: &res,
  284. }
  285. resp, err := s.client.send(ctx, sendOpt)
  286. return &res, resp, err
  287. }
  288. type MultiCopyOptions struct {
  289. OptCopy *ObjectCopyOptions
  290. PartSize int64
  291. ThreadPoolSize int
  292. }
  293. type CopyJobs struct {
  294. Name string
  295. UploadId string
  296. RetryTimes int
  297. Chunk Chunk
  298. Opt *ObjectCopyPartOptions
  299. }
  300. type CopyResults struct {
  301. PartNumber int
  302. Resp *Response
  303. err error
  304. res *CopyPartResult
  305. }
  306. func copyworker(s *ObjectService, jobs <-chan *CopyJobs, results chan<- *CopyResults) {
  307. for j := range jobs {
  308. var copyres CopyResults
  309. j.Opt.XCosCopySourceRange = fmt.Sprintf("bytes=%d-%d", j.Chunk.OffSet, j.Chunk.OffSet+j.Chunk.Size-1)
  310. rt := j.RetryTimes
  311. for {
  312. res, resp, err := s.CopyPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number, j.Opt.XCosCopySource, j.Opt)
  313. copyres.PartNumber = j.Chunk.Number
  314. copyres.Resp = resp
  315. copyres.err = err
  316. copyres.res = res
  317. if err != nil {
  318. rt--
  319. if rt == 0 {
  320. results <- &copyres
  321. break
  322. }
  323. time.Sleep(10 * time.Millisecond)
  324. continue
  325. }
  326. results <- &copyres
  327. break
  328. }
  329. }
  330. }
  331. func (s *ObjectService) innerHead(ctx context.Context, sourceURL string, opt *ObjectHeadOptions, id []string) (resp *Response, err error) {
  332. surl := strings.SplitN(sourceURL, "/", 2)
  333. if len(surl) < 2 {
  334. err = errors.New(fmt.Sprintf("sourceURL format error: %s", sourceURL))
  335. return
  336. }
  337. u, err := url.Parse(fmt.Sprintf("https://%s", surl[0]))
  338. if err != nil {
  339. return
  340. }
  341. b := &BaseURL{BucketURL: u}
  342. client := NewClient(b, &http.Client{
  343. Transport: s.client.client.Transport,
  344. })
  345. if len(id) > 0 {
  346. resp, err = client.Object.Head(ctx, surl[1], nil, id[0])
  347. } else {
  348. resp, err = client.Object.Head(ctx, surl[1], nil)
  349. }
  350. return
  351. }
  352. func SplitCopyFileIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
  353. var partNum int64
  354. if partSize > 0 {
  355. partSize = partSize * 1024 * 1024
  356. partNum = totalBytes / partSize
  357. if partNum >= 10000 {
  358. return nil, 0, errors.New("Too many parts, out of 10000")
  359. }
  360. } else {
  361. partNum, partSize = DividePart(totalBytes, 128)
  362. }
  363. var chunks []Chunk
  364. var chunk = Chunk{}
  365. for i := int64(0); i < partNum; i++ {
  366. chunk.Number = int(i + 1)
  367. chunk.OffSet = i * partSize
  368. chunk.Size = partSize
  369. chunks = append(chunks, chunk)
  370. }
  371. if totalBytes%partSize > 0 {
  372. chunk.Number = len(chunks) + 1
  373. chunk.OffSet = int64(len(chunks)) * partSize
  374. chunk.Size = totalBytes % partSize
  375. chunks = append(chunks, chunk)
  376. partNum++
  377. }
  378. return chunks, int(partNum), nil
  379. }
  380. func (s *ObjectService) MultiCopy(ctx context.Context, name string, sourceURL string, opt *MultiCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  381. resp, err := s.innerHead(ctx, sourceURL, nil, id)
  382. if err != nil {
  383. return nil, nil, err
  384. }
  385. totalBytes := resp.ContentLength
  386. surl := strings.SplitN(sourceURL, "/", 2)
  387. if len(surl) < 2 {
  388. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  389. }
  390. var u string
  391. if len(id) == 1 {
  392. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  393. } else if len(id) == 0 {
  394. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  395. } else {
  396. return nil, nil, errors.New("wrong params")
  397. }
  398. if opt == nil {
  399. opt = &MultiCopyOptions{}
  400. }
  401. chunks, partNum, err := SplitCopyFileIntoChunks(totalBytes, opt.PartSize)
  402. if err != nil {
  403. return nil, nil, err
  404. }
  405. if partNum == 0 || totalBytes < singleUploadMaxLength {
  406. if len(id) > 0 {
  407. return s.Copy(ctx, name, sourceURL, opt.OptCopy, id[0])
  408. } else {
  409. return s.Copy(ctx, name, sourceURL, opt.OptCopy)
  410. }
  411. }
  412. optini := CopyOptionsToMulti(opt.OptCopy)
  413. var uploadID string
  414. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  415. if err != nil {
  416. return nil, nil, err
  417. }
  418. uploadID = res.UploadID
  419. var poolSize int
  420. if opt.ThreadPoolSize > 0 {
  421. poolSize = opt.ThreadPoolSize
  422. } else {
  423. poolSize = 1
  424. }
  425. chjobs := make(chan *CopyJobs, 100)
  426. chresults := make(chan *CopyResults, 10000)
  427. optcom := &CompleteMultipartUploadOptions{}
  428. for w := 1; w <= poolSize; w++ {
  429. go copyworker(s, chjobs, chresults)
  430. }
  431. go func() {
  432. for _, chunk := range chunks {
  433. partOpt := &ObjectCopyPartOptions{
  434. XCosCopySource: u,
  435. }
  436. job := &CopyJobs{
  437. Name: name,
  438. RetryTimes: 3,
  439. UploadId: uploadID,
  440. Chunk: chunk,
  441. Opt: partOpt,
  442. }
  443. chjobs <- job
  444. }
  445. close(chjobs)
  446. }()
  447. err = nil
  448. for i := 0; i < partNum; i++ {
  449. res := <-chresults
  450. if res.res == nil || res.err != nil {
  451. err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  452. continue
  453. }
  454. etag := res.res.ETag
  455. optcom.Parts = append(optcom.Parts, Object{
  456. PartNumber: res.PartNumber, ETag: etag},
  457. )
  458. }
  459. close(chresults)
  460. if err != nil {
  461. return nil, nil, err
  462. }
  463. sort.Sort(ObjectList(optcom.Parts))
  464. v, resp, err := s.CompleteMultipartUpload(ctx, name, uploadID, optcom)
  465. if err != nil {
  466. s.AbortMultipartUpload(ctx, name, uploadID)
  467. }
  468. cpres := &ObjectCopyResult{
  469. ETag: v.ETag,
  470. CRC64: resp.Header.Get("x-cos-hash-crc64ecma"),
  471. VersionId: resp.Header.Get("x-cos-version-id"),
  472. }
  473. return cpres, resp, err
  474. }