You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1516 lines
47 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "bytes"
  4. "context"
  5. "crypto/md5"
  6. "encoding/hex"
  7. "encoding/json"
  8. "encoding/xml"
  9. "errors"
  10. "fmt"
  11. "hash/crc64"
  12. "io"
  13. "io/ioutil"
  14. "net/http"
  15. "net/url"
  16. "os"
  17. "sort"
  18. "strconv"
  19. "strings"
  20. "time"
  21. )
  22. // ObjectService 相关 API
  23. type ObjectService service
  24. // ObjectGetOptions is the option of GetObject
  25. type ObjectGetOptions struct {
  26. ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
  27. ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
  28. ResponseExpires string `url:"response-expires,omitempty" header:"-"`
  29. ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
  30. ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
  31. ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
  32. Range string `url:"-" header:"Range,omitempty"`
  33. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  34. // SSE-C
  35. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  36. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  37. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  38. //兼容其他自定义头部
  39. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  40. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  41. // 下载进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  42. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  43. }
  44. // presignedURLTestingOptions is the opt of presigned url
  45. type presignedURLTestingOptions struct {
  46. authTime *AuthTime
  47. }
  48. // Get Object 请求可以将一个文件(Object)下载至本地。
  49. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  50. //
  51. // https://www.qcloud.com/document/product/436/7753
  52. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  53. var u string
  54. if len(id) == 1 {
  55. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  56. } else if len(id) == 0 {
  57. u = "/" + encodeURIComponent(name)
  58. } else {
  59. return nil, errors.New("wrong params")
  60. }
  61. sendOpt := sendOptions{
  62. baseURL: s.client.BaseURL.BucketURL,
  63. uri: u,
  64. method: http.MethodGet,
  65. optQuery: opt,
  66. optHeader: opt,
  67. disableCloseBody: true,
  68. }
  69. resp, err := s.client.doRetry(ctx, &sendOpt)
  70. if opt != nil && opt.Listener != nil {
  71. if err == nil && resp != nil {
  72. if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
  73. resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
  74. }
  75. }
  76. }
  77. return resp, err
  78. }
  79. // GetToFile download the object to local file
  80. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  81. resp, err := s.Get(ctx, name, opt, id...)
  82. if err != nil {
  83. return resp, err
  84. }
  85. defer resp.Body.Close()
  86. // If file exist, overwrite it
  87. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  88. if err != nil {
  89. return resp, err
  90. }
  91. _, err = io.Copy(fd, resp.Body)
  92. fd.Close()
  93. if err != nil {
  94. return resp, err
  95. }
  96. return resp, nil
  97. }
  98. func (s *ObjectService) GetObjectURL(name string) *url.URL {
  99. uri, _ := url.Parse("/" + encodeURIComponent(name, []byte{'/'}))
  100. return s.client.BaseURL.BucketURL.ResolveReference(uri)
  101. }
  102. type PresignedURLOptions struct {
  103. Query *url.Values `xml:"-" url:"-" header:"-"`
  104. Header *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  105. }
  106. // GetPresignedURL get the object presigned to down or upload file by url
  107. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  108. sendOpt := sendOptions{
  109. baseURL: s.client.BaseURL.BucketURL,
  110. uri: "/" + encodeURIComponent(name, []byte{'/'}),
  111. method: httpMethod,
  112. optQuery: opt,
  113. optHeader: opt,
  114. }
  115. if popt, ok := opt.(*PresignedURLOptions); ok {
  116. qs := popt.Query.Encode()
  117. if qs != "" {
  118. sendOpt.uri = fmt.Sprintf("%s?%s", sendOpt.uri, qs)
  119. }
  120. }
  121. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  122. if err != nil {
  123. return nil, err
  124. }
  125. var authTime *AuthTime
  126. if opt != nil {
  127. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  128. authTime = opt.authTime
  129. }
  130. }
  131. if authTime == nil {
  132. authTime = NewAuthTime(expired)
  133. }
  134. authorization := newAuthorization(ak, sk, req, authTime)
  135. sign := encodeURIComponent(authorization, []byte{'&', '='})
  136. if req.URL.RawQuery == "" {
  137. req.URL.RawQuery = fmt.Sprintf("%s", sign)
  138. } else {
  139. req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
  140. }
  141. return req.URL, nil
  142. }
  143. // ObjectPutHeaderOptions the options of header of the put object
  144. type ObjectPutHeaderOptions struct {
  145. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  146. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  147. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  148. ContentType string `header:"Content-Type,omitempty" url:"-"`
  149. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  150. ContentLength int64 `header:"Content-Length,omitempty" url:"-"`
  151. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  152. Expect string `header:"Expect,omitempty" url:"-"`
  153. Expires string `header:"Expires,omitempty" url:"-"`
  154. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  155. // 自定义的 x-cos-meta-* header
  156. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  157. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
  158. // 可选值: Normal, Appendable
  159. //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
  160. // Enable Server Side Encryption, Only supported: AES256
  161. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  162. // SSE-C
  163. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  164. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  165. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  166. //兼容其他自定义头部
  167. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  168. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  169. // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  170. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  171. }
  172. // ObjectPutOptions the options of put object
  173. type ObjectPutOptions struct {
  174. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  175. *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  176. }
  177. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  178. //
  179. // https://www.qcloud.com/document/product/436/7749
  180. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*Response, error) {
  181. if r == nil {
  182. return nil, fmt.Errorf("reader is nil")
  183. }
  184. if err := CheckReaderLen(r); err != nil {
  185. return nil, err
  186. }
  187. opt := CloneObjectPutOptions(uopt)
  188. totalBytes, err := GetReaderLen(r)
  189. if err != nil && opt != nil && opt.Listener != nil {
  190. if opt.ContentLength == 0 {
  191. return nil, err
  192. }
  193. totalBytes = opt.ContentLength
  194. }
  195. if err == nil {
  196. // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传
  197. if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
  198. opt.ContentLength = totalBytes
  199. }
  200. }
  201. reader := TeeReader(r, nil, totalBytes, nil)
  202. if s.client.Conf.EnableCRC {
  203. reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
  204. }
  205. if opt != nil && opt.Listener != nil {
  206. reader.listener = opt.Listener
  207. }
  208. sendOpt := sendOptions{
  209. baseURL: s.client.BaseURL.BucketURL,
  210. uri: "/" + encodeURIComponent(name),
  211. method: http.MethodPut,
  212. body: reader,
  213. optHeader: opt,
  214. }
  215. resp, err := s.client.send(ctx, &sendOpt)
  216. return resp, err
  217. }
  218. // PutFromFile put object from local file
  219. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (resp *Response, err error) {
  220. nr := 0
  221. for nr < 3 {
  222. fd, e := os.Open(filePath)
  223. if e != nil {
  224. err = e
  225. return
  226. }
  227. resp, err = s.Put(ctx, name, fd, opt)
  228. if err != nil {
  229. nr++
  230. fd.Close()
  231. continue
  232. }
  233. fd.Close()
  234. break
  235. }
  236. return
  237. }
  238. // ObjectCopyHeaderOptions is the head option of the Copy
  239. type ObjectCopyHeaderOptions struct {
  240. // When use replace directive to update meta infos
  241. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  242. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  243. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  244. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  245. ContentType string `header:"Content-Type,omitempty" url:"-"`
  246. Expires string `header:"Expires,omitempty" url:"-"`
  247. Expect string `header:"Expect,omitempty" url:"-"`
  248. XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
  249. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
  250. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
  251. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
  252. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
  253. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
  254. // 自定义的 x-cos-meta-* header
  255. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  256. XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
  257. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  258. // SSE-C
  259. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  260. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  261. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  262. XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  263. XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  264. XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  265. //兼容其他自定义头部
  266. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  267. }
  268. // ObjectCopyOptions is the option of Copy, choose header or body
  269. type ObjectCopyOptions struct {
  270. *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  271. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  272. }
  273. // ObjectCopyResult is the result of Copy
  274. type ObjectCopyResult struct {
  275. XMLName xml.Name `xml:"CopyObjectResult"`
  276. ETag string `xml:"ETag,omitempty"`
  277. LastModified string `xml:"LastModified,omitempty"`
  278. CRC64 string `xml:"CRC64,omitempty"`
  279. VersionId string `xml:"VersionId,omitempty"`
  280. }
  281. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  282. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  283. //
  284. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  285. //
  286. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  287. //
  288. // https://cloud.tencent.com/document/product/436/10881
  289. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  290. surl := strings.SplitN(sourceURL, "/", 2)
  291. if len(surl) < 2 {
  292. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  293. }
  294. var u string
  295. if len(id) == 1 {
  296. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  297. } else if len(id) == 0 {
  298. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  299. } else {
  300. return nil, nil, errors.New("wrong params")
  301. }
  302. var res ObjectCopyResult
  303. copyOpt := &ObjectCopyOptions{
  304. &ObjectCopyHeaderOptions{},
  305. &ACLHeaderOptions{},
  306. }
  307. if opt != nil {
  308. if opt.ObjectCopyHeaderOptions != nil {
  309. *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
  310. }
  311. if opt.ACLHeaderOptions != nil {
  312. *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
  313. }
  314. }
  315. copyOpt.XCosCopySource = u
  316. sendOpt := sendOptions{
  317. baseURL: s.client.BaseURL.BucketURL,
  318. uri: "/" + encodeURIComponent(name),
  319. method: http.MethodPut,
  320. body: nil,
  321. optHeader: copyOpt,
  322. result: &res,
  323. }
  324. resp, err := s.client.doRetry(ctx, &sendOpt)
  325. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  326. if err == nil && resp.StatusCode == 200 {
  327. if res.ETag == "" {
  328. return &res, resp, errors.New("response 200 OK, but body contains an error")
  329. }
  330. }
  331. return &res, resp, err
  332. }
  333. type ObjectDeleteOptions struct {
  334. // SSE-C
  335. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  336. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  337. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  338. //兼容其他自定义头部
  339. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  340. VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
  341. }
  342. // Delete Object请求可以将一个文件(Object)删除。
  343. //
  344. // https://www.qcloud.com/document/product/436/7743
  345. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  346. var optHeader *ObjectDeleteOptions
  347. // When use "" string might call the delete bucket interface
  348. if len(name) == 0 {
  349. return nil, errors.New("empty object name")
  350. }
  351. if len(opt) > 0 {
  352. optHeader = opt[0]
  353. }
  354. sendOpt := sendOptions{
  355. baseURL: s.client.BaseURL.BucketURL,
  356. uri: "/" + encodeURIComponent(name),
  357. method: http.MethodDelete,
  358. optHeader: optHeader,
  359. optQuery: optHeader,
  360. }
  361. resp, err := s.client.doRetry(ctx, &sendOpt)
  362. return resp, err
  363. }
  364. // ObjectHeadOptions is the option of HeadObject
  365. type ObjectHeadOptions struct {
  366. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  367. // SSE-C
  368. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  369. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  370. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  371. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  372. }
  373. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  374. //
  375. // https://www.qcloud.com/document/product/436/7745
  376. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  377. var u string
  378. if len(id) == 1 {
  379. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  380. } else if len(id) == 0 {
  381. u = "/" + encodeURIComponent(name)
  382. } else {
  383. return nil, errors.New("wrong params")
  384. }
  385. sendOpt := sendOptions{
  386. baseURL: s.client.BaseURL.BucketURL,
  387. uri: u,
  388. method: http.MethodHead,
  389. optHeader: opt,
  390. }
  391. resp, err := s.client.doRetry(ctx, &sendOpt)
  392. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  393. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  394. }
  395. return resp, err
  396. }
  397. // ObjectOptionsOptions is the option of object options
  398. type ObjectOptionsOptions struct {
  399. Origin string `url:"-" header:"Origin"`
  400. AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
  401. AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
  402. }
  403. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  404. //
  405. // 当CORS配置不存在时,请求返回403 Forbidden。
  406. //
  407. // https://www.qcloud.com/document/product/436/8288
  408. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  409. sendOpt := sendOptions{
  410. baseURL: s.client.BaseURL.BucketURL,
  411. uri: "/" + encodeURIComponent(name),
  412. method: http.MethodOptions,
  413. optHeader: opt,
  414. }
  415. resp, err := s.client.send(ctx, &sendOpt)
  416. return resp, err
  417. }
  418. // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
  419. type CASJobParameters struct {
  420. Tier string `xml:"Tier" header:"-" url:"-"`
  421. }
  422. // ObjectRestoreOptions is the option of object restore
  423. type ObjectRestoreOptions struct {
  424. XMLName xml.Name `xml:"RestoreRequest" header:"-" url:"-"`
  425. Days int `xml:"Days" header:"-" url:"-"`
  426. Tier *CASJobParameters `xml:"CASJobParameters" header:"-" url:"-"`
  427. XOptionHeader *http.Header `xml:"-" header:",omitempty" url:"-"`
  428. }
  429. // PutRestore API can recover an object of type archived by COS archive.
  430. //
  431. // https://cloud.tencent.com/document/product/436/12633
  432. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  433. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  434. sendOpt := sendOptions{
  435. baseURL: s.client.BaseURL.BucketURL,
  436. uri: u,
  437. method: http.MethodPost,
  438. body: opt,
  439. optHeader: opt,
  440. }
  441. resp, err := s.client.doRetry(ctx, &sendOpt)
  442. return resp, err
  443. }
  444. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  445. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  446. //
  447. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  448. //
  449. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  450. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  451. //
  452. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  453. //
  454. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  455. //
  456. // https://www.qcloud.com/document/product/436/7741
  457. func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (int, *Response, error) {
  458. res := position
  459. if r == nil {
  460. return res, nil, fmt.Errorf("reader is nil")
  461. }
  462. if err := CheckReaderLen(r); err != nil {
  463. return res, nil, err
  464. }
  465. opt = CloneObjectPutOptions(opt)
  466. totalBytes, err := GetReaderLen(r)
  467. if err != nil && opt != nil && opt.Listener != nil {
  468. if opt.ContentLength == 0 {
  469. return res, nil, err
  470. }
  471. totalBytes = opt.ContentLength
  472. }
  473. if err == nil {
  474. // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader需用户指定ContentLength
  475. if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
  476. opt.ContentLength = totalBytes
  477. }
  478. }
  479. reader := TeeReader(r, nil, totalBytes, nil)
  480. if s.client.Conf.EnableCRC {
  481. reader.writer = md5.New() // MD5校验
  482. reader.disableCheckSum = true
  483. }
  484. if opt != nil && opt.Listener != nil {
  485. reader.listener = opt.Listener
  486. }
  487. u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  488. sendOpt := sendOptions{
  489. baseURL: s.client.BaseURL.BucketURL,
  490. uri: u,
  491. method: http.MethodPost,
  492. optHeader: opt,
  493. body: reader,
  494. }
  495. resp, err := s.client.send(ctx, &sendOpt)
  496. if err == nil {
  497. // 数据校验
  498. if s.client.Conf.EnableCRC && reader.writer != nil {
  499. wanted := hex.EncodeToString(reader.Sum())
  500. if wanted != resp.Header.Get("x-cos-content-sha1") {
  501. return res, resp, fmt.Errorf("append verification failed, want:%v, return:%v", wanted, resp.Header.Get("x-cos-content-sha1"))
  502. }
  503. }
  504. np, err := strconv.ParseInt(resp.Header.Get("x-cos-next-append-position"), 10, 64)
  505. return int(np), resp, err
  506. }
  507. return res, resp, err
  508. }
  509. // ObjectDeleteMultiOptions is the option of DeleteMulti
  510. type ObjectDeleteMultiOptions struct {
  511. XMLName xml.Name `xml:"Delete" header:"-"`
  512. Quiet bool `xml:"Quiet" header:"-"`
  513. Objects []Object `xml:"Object" header:"-"`
  514. //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
  515. }
  516. // ObjectDeleteMultiResult is the result of DeleteMulti
  517. type ObjectDeleteMultiResult struct {
  518. XMLName xml.Name `xml:"DeleteResult"`
  519. DeletedObjects []Object `xml:"Deleted,omitempty"`
  520. Errors []struct {
  521. Key string `xml:",omitempty"`
  522. Code string `xml:",omitempty"`
  523. Message string `xml:",omitempty"`
  524. VersionId string `xml:",omitempty"`
  525. } `xml:"Error,omitempty"`
  526. }
  527. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  528. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  529. // Quiet模式只返回报错的Object信息。
  530. // https://www.qcloud.com/document/product/436/8289
  531. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  532. var res ObjectDeleteMultiResult
  533. sendOpt := sendOptions{
  534. baseURL: s.client.BaseURL.BucketURL,
  535. uri: "/?delete",
  536. method: http.MethodPost,
  537. body: opt,
  538. result: &res,
  539. }
  540. resp, err := s.client.doRetry(ctx, &sendOpt)
  541. return &res, resp, err
  542. }
  543. // Object is the meta info of the object
  544. type Object struct {
  545. Key string `xml:",omitempty"`
  546. ETag string `xml:",omitempty"`
  547. Size int64 `xml:",omitempty"`
  548. PartNumber int `xml:",omitempty"`
  549. LastModified string `xml:",omitempty"`
  550. StorageClass string `xml:",omitempty"`
  551. Owner *Owner `xml:",omitempty"`
  552. VersionId string `xml:",omitempty"`
  553. }
  554. // MultiUploadOptions is the option of the multiupload,
  555. // ThreadPoolSize default is one
  556. type MultiUploadOptions struct {
  557. OptIni *InitiateMultipartUploadOptions
  558. PartSize int64
  559. ThreadPoolSize int
  560. CheckPoint bool
  561. DisableChecksum bool
  562. }
  563. type MultiDownloadOptions struct {
  564. Opt *ObjectGetOptions
  565. PartSize int64
  566. ThreadPoolSize int
  567. CheckPoint bool
  568. CheckPointFile string
  569. DisableChecksum bool
  570. }
  571. type MultiDownloadCPInfo struct {
  572. Size int64 `json:"contentLength,omitempty"`
  573. ETag string `json:"eTag,omitempty"`
  574. CRC64 string `json:"crc64ecma,omitempty"`
  575. LastModified string `json:"lastModified,omitempty"`
  576. DownloadedBlocks []DownloadedBlock `json:"downloadedBlocks,omitempty"`
  577. }
  578. type DownloadedBlock struct {
  579. From int64 `json:"from,omitempty"`
  580. To int64 `json:"to,omitempty"`
  581. }
  582. type Chunk struct {
  583. Number int
  584. OffSet int64
  585. Size int64
  586. Done bool
  587. ETag string
  588. }
  589. // jobs
  590. type Jobs struct {
  591. Name string
  592. UploadId string
  593. FilePath string
  594. RetryTimes int
  595. VersionId []string
  596. Chunk Chunk
  597. Data io.Reader
  598. Opt *ObjectUploadPartOptions
  599. DownOpt *ObjectGetOptions
  600. }
  601. type Results struct {
  602. PartNumber int
  603. Resp *Response
  604. err error
  605. }
  606. func LimitReadCloser(r io.Reader, n int64) io.Reader {
  607. var lc LimitedReadCloser
  608. lc.R = r
  609. lc.N = n
  610. return &lc
  611. }
  612. type LimitedReadCloser struct {
  613. io.LimitedReader
  614. }
  615. func (lc *LimitedReadCloser) Close() error {
  616. if r, ok := lc.R.(io.ReadCloser); ok {
  617. return r.Close()
  618. }
  619. return nil
  620. }
  621. type DiscardReadCloser struct {
  622. RC io.ReadCloser
  623. Discard int
  624. }
  625. func (drc *DiscardReadCloser) Read(data []byte) (int, error) {
  626. n, err := drc.RC.Read(data)
  627. if drc.Discard == 0 || n <= 0 {
  628. return n, err
  629. }
  630. if n <= drc.Discard {
  631. drc.Discard -= n
  632. return 0, err
  633. }
  634. realLen := n - drc.Discard
  635. copy(data[0:realLen], data[drc.Discard:n])
  636. drc.Discard = 0
  637. return realLen, err
  638. }
  639. func (drc *DiscardReadCloser) Close() error {
  640. if rc, ok := drc.RC.(io.ReadCloser); ok {
  641. return rc.Close()
  642. }
  643. return nil
  644. }
  645. func worker(ctx context.Context, s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  646. for j := range jobs {
  647. j.Opt.ContentLength = j.Chunk.Size
  648. rt := j.RetryTimes
  649. for {
  650. // http.Request.Body can be Closed in request
  651. fd, err := os.Open(j.FilePath)
  652. var res Results
  653. if err != nil {
  654. res.err = err
  655. res.PartNumber = j.Chunk.Number
  656. res.Resp = nil
  657. results <- &res
  658. break
  659. }
  660. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  661. resp, err := s.UploadPart(ctx, j.Name, j.UploadId, j.Chunk.Number,
  662. LimitReadCloser(fd, j.Chunk.Size), j.Opt)
  663. res.PartNumber = j.Chunk.Number
  664. res.Resp = resp
  665. res.err = err
  666. if err != nil {
  667. rt--
  668. if rt == 0 {
  669. results <- &res
  670. break
  671. }
  672. time.Sleep(time.Millisecond)
  673. continue
  674. }
  675. results <- &res
  676. break
  677. }
  678. }
  679. }
  680. func downloadWorker(ctx context.Context, s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  681. for j := range jobs {
  682. opt := &RangeOptions{
  683. HasStart: true,
  684. HasEnd: true,
  685. Start: j.Chunk.OffSet,
  686. End: j.Chunk.OffSet + j.Chunk.Size - 1,
  687. }
  688. j.DownOpt.Range = FormatRangeOptions(opt)
  689. rt := j.RetryTimes
  690. for {
  691. var res Results
  692. res.PartNumber = j.Chunk.Number
  693. resp, err := s.Get(ctx, j.Name, j.DownOpt, j.VersionId...)
  694. res.err = err
  695. res.Resp = resp
  696. if err != nil {
  697. results <- &res
  698. break
  699. }
  700. fd, err := os.OpenFile(j.FilePath, os.O_WRONLY, 0660)
  701. if err != nil {
  702. resp.Body.Close()
  703. res.err = err
  704. results <- &res
  705. break
  706. }
  707. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  708. n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
  709. if n != j.Chunk.Size || err != nil {
  710. fd.Close()
  711. resp.Body.Close()
  712. rt--
  713. if rt == 0 {
  714. res.err = fmt.Errorf("io.Copy Failed, nread:%v, want:%v, err:%v", n, j.Chunk.Size, err)
  715. results <- &res
  716. break
  717. }
  718. time.Sleep(time.Millisecond)
  719. continue
  720. }
  721. fd.Close()
  722. resp.Body.Close()
  723. results <- &res
  724. break
  725. }
  726. }
  727. }
  728. func DividePart(fileSize int64, last int) (int64, int64) {
  729. partSize := int64(last * 1024 * 1024)
  730. partNum := fileSize / partSize
  731. for partNum >= 10000 {
  732. partSize = partSize * 2
  733. partNum = fileSize / partSize
  734. }
  735. return partNum, partSize
  736. }
  737. func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
  738. if filePath == "" {
  739. return 0, nil, 0, errors.New("filePath invalid")
  740. }
  741. file, err := os.Open(filePath)
  742. if err != nil {
  743. return 0, nil, 0, err
  744. }
  745. defer file.Close()
  746. stat, err := file.Stat()
  747. if err != nil {
  748. return 0, nil, 0, err
  749. }
  750. var partNum int64
  751. if partSize > 0 {
  752. if partSize < 1024*1024 {
  753. return 0, nil, 0, errors.New("partSize>=1048576 is required")
  754. }
  755. partNum = stat.Size() / partSize
  756. if partNum >= 10000 {
  757. return 0, nil, 0, errors.New("Too many parts, out of 10000")
  758. }
  759. } else {
  760. partNum, partSize = DividePart(stat.Size(), 16)
  761. }
  762. var chunks []Chunk
  763. var chunk = Chunk{}
  764. for i := int64(0); i < partNum; i++ {
  765. chunk.Number = int(i + 1)
  766. chunk.OffSet = i * partSize
  767. chunk.Size = partSize
  768. chunks = append(chunks, chunk)
  769. }
  770. if stat.Size()%partSize > 0 {
  771. chunk.Number = len(chunks) + 1
  772. chunk.OffSet = int64(len(chunks)) * partSize
  773. chunk.Size = stat.Size() % partSize
  774. chunks = append(chunks, chunk)
  775. partNum++
  776. }
  777. return int64(stat.Size()), chunks, int(partNum), nil
  778. }
  779. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  780. opt := &ObjectListUploadsOptions{
  781. Prefix: name,
  782. EncodingType: "url",
  783. }
  784. res, _, err := s.ListUploads(ctx, opt)
  785. if err != nil {
  786. return "", err
  787. }
  788. if len(res.Upload) == 0 {
  789. return "", nil
  790. }
  791. last := len(res.Upload) - 1
  792. for last >= 0 {
  793. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  794. if decodeKey == name {
  795. return decodeURIComponent(res.Upload[last].UploadID)
  796. }
  797. last = last - 1
  798. }
  799. return "", nil
  800. }
  801. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  802. var uploadedParts []Object
  803. isTruncated := true
  804. opt := &ObjectListPartsOptions{
  805. EncodingType: "url",
  806. }
  807. for isTruncated {
  808. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  809. if err != nil {
  810. return err
  811. }
  812. if len(res.Parts) > 0 {
  813. uploadedParts = append(uploadedParts, res.Parts...)
  814. }
  815. isTruncated = res.IsTruncated
  816. opt.PartNumberMarker = res.NextPartNumberMarker
  817. }
  818. fd, err := os.Open(filepath)
  819. if err != nil {
  820. return err
  821. }
  822. defer fd.Close()
  823. // 某个分块出错, 重置chunks
  824. ret := func(e error) error {
  825. for i, _ := range chunks {
  826. chunks[i].Done = false
  827. chunks[i].ETag = ""
  828. }
  829. return e
  830. }
  831. for _, part := range uploadedParts {
  832. partNumber := part.PartNumber
  833. if partNumber > partNum {
  834. return ret(errors.New("Part Number is not consistent"))
  835. }
  836. partNumber = partNumber - 1
  837. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  838. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  839. if err != nil {
  840. return ret(err)
  841. }
  842. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  843. if localMD5 != part.ETag {
  844. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  845. }
  846. chunks[partNumber].Done = true
  847. chunks[partNumber].ETag = part.ETag
  848. }
  849. return nil
  850. }
  851. // MultiUpload/Upload 为高级upload接口,并发分块上传
  852. //
  853. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  854. // 由调用者指定分块大小时,请确认分块数量不超过10000
  855. //
  856. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  857. return s.Upload(ctx, name, filepath, opt)
  858. }
  859. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  860. if opt == nil {
  861. opt = &MultiUploadOptions{}
  862. }
  863. var localcrc uint64
  864. // 1.Get the file chunk
  865. totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize*1024*1024)
  866. if err != nil {
  867. return nil, nil, err
  868. }
  869. // 校验
  870. if s.client.Conf.EnableCRC && !opt.DisableChecksum {
  871. fd, err := os.Open(filepath)
  872. if err != nil {
  873. return nil, nil, err
  874. }
  875. defer fd.Close()
  876. localcrc, err = calCRC64(fd)
  877. if err != nil {
  878. return nil, nil, err
  879. }
  880. }
  881. // filesize=0 , use simple upload
  882. if partNum == 0 || partNum == 1 {
  883. var opt0 *ObjectPutOptions
  884. if opt.OptIni != nil {
  885. opt0 = &ObjectPutOptions{
  886. opt.OptIni.ACLHeaderOptions,
  887. opt.OptIni.ObjectPutHeaderOptions,
  888. }
  889. }
  890. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  891. if err != nil {
  892. return nil, rsp, err
  893. }
  894. result := &CompleteMultipartUploadResult{
  895. Location: fmt.Sprintf("%s/%s", s.client.BaseURL.BucketURL, name),
  896. Key: name,
  897. ETag: rsp.Header.Get("ETag"),
  898. }
  899. if rsp != nil && s.client.Conf.EnableCRC && !opt.DisableChecksum {
  900. scoscrc := rsp.Header.Get("x-cos-hash-crc64ecma")
  901. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  902. if icoscrc != localcrc {
  903. return result, rsp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  904. }
  905. }
  906. return result, rsp, nil
  907. }
  908. var uploadID string
  909. resumableFlag := false
  910. if opt.CheckPoint {
  911. var err error
  912. uploadID, err = s.getResumableUploadID(ctx, name)
  913. if err == nil && uploadID != "" {
  914. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  915. resumableFlag = (err == nil)
  916. }
  917. }
  918. // 2.Init
  919. optini := opt.OptIni
  920. if !resumableFlag {
  921. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  922. if err != nil {
  923. return nil, nil, err
  924. }
  925. uploadID = res.UploadID
  926. }
  927. var poolSize int
  928. if opt.ThreadPoolSize > 0 {
  929. poolSize = opt.ThreadPoolSize
  930. } else {
  931. // Default is one
  932. poolSize = 1
  933. }
  934. chjobs := make(chan *Jobs, 100)
  935. chresults := make(chan *Results, 10000)
  936. optcom := &CompleteMultipartUploadOptions{}
  937. // 3.Start worker
  938. for w := 1; w <= poolSize; w++ {
  939. go worker(ctx, s, chjobs, chresults)
  940. }
  941. // progress started event
  942. var listener ProgressListener
  943. var consumedBytes int64
  944. if opt.OptIni != nil {
  945. if opt.OptIni.ObjectPutHeaderOptions != nil {
  946. listener = opt.OptIni.Listener
  947. }
  948. optcom.XOptionHeader, _ = deliverInitOptions(opt.OptIni)
  949. }
  950. event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
  951. progressCallback(listener, event)
  952. // 4.Push jobs
  953. go func() {
  954. for _, chunk := range chunks {
  955. if chunk.Done {
  956. continue
  957. }
  958. partOpt := &ObjectUploadPartOptions{}
  959. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  960. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  961. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  962. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  963. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  964. }
  965. job := &Jobs{
  966. Name: name,
  967. RetryTimes: 3,
  968. FilePath: filepath,
  969. UploadId: uploadID,
  970. Chunk: chunk,
  971. Opt: partOpt,
  972. }
  973. chjobs <- job
  974. }
  975. close(chjobs)
  976. }()
  977. // 5.Recv the resp etag to complete
  978. err = nil
  979. for i := 0; i < partNum; i++ {
  980. if chunks[i].Done {
  981. optcom.Parts = append(optcom.Parts, Object{
  982. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  983. )
  984. if err == nil {
  985. consumedBytes += chunks[i].Size
  986. event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
  987. progressCallback(listener, event)
  988. }
  989. continue
  990. }
  991. res := <-chresults
  992. // Notice one part fail can not get the etag according.
  993. if res.Resp == nil || res.err != nil {
  994. // Some part already fail, can not to get the header inside.
  995. err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  996. continue
  997. }
  998. // Notice one part fail can not get the etag according.
  999. etag := res.Resp.Header.Get("ETag")
  1000. optcom.Parts = append(optcom.Parts, Object{
  1001. PartNumber: res.PartNumber, ETag: etag},
  1002. )
  1003. if err == nil {
  1004. consumedBytes += chunks[res.PartNumber-1].Size
  1005. event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
  1006. progressCallback(listener, event)
  1007. }
  1008. }
  1009. close(chresults)
  1010. if err != nil {
  1011. event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
  1012. progressCallback(listener, event)
  1013. return nil, nil, err
  1014. }
  1015. sort.Sort(ObjectList(optcom.Parts))
  1016. event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
  1017. progressCallback(listener, event)
  1018. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  1019. if err != nil {
  1020. return v, resp, err
  1021. }
  1022. if resp != nil && s.client.Conf.EnableCRC && !opt.DisableChecksum {
  1023. scoscrc := resp.Header.Get("x-cos-hash-crc64ecma")
  1024. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  1025. if icoscrc != localcrc {
  1026. return v, resp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  1027. }
  1028. }
  1029. return v, resp, err
  1030. }
  1031. func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
  1032. var partNum int64
  1033. if partSize > 0 {
  1034. if partSize < 1024*1024 {
  1035. return nil, 0, errors.New("partSize>=1048576 is required")
  1036. }
  1037. partNum = totalBytes / partSize
  1038. if partNum >= 10000 {
  1039. return nil, 0, errors.New("Too manry parts, out of 10000")
  1040. }
  1041. } else {
  1042. partNum, partSize = DividePart(totalBytes, 16)
  1043. }
  1044. var chunks []Chunk
  1045. var chunk = Chunk{}
  1046. for i := int64(0); i < partNum; i++ {
  1047. chunk.Number = int(i + 1)
  1048. chunk.OffSet = i * partSize
  1049. chunk.Size = partSize
  1050. chunks = append(chunks, chunk)
  1051. }
  1052. if totalBytes%partSize > 0 {
  1053. chunk.Number = len(chunks) + 1
  1054. chunk.OffSet = int64(len(chunks)) * partSize
  1055. chunk.Size = totalBytes % partSize
  1056. chunks = append(chunks, chunk)
  1057. partNum++
  1058. }
  1059. return chunks, int(partNum), nil
  1060. }
  1061. func (s *ObjectService) checkDownloadedParts(opt *MultiDownloadCPInfo, chfile string, chunks []Chunk) (*MultiDownloadCPInfo, bool) {
  1062. var defaultRes MultiDownloadCPInfo
  1063. defaultRes = *opt
  1064. fd, err := os.Open(chfile)
  1065. // checkpoint 文件不存在
  1066. if err != nil && os.IsNotExist(err) {
  1067. // 创建 checkpoint 文件
  1068. fd, _ = os.OpenFile(chfile, os.O_RDONLY|os.O_CREATE|os.O_TRUNC, 0660)
  1069. fd.Close()
  1070. return &defaultRes, false
  1071. }
  1072. if err != nil {
  1073. return &defaultRes, false
  1074. }
  1075. defer fd.Close()
  1076. var res MultiDownloadCPInfo
  1077. err = json.NewDecoder(fd).Decode(&res)
  1078. if err != nil {
  1079. return &defaultRes, false
  1080. }
  1081. // 与COS的文件比较
  1082. if res.CRC64 != opt.CRC64 || res.ETag != opt.ETag || res.Size != opt.Size || res.LastModified != opt.LastModified || len(res.DownloadedBlocks) == 0 {
  1083. return &defaultRes, false
  1084. }
  1085. // len(chunks) 大于1,否则为简单下载, chunks[0].Size为partSize
  1086. partSize := chunks[0].Size
  1087. for _, v := range res.DownloadedBlocks {
  1088. index := v.From / partSize
  1089. to := chunks[index].OffSet + chunks[index].Size - 1
  1090. if chunks[index].OffSet != v.From || to != v.To {
  1091. // 重置chunks
  1092. for i, _ := range chunks {
  1093. chunks[i].Done = false
  1094. }
  1095. return &defaultRes, false
  1096. }
  1097. chunks[index].Done = true
  1098. }
  1099. return &res, true
  1100. }
  1101. func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions, id ...string) (*Response, error) {
  1102. // 参数校验
  1103. if opt == nil {
  1104. opt = &MultiDownloadOptions{}
  1105. }
  1106. if opt.Opt != nil && opt.Opt.Range != "" {
  1107. return nil, fmt.Errorf("Download doesn't support Range Options")
  1108. }
  1109. // 获取文件长度和CRC
  1110. var coscrc string
  1111. resp, err := s.Head(ctx, name, nil, id...)
  1112. if err != nil {
  1113. return resp, err
  1114. }
  1115. // 如果对象不存在x-cos-hash-crc64ecma,则跳过不做校验
  1116. coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
  1117. strTotalBytes := resp.Header.Get("Content-Length")
  1118. totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
  1119. if err != nil {
  1120. return resp, err
  1121. }
  1122. // 切分
  1123. chunks, partNum, err := SplitSizeIntoChunks(totalBytes, opt.PartSize*1024*1024)
  1124. if err != nil {
  1125. return resp, err
  1126. }
  1127. // 直接下载到文件
  1128. if partNum == 0 || partNum == 1 {
  1129. rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt, id...)
  1130. if err != nil {
  1131. return rsp, err
  1132. }
  1133. if coscrc != "" && s.client.Conf.EnableCRC && !opt.DisableChecksum {
  1134. icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
  1135. fd, err := os.Open(filepath)
  1136. if err != nil {
  1137. return rsp, err
  1138. }
  1139. defer fd.Close()
  1140. localcrc, err := calCRC64(fd)
  1141. if err != nil {
  1142. return rsp, err
  1143. }
  1144. if localcrc != icoscrc {
  1145. return rsp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
  1146. }
  1147. }
  1148. return rsp, err
  1149. }
  1150. // 断点续载
  1151. var resumableFlag bool
  1152. var resumableInfo *MultiDownloadCPInfo
  1153. var cpfd *os.File
  1154. var cpfile string
  1155. if opt.CheckPoint {
  1156. cpInfo := &MultiDownloadCPInfo{
  1157. LastModified: resp.Header.Get("Last-Modified"),
  1158. ETag: resp.Header.Get("ETag"),
  1159. CRC64: coscrc,
  1160. Size: totalBytes,
  1161. }
  1162. cpfile = opt.CheckPointFile
  1163. if cpfile == "" {
  1164. cpfile = fmt.Sprintf("%s.cosresumabletask", filepath)
  1165. }
  1166. resumableInfo, resumableFlag = s.checkDownloadedParts(cpInfo, cpfile, chunks)
  1167. cpfd, err = os.OpenFile(cpfile, os.O_RDWR, 0660)
  1168. if err != nil {
  1169. return nil, fmt.Errorf("Open CheckPoint File[%v] Failed:%v", cpfile, err)
  1170. }
  1171. }
  1172. if !resumableFlag {
  1173. // 创建文件
  1174. nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  1175. if err != nil {
  1176. if cpfd != nil {
  1177. cpfd.Close()
  1178. }
  1179. return resp, err
  1180. }
  1181. nfile.Close()
  1182. }
  1183. var poolSize int
  1184. if opt.ThreadPoolSize > 0 {
  1185. poolSize = opt.ThreadPoolSize
  1186. } else {
  1187. poolSize = 1
  1188. }
  1189. chjobs := make(chan *Jobs, 100)
  1190. chresults := make(chan *Results, 10000)
  1191. for w := 1; w <= poolSize; w++ {
  1192. go downloadWorker(ctx, s, chjobs, chresults)
  1193. }
  1194. go func() {
  1195. for _, chunk := range chunks {
  1196. if chunk.Done {
  1197. continue
  1198. }
  1199. var downOpt ObjectGetOptions
  1200. if opt.Opt != nil {
  1201. downOpt = *opt.Opt
  1202. downOpt.Listener = nil // listener need to set nil
  1203. }
  1204. job := &Jobs{
  1205. Name: name,
  1206. RetryTimes: 3,
  1207. FilePath: filepath,
  1208. Chunk: chunk,
  1209. DownOpt: &downOpt,
  1210. }
  1211. if len(id) > 0 {
  1212. job.VersionId = append(job.VersionId, id...)
  1213. }
  1214. chjobs <- job
  1215. }
  1216. close(chjobs)
  1217. }()
  1218. err = nil
  1219. for i := 0; i < partNum; i++ {
  1220. if chunks[i].Done {
  1221. continue
  1222. }
  1223. res := <-chresults
  1224. if res.Resp == nil || res.err != nil {
  1225. err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
  1226. continue
  1227. }
  1228. // Dump CheckPoint Info
  1229. if opt.CheckPoint {
  1230. cpfd.Truncate(0)
  1231. cpfd.Seek(0, os.SEEK_SET)
  1232. resumableInfo.DownloadedBlocks = append(resumableInfo.DownloadedBlocks, DownloadedBlock{
  1233. From: chunks[res.PartNumber-1].OffSet,
  1234. To: chunks[res.PartNumber-1].OffSet + chunks[res.PartNumber-1].Size - 1,
  1235. })
  1236. json.NewEncoder(cpfd).Encode(resumableInfo)
  1237. }
  1238. }
  1239. close(chresults)
  1240. if cpfd != nil {
  1241. cpfd.Close()
  1242. }
  1243. if err != nil {
  1244. return nil, err
  1245. }
  1246. // 下载成功,删除checkpoint文件
  1247. if opt.CheckPoint {
  1248. os.Remove(cpfile)
  1249. }
  1250. if coscrc != "" && s.client.Conf.EnableCRC && !opt.DisableChecksum {
  1251. icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
  1252. fd, err := os.Open(filepath)
  1253. if err != nil {
  1254. return resp, err
  1255. }
  1256. defer fd.Close()
  1257. localcrc, err := calCRC64(fd)
  1258. if err != nil {
  1259. return resp, err
  1260. }
  1261. if localcrc != icoscrc {
  1262. return resp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
  1263. }
  1264. }
  1265. return resp, err
  1266. }
  1267. type ObjectPutTaggingOptions struct {
  1268. XMLName xml.Name `xml:"Tagging"`
  1269. TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
  1270. }
  1271. type ObjectTaggingTag BucketTaggingTag
  1272. type ObjectGetTaggingResult ObjectPutTaggingOptions
  1273. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  1274. var u string
  1275. if len(id) == 1 {
  1276. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1277. } else if len(id) == 0 {
  1278. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1279. } else {
  1280. return nil, errors.New("wrong params")
  1281. }
  1282. sendOpt := &sendOptions{
  1283. baseURL: s.client.BaseURL.BucketURL,
  1284. uri: u,
  1285. method: http.MethodPut,
  1286. body: opt,
  1287. }
  1288. resp, err := s.client.doRetry(ctx, sendOpt)
  1289. return resp, err
  1290. }
  1291. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  1292. var u string
  1293. if len(id) == 1 {
  1294. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1295. } else if len(id) == 0 {
  1296. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1297. } else {
  1298. return nil, nil, errors.New("wrong params")
  1299. }
  1300. var res ObjectGetTaggingResult
  1301. sendOpt := &sendOptions{
  1302. baseURL: s.client.BaseURL.BucketURL,
  1303. uri: u,
  1304. method: http.MethodGet,
  1305. result: &res,
  1306. }
  1307. resp, err := s.client.doRetry(ctx, sendOpt)
  1308. return &res, resp, err
  1309. }
  1310. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  1311. var u string
  1312. if len(id) == 1 {
  1313. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1314. } else if len(id) == 0 {
  1315. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1316. } else {
  1317. return nil, errors.New("wrong params")
  1318. }
  1319. sendOpt := &sendOptions{
  1320. baseURL: s.client.BaseURL.BucketURL,
  1321. uri: u,
  1322. method: http.MethodDelete,
  1323. }
  1324. resp, err := s.client.doRetry(ctx, sendOpt)
  1325. return resp, err
  1326. }
  1327. type PutFetchTaskOptions struct {
  1328. Url string `json:"Url,omitempty" header:"-" xml:"-"`
  1329. Key string `json:"Key,omitempty" header:"-" xml:"-"`
  1330. MD5 string `json:"MD5,omitempty" header:"-" xml:"-"`
  1331. OnKeyExist string `json:"OnKeyExist,omitempty" header:"-" xml:"-"`
  1332. IgnoreSameKey bool `json:"IgnoreSameKey,omitempty" header:"-" xml:"-"`
  1333. SuccessCallbackUrl string `json:"SuccessCallbackUrl,omitempty" header:"-" xml:"-"`
  1334. FailureCallbackUrl string `json:"FailureCallbackUrl,omitempty" header:"-" xml:"-"`
  1335. XOptionHeader *http.Header `json:"-", xml:"-" header:"-,omitempty"`
  1336. }
  1337. type PutFetchTaskResult struct {
  1338. Code int `json:"code,omitempty"`
  1339. Message string `json:"message,omitempty"`
  1340. RequestId string `json:"request_id,omitempty"`
  1341. Data struct {
  1342. TaskId string `json:"taskId,omitempty"`
  1343. } `json:"Data,omitempty"`
  1344. }
  1345. type GetFetchTaskResult struct {
  1346. Code int `json:"code,omitempty"`
  1347. Message string `json:"message,omitempty"`
  1348. RequestId string `json:"request_id,omitempty"`
  1349. Data struct {
  1350. Code string `json:"code,omitempty"`
  1351. Message string `json:"msg,omitempty"`
  1352. Percent int `json:"percent,omitempty"`
  1353. Status string `json:"status,omitempty"`
  1354. } `json:"data,omitempty"`
  1355. }
  1356. type innerFetchTaskHeader struct {
  1357. XOptionHeader *http.Header `json:"-", xml:"-" header:"-,omitempty"`
  1358. }
  1359. func (s *ObjectService) PutFetchTask(ctx context.Context, bucket string, opt *PutFetchTaskOptions) (*PutFetchTaskResult, *Response, error) {
  1360. var buf bytes.Buffer
  1361. var res PutFetchTaskResult
  1362. if opt == nil {
  1363. opt = &PutFetchTaskOptions{}
  1364. }
  1365. header := innerFetchTaskHeader{
  1366. XOptionHeader: &http.Header{},
  1367. }
  1368. if opt.XOptionHeader != nil {
  1369. header.XOptionHeader = cloneHeader(opt.XOptionHeader)
  1370. }
  1371. header.XOptionHeader.Set("Content-Type", "application/json")
  1372. bs, err := json.Marshal(opt)
  1373. if err != nil {
  1374. return nil, nil, err
  1375. }
  1376. reader := bytes.NewBuffer(bs)
  1377. sendOpt := &sendOptions{
  1378. baseURL: s.client.BaseURL.FetchURL,
  1379. uri: fmt.Sprintf("/%s/", bucket),
  1380. method: http.MethodPost,
  1381. optHeader: &header,
  1382. body: reader,
  1383. result: &buf,
  1384. }
  1385. resp, err := s.client.send(ctx, sendOpt)
  1386. if buf.Len() > 0 {
  1387. err = json.Unmarshal(buf.Bytes(), &res)
  1388. }
  1389. return &res, resp, err
  1390. }
  1391. func (s *ObjectService) GetFetchTask(ctx context.Context, bucket string, taskid string) (*GetFetchTaskResult, *Response, error) {
  1392. var buf bytes.Buffer
  1393. var res GetFetchTaskResult
  1394. sendOpt := &sendOptions{
  1395. baseURL: s.client.BaseURL.FetchURL,
  1396. uri: fmt.Sprintf("/%s/%s", bucket, encodeURIComponent(taskid)),
  1397. method: http.MethodGet,
  1398. result: &buf,
  1399. }
  1400. resp, err := s.client.send(ctx, sendOpt)
  1401. if buf.Len() > 0 {
  1402. err = json.Unmarshal(buf.Bytes(), &res)
  1403. }
  1404. return &res, resp, err
  1405. }