You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

534 lines
19 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "encoding/xml"
  5. "errors"
  6. "fmt"
  7. "hash/crc64"
  8. "io"
  9. "net/http"
  10. "net/url"
  11. "sort"
  12. "strings"
  13. "time"
  14. )
  15. // InitiateMultipartUploadOptions is the option of InitateMultipartUpload
  16. type InitiateMultipartUploadOptions struct {
  17. *ACLHeaderOptions
  18. *ObjectPutHeaderOptions
  19. }
  // InitiateMultipartUploadResult is the XML response body decoded by
  // InitiateMultipartUpload.
  type InitiateMultipartUploadResult struct {
  	XMLName xml.Name `xml:"InitiateMultipartUploadResult"`
  	Bucket string
  	Key string
  	// UploadID is decoded from the UploadId element; it identifies the
  	// multipart upload in the follow-up part requests.
  	UploadID string `xml:"UploadId"`
  }
  27. // InitiateMultipartUpload 请求实现初始化分片上传,成功执行此请求以后会返回Upload ID用于后续的Upload Part请求。
  28. //
  29. // https://www.qcloud.com/document/product/436/7746
  30. func (s *ObjectService) InitiateMultipartUpload(ctx context.Context, name string, opt *InitiateMultipartUploadOptions) (*InitiateMultipartUploadResult, *Response, error) {
  31. var res InitiateMultipartUploadResult
  32. sendOpt := sendOptions{
  33. baseURL: s.client.BaseURL.BucketURL,
  34. uri: "/" + encodeURIComponent(name) + "?uploads",
  35. method: http.MethodPost,
  36. optHeader: opt,
  37. result: &res,
  38. }
  39. resp, err := s.client.send(ctx, &sendOpt)
  40. return &res, resp, err
  41. }
  42. // ObjectUploadPartOptions is the options of upload-part
  43. type ObjectUploadPartOptions struct {
  44. Expect string `header:"Expect,omitempty" url:"-"`
  45. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  46. ContentLength int64 `header:"Content-Length,omitempty" url:"-"`
  47. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  48. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  49. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  50. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  51. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  52. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  53. // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  54. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  55. }
  56. // UploadPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。
  57. // 在每次请求Upload Part时候,需要携带partNumber和uploadID,partNumber为块的编号,支持乱序上传。
  58. //
  59. // 当传入uploadID和partNumber都相同的时候,后传入的块将覆盖之前传入的块。当uploadID不存在时会返回404错误,NoSuchUpload.
  60. //
  61. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ContentLength
  62. //
  63. // https://www.qcloud.com/document/product/436/7750
  64. func (s *ObjectService) UploadPart(ctx context.Context, name, uploadID string, partNumber int, r io.Reader, uopt *ObjectUploadPartOptions) (*Response, error) {
  65. if r == nil {
  66. return nil, fmt.Errorf("reader is nil")
  67. }
  68. if err := CheckReaderLen(r); err != nil {
  69. return nil, err
  70. }
  71. // opt 不为 nil
  72. opt := cloneObjectUploadPartOptions(uopt)
  73. totalBytes, err := GetReaderLen(r)
  74. if err != nil && opt.Listener != nil {
  75. return nil, err
  76. }
  77. // 分块上传不支持 Chunk 上传
  78. if err == nil {
  79. // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader需用户指定ContentLength
  80. if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
  81. opt.ContentLength = totalBytes
  82. }
  83. }
  84. reader := TeeReader(r, nil, totalBytes, nil)
  85. if s.client.Conf.EnableCRC {
  86. reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
  87. }
  88. if opt != nil && opt.Listener != nil {
  89. reader.listener = opt.Listener
  90. }
  91. u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
  92. sendOpt := sendOptions{
  93. baseURL: s.client.BaseURL.BucketURL,
  94. uri: u,
  95. method: http.MethodPut,
  96. optHeader: opt,
  97. body: reader,
  98. }
  99. resp, err := s.client.send(ctx, &sendOpt)
  100. return resp, err
  101. }
  // ObjectListPartsOptions holds the optional query parameters for ListParts.
  // Every field is omitted from the query string when empty.
  type ObjectListPartsOptions struct {
  	// EncodingType is sent as "encoding-type". The key is lowercase to match
  	// the documented List Parts request parameter.
  	EncodingType string `url:"encoding-type,omitempty"`
  	// MaxParts is sent as "max-parts".
  	MaxParts string `url:"max-parts,omitempty"`
  	// PartNumberMarker is sent as "part-number-marker".
  	PartNumberMarker string `url:"part-number-marker,omitempty"`
  }
  108. // ObjectListPartsResult is the result of ListParts
  109. type ObjectListPartsResult struct {
  110. XMLName xml.Name `xml:"ListPartsResult"`
  111. Bucket string
  112. EncodingType string `xml:"Encoding-type,omitempty"`
  113. Key string
  114. UploadID string `xml:"UploadId"`
  115. Initiator *Initiator `xml:"Initiator,omitempty"`
  116. Owner *Owner `xml:"Owner,omitempty"`
  117. StorageClass string
  118. PartNumberMarker string
  119. NextPartNumberMarker string `xml:"NextPartNumberMarker,omitempty"`
  120. MaxParts string
  121. IsTruncated bool
  122. Parts []Object `xml:"Part,omitempty"`
  123. }
  124. // ListParts 用来查询特定分块上传中的已上传的块。
  125. //
  126. // https://www.qcloud.com/document/product/436/7747
  127. func (s *ObjectService) ListParts(ctx context.Context, name, uploadID string, opt *ObjectListPartsOptions) (*ObjectListPartsResult, *Response, error) {
  128. u := fmt.Sprintf("/%s?uploadId=%s", encodeURIComponent(name), uploadID)
  129. var res ObjectListPartsResult
  130. sendOpt := sendOptions{
  131. baseURL: s.client.BaseURL.BucketURL,
  132. uri: u,
  133. method: http.MethodGet,
  134. result: &res,
  135. optQuery: opt,
  136. }
  137. resp, err := s.client.send(ctx, &sendOpt)
  138. return &res, resp, err
  139. }
  140. // CompleteMultipartUploadOptions is the option of CompleteMultipartUpload
  141. type CompleteMultipartUploadOptions struct {
  142. XMLName xml.Name `xml:"CompleteMultipartUpload" header:"-" url:"-"`
  143. Parts []Object `xml:"Part" header:"-" url:"-"`
  144. XOptionHeader *http.Header `header:"-,omitempty" xml:"-" url:"-"`
  145. }
  // CompleteMultipartUploadResult is the XML response body decoded by
  // CompleteMultipartUpload. CompleteMultipartUpload treats an empty ETag in a
  // 200 response as an error.
  type CompleteMultipartUploadResult struct {
  	XMLName xml.Name `xml:"CompleteMultipartUploadResult"`
  	Location string
  	Bucket string
  	Key string
  	ETag string
  }
  154. // ObjectList can used for sort the parts which needs in complete upload part
  155. // sort.Sort(cos.ObjectList(opt.Parts))
  156. type ObjectList []Object
  157. func (o ObjectList) Len() int {
  158. return len(o)
  159. }
  160. func (o ObjectList) Swap(i, j int) {
  161. o[i], o[j] = o[j], o[i]
  162. }
  163. func (o ObjectList) Less(i, j int) bool { // rewrite the Less method from small to big
  164. return o[i].PartNumber < o[j].PartNumber
  165. }
  166. // CompleteMultipartUpload 用来实现完成整个分块上传。当您已经使用Upload Parts上传所有块以后,你可以用该API完成上传。
  167. // 在使用该API时,您必须在Body中给出每一个块的PartNumber和ETag,用来校验块的准确性。
  168. //
  169. // 由于分块上传的合并需要数分钟时间,因而当合并分块开始的时候,COS就立即返回200的状态码,在合并的过程中,
  170. // COS会周期性的返回空格信息来保持连接活跃,直到合并完成,COS会在Body中返回合并后块的内容。
  171. //
  172. // 当上传块小于1 MB的时候,在调用该请求时,会返回400 EntityTooSmall;
  173. // 当上传块编号不连续的时候,在调用该请求时,会返回400 InvalidPart;
  174. // 当请求Body中的块信息没有按序号从小到大排列的时候,在调用该请求时,会返回400 InvalidPartOrder;
  175. // 当UploadId不存在的时候,在调用该请求时,会返回404 NoSuchUpload。
  176. //
  177. // 建议您及时完成分块上传或者舍弃分块上传,因为已上传但是未终止的块会占用存储空间进而产生存储费用。
  178. //
  179. // https://www.qcloud.com/document/product/436/7742
  180. func (s *ObjectService) CompleteMultipartUpload(ctx context.Context, name, uploadID string, opt *CompleteMultipartUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  181. u := fmt.Sprintf("/%s?uploadId=%s", encodeURIComponent(name), uploadID)
  182. var res CompleteMultipartUploadResult
  183. sendOpt := sendOptions{
  184. baseURL: s.client.BaseURL.BucketURL,
  185. uri: u,
  186. method: http.MethodPost,
  187. optHeader: opt,
  188. body: opt,
  189. result: &res,
  190. }
  191. resp, err := s.client.send(ctx, &sendOpt)
  192. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  193. if err == nil && resp.StatusCode == 200 {
  194. if res.ETag == "" {
  195. return &res, resp, errors.New("response 200 OK, but body contains an error")
  196. }
  197. }
  198. return &res, resp, err
  199. }
  200. // AbortMultipartUpload 用来实现舍弃一个分块上传并删除已上传的块。当您调用Abort Multipart Upload时,
  201. // 如果有正在使用这个Upload Parts上传块的请求,则Upload Parts会返回失败。当该UploadID不存在时,会返回404 NoSuchUpload。
  202. //
  203. // 建议您及时完成分块上传或者舍弃分块上传,因为已上传但是未终止的块会占用存储空间进而产生存储费用。
  204. //
  205. // https://www.qcloud.com/document/product/436/7740
  206. func (s *ObjectService) AbortMultipartUpload(ctx context.Context, name, uploadID string) (*Response, error) {
  207. u := fmt.Sprintf("/%s?uploadId=%s", encodeURIComponent(name), uploadID)
  208. sendOpt := sendOptions{
  209. baseURL: s.client.BaseURL.BucketURL,
  210. uri: u,
  211. method: http.MethodDelete,
  212. }
  213. resp, err := s.client.send(ctx, &sendOpt)
  214. return resp, err
  215. }
  // ObjectCopyPartOptions holds the request headers for CopyPart. Each field
  // is sent under the header named in its `header` tag.
  type ObjectCopyPartOptions struct {
  	// XCosCopySource has no omitempty; CopyPart overwrites it with its
  	// sourceURL argument.
  	XCosCopySource string `header:"x-cos-copy-source" url:"-"`
  	// XCosCopySourceRange selects the source byte range, "bytes=<start>-<end>".
  	XCosCopySourceRange string `header:"x-cos-copy-source-range,omitempty" url:"-"`
  	XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-"`
  	XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-"`
  	XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-"`
  	XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-"`
  }
  // CopyPartResult is the XML response body decoded by CopyPart. CopyPart
  // treats an empty ETag in a 200 response as an error.
  type CopyPartResult struct {
  	XMLName xml.Name `xml:"CopyPartResult"`
  	ETag string
  	LastModified string
  }
  231. // CopyPart 请求实现在初始化以后的分块上传,支持的块的数量为1到10000,块的大小为1 MB 到5 GB。
  232. // 在每次请求Upload Part时候,需要携带partNumber和uploadID,partNumber为块的编号,支持乱序上传。
  233. // ObjectCopyPartOptions的XCosCopySource为必填参数,格式为<bucket-name>-<app-id>.cos.<region-id>.myqcloud.com/<object-key>
  234. // ObjectCopyPartOptions的XCosCopySourceRange指定源的Range,格式为bytes=<start>-<end>
  235. //
  236. // 当传入uploadID和partNumber都相同的时候,后传入的块将覆盖之前传入的块。当uploadID不存在时会返回404错误,NoSuchUpload.
  237. //
  238. // https://www.qcloud.com/document/product/436/7750
  239. func (s *ObjectService) CopyPart(ctx context.Context, name, uploadID string, partNumber int, sourceURL string, opt *ObjectCopyPartOptions) (*CopyPartResult, *Response, error) {
  240. if opt == nil {
  241. opt = &ObjectCopyPartOptions{}
  242. }
  243. opt.XCosCopySource = sourceURL
  244. u := fmt.Sprintf("/%s?partNumber=%d&uploadId=%s", encodeURIComponent(name), partNumber, uploadID)
  245. var res CopyPartResult
  246. sendOpt := sendOptions{
  247. baseURL: s.client.BaseURL.BucketURL,
  248. uri: u,
  249. method: http.MethodPut,
  250. optHeader: opt,
  251. result: &res,
  252. }
  253. resp, err := s.client.send(ctx, &sendOpt)
  254. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  255. if err == nil && resp != nil && resp.StatusCode == 200 {
  256. if res.ETag == "" {
  257. return &res, resp, errors.New("response 200 OK, but body contains an error")
  258. }
  259. }
  260. return &res, resp, err
  261. }
  // ObjectListUploadsOptions holds the optional query parameters for
  // ListUploads. The query keys follow the lowercase, dash-separated names of
  // the documented List Multipart Uploads request parameters, and every field
  // is omitted when it holds its zero value so that unset options (for
  // example MaxUploads == 0) are not sent.
  type ObjectListUploadsOptions struct {
  	// Delimiter is sent as "delimiter".
  	Delimiter string `url:"delimiter,omitempty"`
  	// EncodingType is sent as "encoding-type".
  	EncodingType string `url:"encoding-type,omitempty"`
  	// Prefix is sent as "prefix".
  	Prefix string `url:"prefix,omitempty"`
  	// MaxUploads is sent as "max-uploads".
  	MaxUploads int `url:"max-uploads,omitempty"`
  	// KeyMarker is sent as "key-marker".
  	KeyMarker string `url:"key-marker,omitempty"`
  	// UploadIdMarker is sent as "upload-id-marker".
  	UploadIdMarker string `url:"upload-id-marker,omitempty"`
  }
  270. type ObjectListUploadsResult struct {
  271. XMLName xml.Name `xml:"ListMultipartUploadsResult"`
  272. Bucket string `xml:"Bucket,omitempty"`
  273. EncodingType string `xml:"Encoding-Type,omitempty"`
  274. KeyMarker string `xml:"KeyMarker,omitempty"`
  275. UploadIdMarker string `xml:"UploadIdMarker,omitempty"`
  276. NextKeyMarker string `xml:"NextKeyMarker,omitempty"`
  277. NextUploadIdMarker string `xml:"NextUploadIdMarker,omitempty"`
  278. MaxUploads string `xml:"MaxUploads,omitempty"`
  279. IsTruncated bool `xml:"IsTruncated,omitempty"`
  280. Prefix string `xml:"Prefix,omitempty"`
  281. Delimiter string `xml:"Delimiter,omitempty"`
  282. Upload []ListUploadsResultUpload `xml:"Upload,omitempty"`
  283. CommonPrefixes []string `xml:"CommonPrefixes>Prefix,omitempty"`
  284. }
  285. type ListUploadsResultUpload struct {
  286. Key string `xml:"Key,omitempty"`
  287. UploadID string `xml:"UploadId,omitempty"`
  288. StorageClass string `xml:"StorageClass,omitempty"`
  289. Initiator *Initiator `xml:"Initiator,omitempty"`
  290. Owner *Owner `xml:"Owner,omitempty"`
  291. Initiated string `xml:"Initiated,omitempty"`
  292. }
  293. func (s *ObjectService) ListUploads(ctx context.Context, opt *ObjectListUploadsOptions) (*ObjectListUploadsResult, *Response, error) {
  294. var res ObjectListUploadsResult
  295. sendOpt := &sendOptions{
  296. baseURL: s.client.BaseURL.BucketURL,
  297. uri: "/?uploads",
  298. method: http.MethodGet,
  299. optQuery: opt,
  300. result: &res,
  301. }
  302. resp, err := s.client.send(ctx, sendOpt)
  303. return &res, resp, err
  304. }
  305. type MultiCopyOptions struct {
  306. OptCopy *ObjectCopyOptions
  307. PartSize int64
  308. ThreadPoolSize int
  309. }
  310. type CopyJobs struct {
  311. Name string
  312. UploadId string
  313. RetryTimes int
  314. Chunk Chunk
  315. Opt *ObjectCopyPartOptions
  316. }
  317. type CopyResults struct {
  318. PartNumber int
  319. Resp *Response
  320. err error
  321. res *CopyPartResult
  322. }
  323. func copyworker(s *ObjectService, jobs <-chan *CopyJobs, results chan<- *CopyResults) {
  324. for j := range jobs {
  325. var copyres CopyResults
  326. j.Opt.XCosCopySourceRange = fmt.Sprintf("bytes=%d-%d", j.Chunk.OffSet, j.Chunk.OffSet+j.Chunk.Size-1)
  327. rt := j.RetryTimes
  328. for {
  329. res, resp, err := s.CopyPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number, j.Opt.XCosCopySource, j.Opt)
  330. copyres.PartNumber = j.Chunk.Number
  331. copyres.Resp = resp
  332. copyres.err = err
  333. copyres.res = res
  334. if err != nil {
  335. rt--
  336. if rt == 0 {
  337. results <- &copyres
  338. break
  339. }
  340. time.Sleep(10 * time.Millisecond)
  341. continue
  342. }
  343. results <- &copyres
  344. break
  345. }
  346. }
  347. }
  348. func (s *ObjectService) innerHead(ctx context.Context, sourceURL string, opt *ObjectHeadOptions, id []string) (resp *Response, err error) {
  349. surl := strings.SplitN(sourceURL, "/", 2)
  350. if len(surl) < 2 {
  351. err = errors.New(fmt.Sprintf("sourceURL format error: %s", sourceURL))
  352. return
  353. }
  354. u, err := url.Parse(fmt.Sprintf("https://%s", surl[0]))
  355. if err != nil {
  356. return
  357. }
  358. b := &BaseURL{BucketURL: u}
  359. client := NewClient(b, &http.Client{
  360. Transport: s.client.client.Transport,
  361. })
  362. if len(id) > 0 {
  363. resp, err = client.Object.Head(ctx, surl[1], nil, id[0])
  364. } else {
  365. resp, err = client.Object.Head(ctx, surl[1], nil)
  366. }
  367. return
  368. }
  369. func SplitCopyFileIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
  370. var partNum int64
  371. if partSize > 0 {
  372. partSize = partSize * 1024 * 1024
  373. partNum = totalBytes / partSize
  374. if partNum >= 10000 {
  375. return nil, 0, errors.New("Too many parts, out of 10000")
  376. }
  377. } else {
  378. partNum, partSize = DividePart(totalBytes, 128)
  379. }
  380. var chunks []Chunk
  381. var chunk = Chunk{}
  382. for i := int64(0); i < partNum; i++ {
  383. chunk.Number = int(i + 1)
  384. chunk.OffSet = i * partSize
  385. chunk.Size = partSize
  386. chunks = append(chunks, chunk)
  387. }
  388. if totalBytes%partSize > 0 {
  389. chunk.Number = len(chunks) + 1
  390. chunk.OffSet = int64(len(chunks)) * partSize
  391. chunk.Size = totalBytes % partSize
  392. chunks = append(chunks, chunk)
  393. partNum++
  394. }
  395. return chunks, int(partNum), nil
  396. }
  397. func (s *ObjectService) MultiCopy(ctx context.Context, name string, sourceURL string, opt *MultiCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  398. resp, err := s.innerHead(ctx, sourceURL, nil, id)
  399. if err != nil {
  400. return nil, nil, err
  401. }
  402. totalBytes := resp.ContentLength
  403. surl := strings.SplitN(sourceURL, "/", 2)
  404. if len(surl) < 2 {
  405. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  406. }
  407. var u string
  408. if len(id) == 1 {
  409. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  410. } else if len(id) == 0 {
  411. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  412. } else {
  413. return nil, nil, errors.New("wrong params")
  414. }
  415. if opt == nil {
  416. opt = &MultiCopyOptions{}
  417. }
  418. chunks, partNum, err := SplitCopyFileIntoChunks(totalBytes, opt.PartSize)
  419. if err != nil {
  420. return nil, nil, err
  421. }
  422. if partNum == 0 || totalBytes < singleUploadMaxLength {
  423. if len(id) > 0 {
  424. return s.Copy(ctx, name, sourceURL, opt.OptCopy, id[0])
  425. } else {
  426. return s.Copy(ctx, name, sourceURL, opt.OptCopy)
  427. }
  428. }
  429. optini := CopyOptionsToMulti(opt.OptCopy)
  430. var uploadID string
  431. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  432. if err != nil {
  433. return nil, nil, err
  434. }
  435. uploadID = res.UploadID
  436. var poolSize int
  437. if opt.ThreadPoolSize > 0 {
  438. poolSize = opt.ThreadPoolSize
  439. } else {
  440. poolSize = 1
  441. }
  442. chjobs := make(chan *CopyJobs, 100)
  443. chresults := make(chan *CopyResults, 10000)
  444. optcom := &CompleteMultipartUploadOptions{}
  445. for w := 1; w <= poolSize; w++ {
  446. go copyworker(s, chjobs, chresults)
  447. }
  448. go func() {
  449. for _, chunk := range chunks {
  450. partOpt := &ObjectCopyPartOptions{
  451. XCosCopySource: u,
  452. }
  453. job := &CopyJobs{
  454. Name: name,
  455. RetryTimes: 3,
  456. UploadId: uploadID,
  457. Chunk: chunk,
  458. Opt: partOpt,
  459. }
  460. chjobs <- job
  461. }
  462. close(chjobs)
  463. }()
  464. err = nil
  465. for i := 0; i < partNum; i++ {
  466. res := <-chresults
  467. if res.res == nil || res.err != nil {
  468. err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  469. continue
  470. }
  471. etag := res.res.ETag
  472. optcom.Parts = append(optcom.Parts, Object{
  473. PartNumber: res.PartNumber, ETag: etag},
  474. )
  475. }
  476. close(chresults)
  477. if err != nil {
  478. return nil, nil, err
  479. }
  480. sort.Sort(ObjectList(optcom.Parts))
  481. v, resp, err := s.CompleteMultipartUpload(ctx, name, uploadID, optcom)
  482. if err != nil {
  483. s.AbortMultipartUpload(ctx, name, uploadID)
  484. return nil, resp, err
  485. }
  486. cpres := &ObjectCopyResult{
  487. ETag: v.ETag,
  488. CRC64: resp.Header.Get("x-cos-hash-crc64ecma"),
  489. VersionId: resp.Header.Get("x-cos-version-id"),
  490. }
  491. return cpres, resp, err
  492. }