You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1380 lines
42 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/json"
  6. "encoding/xml"
  7. "errors"
  8. "fmt"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "os"
  15. "sort"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
// ObjectService groups the object-related APIs.
// It is defined on top of the package's shared service type.
type ObjectService service
// ObjectGetOptions is the option of GetObject.
//
// Get hands this struct to the request builder as both the query options and
// the header options; the `url` and `header` tags name the query parameter or
// header each field maps to ("-" excludes the field from that side).
type ObjectGetOptions struct {
	ResponseContentType        string `url:"response-content-type,omitempty" header:"-"`
	ResponseContentLanguage    string `url:"response-content-language,omitempty" header:"-"`
	ResponseExpires            string `url:"response-expires,omitempty" header:"-"`
	ResponseCacheControl       string `url:"response-cache-control,omitempty" header:"-"`
	ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
	ResponseContentEncoding    string `url:"response-content-encoding,omitempty" header:"-"`
	Range                      string `url:"-" header:"Range,omitempty"`
	IfModifiedSince            string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C
	XCosSSECustomerAglo   string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey    string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader    *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	XCosTrafficLimit int          `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
	// Download progress listener. A ProgressCompleteEvent does not mean the
	// API call succeeded; the call succeeded only if the returned err == nil.
	Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// presignedURLTestingOptions is the opt of presigned url.
// GetPresignedURL uses its authTime, when set, instead of deriving one from
// the requested expiry.
type presignedURLTestingOptions struct {
	authTime *AuthTime
}
  46. // Get Object 请求可以将一个文件(Object)下载至本地。
  47. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  48. //
  49. // https://www.qcloud.com/document/product/436/7753
  50. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  51. var u string
  52. if len(id) == 1 {
  53. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  54. } else if len(id) == 0 {
  55. u = "/" + encodeURIComponent(name)
  56. } else {
  57. return nil, errors.New("wrong params")
  58. }
  59. sendOpt := sendOptions{
  60. baseURL: s.client.BaseURL.BucketURL,
  61. uri: u,
  62. method: http.MethodGet,
  63. optQuery: opt,
  64. optHeader: opt,
  65. disableCloseBody: true,
  66. }
  67. resp, err := s.client.send(ctx, &sendOpt)
  68. if opt != nil && opt.Listener != nil {
  69. if err == nil && resp != nil {
  70. if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
  71. resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
  72. }
  73. }
  74. }
  75. return resp, err
  76. }
  77. // GetToFile download the object to local file
  78. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  79. resp, err := s.Get(ctx, name, opt, id...)
  80. if err != nil {
  81. return resp, err
  82. }
  83. defer resp.Body.Close()
  84. // If file exist, overwrite it
  85. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  86. if err != nil {
  87. return resp, err
  88. }
  89. _, err = io.Copy(fd, resp.Body)
  90. fd.Close()
  91. if err != nil {
  92. return resp, err
  93. }
  94. return resp, nil
  95. }
// PresignedURLOptions carries extra query parameters and headers for
// GetPresignedURL. GetPresignedURL appends the encoded Query to the request
// URI before signing.
type PresignedURLOptions struct {
	Query  *url.Values  `xml:"-" url:"-" header:"-"`
	Header *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
  100. // GetPresignedURL get the object presigned to down or upload file by url
  101. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  102. sendOpt := sendOptions{
  103. baseURL: s.client.BaseURL.BucketURL,
  104. uri: "/" + encodeURIComponent(name),
  105. method: httpMethod,
  106. optQuery: opt,
  107. optHeader: opt,
  108. }
  109. if popt, ok := opt.(*PresignedURLOptions); ok {
  110. qs := popt.Query.Encode()
  111. if qs != "" {
  112. sendOpt.uri = fmt.Sprintf("%s?%s", sendOpt.uri, qs)
  113. }
  114. }
  115. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  116. if err != nil {
  117. return nil, err
  118. }
  119. var authTime *AuthTime
  120. if opt != nil {
  121. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  122. authTime = opt.authTime
  123. }
  124. }
  125. if authTime == nil {
  126. authTime = NewAuthTime(expired)
  127. }
  128. authorization := newAuthorization(ak, sk, req, authTime)
  129. sign := encodeURIComponent(authorization, []byte{'&', '='})
  130. if req.URL.RawQuery == "" {
  131. req.URL.RawQuery = fmt.Sprintf("%s", sign)
  132. } else {
  133. req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
  134. }
  135. return req.URL, nil
  136. }
// ObjectPutHeaderOptions the options of header of the put object.
// Each field's `header` tag names the request header it maps to.
type ObjectPutHeaderOptions struct {
	CacheControl       string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding    string `header:"Content-Encoding,omitempty" url:"-"`
	ContentType        string `header:"Content-Type,omitempty" url:"-"`
	ContentMD5         string `header:"Content-MD5,omitempty" url:"-"`
	ContentLength      int64  `header:"Content-Length,omitempty" url:"-"`
	ContentLanguage    string `header:"Content-Language,omitempty" url:"-"`
	Expect             string `header:"Expect,omitempty" url:"-"`
	Expires            string `header:"Expires,omitempty" url:"-"`
	XCosContentSHA1    string `header:"x-cos-content-sha1,omitempty" url:"-"`
	// Custom x-cos-meta-* headers.
	XCosMetaXXX      *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	XCosStorageClass string       `header:"x-cos-storage-class,omitempty" url:"-"`
	// Allowed values: Normal, Appendable
	//XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
	// Enable Server Side Encryption, Only supported: AES256
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C
	XCosSSECustomerAglo   string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey    string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader    *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	XCosTrafficLimit int          `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
	// Upload progress listener. A ProgressCompleteEvent does not mean the
	// API call succeeded; the call succeeded only if the returned err == nil.
	Listener ProgressListener `header:"-" url:"-" xml:"-"`
}
// ObjectPutOptions the options of put object.
// It embeds the ACL headers and the put-specific headers; both are pointers
// and may be nil.
type ObjectPutOptions struct {
	*ACLHeaderOptions       `header:",omitempty" url:"-" xml:"-"`
	*ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
}
  171. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  172. //
  173. // https://www.qcloud.com/document/product/436/7749
  174. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*Response, error) {
  175. if r == nil {
  176. return nil, fmt.Errorf("reader is nil")
  177. }
  178. if err := CheckReaderLen(r); err != nil {
  179. return nil, err
  180. }
  181. opt := CloneObjectPutOptions(uopt)
  182. totalBytes, err := GetReaderLen(r)
  183. if err != nil && opt != nil && opt.Listener != nil {
  184. if opt.ContentLength == 0 {
  185. return nil, err
  186. }
  187. totalBytes = opt.ContentLength
  188. }
  189. if err == nil {
  190. // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传
  191. if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
  192. opt.ContentLength = totalBytes
  193. }
  194. }
  195. reader := TeeReader(r, nil, totalBytes, nil)
  196. if s.client.Conf.EnableCRC {
  197. reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
  198. }
  199. if opt != nil && opt.Listener != nil {
  200. reader.listener = opt.Listener
  201. }
  202. sendOpt := sendOptions{
  203. baseURL: s.client.BaseURL.BucketURL,
  204. uri: "/" + encodeURIComponent(name),
  205. method: http.MethodPut,
  206. body: reader,
  207. optHeader: opt,
  208. }
  209. resp, err := s.client.send(ctx, &sendOpt)
  210. return resp, err
  211. }
  212. // PutFromFile put object from local file
  213. // Notice that when use this put large file need set non-body of debug req/resp, otherwise will out of memory
  214. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (resp *Response, err error) {
  215. nr := 0
  216. for nr < 3 {
  217. fd, e := os.Open(filePath)
  218. if e != nil {
  219. err = e
  220. return
  221. }
  222. resp, err = s.Put(ctx, name, fd, opt)
  223. if err != nil {
  224. nr++
  225. fd.Close()
  226. continue
  227. }
  228. fd.Close()
  229. break
  230. }
  231. return
  232. }
// ObjectCopyHeaderOptions is the head option of the Copy.
// Each field's `header` tag names the request header it maps to.
type ObjectCopyHeaderOptions struct {
	// When use replace directive to update meta infos
	CacheControl                    string `header:"Cache-Control,omitempty" url:"-"`
	ContentDisposition              string `header:"Content-Disposition,omitempty" url:"-"`
	ContentEncoding                 string `header:"Content-Encoding,omitempty" url:"-"`
	ContentLanguage                 string `header:"Content-Language,omitempty" url:"-"`
	ContentType                     string `header:"Content-Type,omitempty" url:"-"`
	Expires                         string `header:"Expires,omitempty" url:"-"`
	Expect                          string `header:"Expect,omitempty" url:"-"`
	XCosMetadataDirective           string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfModifiedSince   string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfMatch           string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
	XCosCopySourceIfNoneMatch       string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
	XCosStorageClass                string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
	// Custom x-cos-meta-* headers.
	XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
	// XCosCopySource is overwritten by Copy with the encoded source URL.
	XCosCopySource           string `header:"x-cos-copy-source" url:"-" xml:"-"`
	XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
	// SSE-C
	XCosSSECustomerAglo             string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey              string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5           string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerAglo   string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKey    string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
// ObjectCopyOptions is the option of Copy, choose header or body.
// Copy builds a value of this type with an unkeyed literal, so the order of
// the two embedded fields must not change.
type ObjectCopyOptions struct {
	*ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
	*ACLHeaderOptions        `header:",omitempty" url:"-" xml:"-"`
}
// ObjectCopyResult is the result of Copy, decoded from the CopyObjectResult
// XML element. Copy treats an empty ETag in a 200 response as a failure.
type ObjectCopyResult struct {
	XMLName      xml.Name `xml:"CopyObjectResult"`
	ETag         string   `xml:"ETag,omitempty"`
	LastModified string   `xml:"LastModified,omitempty"`
	CRC64        string   `xml:"CRC64,omitempty"`
	VersionId    string   `xml:"VersionId,omitempty"`
}
  276. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  277. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  278. //
  279. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  280. //
  281. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  282. //
  283. // https://cloud.tencent.com/document/product/436/10881
  284. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  285. surl := strings.SplitN(sourceURL, "/", 2)
  286. if len(surl) < 2 {
  287. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  288. }
  289. var u string
  290. if len(id) == 1 {
  291. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  292. } else if len(id) == 0 {
  293. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  294. } else {
  295. return nil, nil, errors.New("wrong params")
  296. }
  297. var res ObjectCopyResult
  298. copyOpt := &ObjectCopyOptions{
  299. &ObjectCopyHeaderOptions{},
  300. &ACLHeaderOptions{},
  301. }
  302. if opt != nil {
  303. if opt.ObjectCopyHeaderOptions != nil {
  304. *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
  305. }
  306. if opt.ACLHeaderOptions != nil {
  307. *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
  308. }
  309. }
  310. copyOpt.XCosCopySource = u
  311. sendOpt := sendOptions{
  312. baseURL: s.client.BaseURL.BucketURL,
  313. uri: "/" + encodeURIComponent(name),
  314. method: http.MethodPut,
  315. body: nil,
  316. optHeader: copyOpt,
  317. result: &res,
  318. }
  319. resp, err := s.client.send(ctx, &sendOpt)
  320. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  321. if err == nil && resp.StatusCode == 200 {
  322. if res.ETag == "" {
  323. return &res, resp, errors.New("response 200 OK, but body contains an error")
  324. }
  325. }
  326. return &res, resp, err
  327. }
// ObjectDeleteOptions is the option of Delete. Delete passes it to the
// request builder as both header and query options.
type ObjectDeleteOptions struct {
	// SSE-C
	XCosSSECustomerAglo   string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey    string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	// Compatibility hook for other custom headers.
	XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
	// VersionId is tagged as the "VersionId" query parameter.
	VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
}
  337. // Delete Object请求可以将一个文件(Object)删除。
  338. //
  339. // https://www.qcloud.com/document/product/436/7743
  340. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  341. var optHeader *ObjectDeleteOptions
  342. // When use "" string might call the delete bucket interface
  343. if len(name) == 0 {
  344. return nil, errors.New("empty object name")
  345. }
  346. if len(opt) > 0 {
  347. optHeader = opt[0]
  348. }
  349. sendOpt := sendOptions{
  350. baseURL: s.client.BaseURL.BucketURL,
  351. uri: "/" + encodeURIComponent(name),
  352. method: http.MethodDelete,
  353. optHeader: optHeader,
  354. optQuery: optHeader,
  355. }
  356. resp, err := s.client.send(ctx, &sendOpt)
  357. return resp, err
  358. }
// ObjectHeadOptions is the option of HeadObject.
// Head passes it to the request builder as header options only.
type ObjectHeadOptions struct {
	IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
	// SSE-C
	XCosSSECustomerAglo   string       `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKey    string       `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
	XCosSSECustomerKeyMD5 string       `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
	XOptionHeader         *http.Header `header:"-,omitempty" url:"-" xml:"-"`
}
  368. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  369. //
  370. // https://www.qcloud.com/document/product/436/7745
  371. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  372. var u string
  373. if len(id) == 1 {
  374. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  375. } else if len(id) == 0 {
  376. u = "/" + encodeURIComponent(name)
  377. } else {
  378. return nil, errors.New("wrong params")
  379. }
  380. sendOpt := sendOptions{
  381. baseURL: s.client.BaseURL.BucketURL,
  382. uri: u,
  383. method: http.MethodHead,
  384. optHeader: opt,
  385. }
  386. resp, err := s.client.send(ctx, &sendOpt)
  387. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  388. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  389. }
  390. return resp, err
  391. }
// ObjectOptionsOptions is the option of object options.
// The fields map to the CORS preflight request headers named in their tags;
// only Access-Control-Request-Headers is tagged omitempty.
type ObjectOptionsOptions struct {
	Origin                      string `url:"-" header:"Origin"`
	AccessControlRequestMethod  string `url:"-" header:"Access-Control-Request-Method"`
	AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
}
  398. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  399. //
  400. // 当CORS配置不存在时,请求返回403 Forbidden。
  401. //
  402. // https://www.qcloud.com/document/product/436/8288
  403. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  404. sendOpt := sendOptions{
  405. baseURL: s.client.BaseURL.BucketURL,
  406. uri: "/" + encodeURIComponent(name),
  407. method: http.MethodOptions,
  408. optHeader: opt,
  409. }
  410. resp, err := s.client.send(ctx, &sendOpt)
  411. return resp, err
  412. }
// CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours)
type CASJobParameters struct {
	Tier string `xml:"Tier"`
}
// ObjectRestoreOptions is the option of object restore.
// It is serialized as the RestoreRequest XML element and sent as the request
// body by PostRestore.
type ObjectRestoreOptions struct {
	XMLName xml.Name          `xml:"RestoreRequest"`
	Days    int               `xml:"Days"`
	Tier    *CASJobParameters `xml:"CASJobParameters"`
}
  423. // PutRestore API can recover an object of type archived by COS archive.
  424. //
  425. // https://cloud.tencent.com/document/product/436/12633
  426. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  427. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  428. sendOpt := sendOptions{
  429. baseURL: s.client.BaseURL.BucketURL,
  430. uri: u,
  431. method: http.MethodPost,
  432. body: opt,
  433. }
  434. resp, err := s.client.send(ctx, &sendOpt)
  435. return resp, err
  436. }
  437. // TODO Append 接口在优化未开放使用
  438. //
  439. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  440. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  441. //
  442. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  443. //
  444. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  445. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  446. //
  447. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  448. //
  449. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  450. //
  451. // https://www.qcloud.com/document/product/436/7741
  452. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  453. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  454. // if position != 0{
  455. // opt = nil
  456. // }
  457. // sendOpt := sendOptions{
  458. // baseURL: s.client.BaseURL.BucketURL,
  459. // uri: u,
  460. // method: http.MethodPost,
  461. // optHeader: opt,
  462. // body: r,
  463. // }
  464. // resp, err := s.client.send(ctx, &sendOpt)
  465. // return resp, err
  466. // }
// ObjectDeleteMultiOptions is the option of DeleteMulti.
// It is serialized as the Delete XML element and sent as the request body.
type ObjectDeleteMultiOptions struct {
	XMLName xml.Name `xml:"Delete" header:"-"`
	Quiet   bool     `xml:"Quiet" header:"-"`
	Objects []Object `xml:"Object" header:"-"`
	//XCosSha1 string `xml:"-" header:"x-cos-sha1"`
}
// ObjectDeleteMultiResult is the result of DeleteMulti, decoded from the
// DeleteResult XML element: the objects reported as deleted and the per-object
// errors.
type ObjectDeleteMultiResult struct {
	XMLName        xml.Name `xml:"DeleteResult"`
	DeletedObjects []Object `xml:"Deleted,omitempty"`
	Errors         []struct {
		Key       string `xml:",omitempty"`
		Code      string `xml:",omitempty"`
		Message   string `xml:",omitempty"`
		VersionId string `xml:",omitempty"`
	} `xml:"Error,omitempty"`
}
  485. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  486. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  487. // Quiet模式只返回报错的Object信息。
  488. // https://www.qcloud.com/document/product/436/8289
  489. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  490. var res ObjectDeleteMultiResult
  491. sendOpt := sendOptions{
  492. baseURL: s.client.BaseURL.BucketURL,
  493. uri: "/?delete",
  494. method: http.MethodPost,
  495. body: opt,
  496. result: &res,
  497. }
  498. resp, err := s.client.send(ctx, &sendOpt)
  499. return &res, resp, err
  500. }
// Object is the meta info of the object.
// It is used both in request bodies (DeleteMulti) and in decoded results
// (deleted objects, uploaded parts); every field is omitted from XML when
// empty.
type Object struct {
	Key          string `xml:",omitempty"`
	ETag         string `xml:",omitempty"`
	Size         int64  `xml:",omitempty"`
	PartNumber   int    `xml:",omitempty"`
	LastModified string `xml:",omitempty"`
	StorageClass string `xml:",omitempty"`
	Owner        *Owner `xml:",omitempty"`
	VersionId    string `xml:",omitempty"`
}
// MultiUploadOptions is the option of the multiupload,
// ThreadPoolSize default is one
type MultiUploadOptions struct {
	// OptIni supplies the ACL and put headers; Upload reuses them for the
	// simple-upload path.
	OptIni *InitiateMultipartUploadOptions
	// PartSize is the part size in MB (Upload multiplies it by 1024*1024);
	// a value <= 0 lets the SDK choose.
	PartSize           int64
	ThreadPoolSize     int
	CheckPoint         bool
	EnableVerification bool
}
// MultiDownloadOptions holds the settings of a multi-part download.
// NOTE(review): the consumer of this type is outside the visible part of the
// file; field meanings are presumed to mirror MultiUploadOptions — confirm.
type MultiDownloadOptions struct {
	Opt            *ObjectGetOptions
	PartSize       int64
	ThreadPoolSize int
	CheckPoint     bool
	CheckPointFile string
}
// MultiDownloadCPInfo is the JSON-serializable record of a multi-part
// download: object attributes plus the list of blocks already downloaded.
// NOTE(review): presumably the checkpoint file content — confirm against the
// code that reads and writes it.
type MultiDownloadCPInfo struct {
	Size             int64             `json:"contentLength,omitempty"`
	ETag             string            `json:"eTag,omitempty"`
	CRC64            string            `json:"crc64ecma,omitempty"`
	LastModified     string            `json:"lastModified,omitempty"`
	DownloadedBlocks []DownloadedBlock `json:"downloadedBlocks,omitempty"`
}
// DownloadedBlock is one entry of MultiDownloadCPInfo.DownloadedBlocks.
// NOTE(review): From/To look like a byte range; whether To is inclusive is
// not visible here — confirm.
type DownloadedBlock struct {
	From int64 `json:"from,omitempty"`
	To   int64 `json:"to,omitempty"`
}
// Chunk describes one part of a file split by SplitFileIntoChunks.
type Chunk struct {
	// Number is the 1-based part number.
	Number int
	// OffSet is the byte offset of the part within the file.
	OffSet int64
	// Size is the length of the part in bytes.
	Size int64
	// Done and ETag are set by checkUploadedParts for parts whose local MD5
	// matches the ETag of the already uploaded part.
	Done bool
	ETag string
}
// Jobs is one unit of work consumed by worker (upload) and downloadWorker
// (download).
type Jobs struct {
	// Name is the object key.
	Name string
	// UploadId identifies the multipart upload (used by worker).
	UploadId string
	// FilePath is the local file read from (upload) or written to (download).
	FilePath string
	// RetryTimes is the retry budget the workers count down on request errors.
	RetryTimes int
	// VersionId is forwarded to Get as its optional id arguments.
	VersionId []string
	// Chunk is the part of the file this job covers.
	Chunk Chunk
	Data  io.Reader
	// Opt is used by worker, DownOpt by downloadWorker; both are modified by
	// the worker that processes the job.
	Opt     *ObjectUploadPartOptions
	DownOpt *ObjectGetOptions
}
// Results is what a worker reports for one job: the part number, the last
// response received and the error, if any.
type Results struct {
	PartNumber int
	Resp       *Response
	err        error
}
  563. func LimitReadCloser(r io.Reader, n int64) io.Reader {
  564. var lc LimitedReadCloser
  565. lc.R = r
  566. lc.N = n
  567. return &lc
  568. }
  569. type LimitedReadCloser struct {
  570. io.LimitedReader
  571. }
  572. func (lc *LimitedReadCloser) Close() error {
  573. if r, ok := lc.R.(io.ReadCloser); ok {
  574. return r.Close()
  575. }
  576. return nil
  577. }
  578. type DiscardReadCloser struct {
  579. RC io.ReadCloser
  580. Discard int
  581. }
  582. func (drc *DiscardReadCloser) Read(data []byte) (int, error) {
  583. n, err := drc.RC.Read(data)
  584. if drc.Discard == 0 || n <= 0 {
  585. return n, err
  586. }
  587. if n <= drc.Discard {
  588. drc.Discard -= n
  589. return 0, err
  590. }
  591. realLen := n - drc.Discard
  592. copy(data[0:realLen], data[drc.Discard:n])
  593. drc.Discard = 0
  594. return realLen, err
  595. }
  596. func (drc *DiscardReadCloser) Close() error {
  597. if rc, ok := drc.RC.(io.ReadCloser); ok {
  598. return rc.Close()
  599. }
  600. return nil
  601. }
  602. func worker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  603. for j := range jobs {
  604. j.Opt.ContentLength = j.Chunk.Size
  605. rt := j.RetryTimes
  606. for {
  607. // http.Request.Body can be Closed in request
  608. fd, err := os.Open(j.FilePath)
  609. var res Results
  610. if err != nil {
  611. res.err = err
  612. res.PartNumber = j.Chunk.Number
  613. res.Resp = nil
  614. results <- &res
  615. break
  616. }
  617. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  618. resp, err := s.UploadPart(context.Background(), j.Name, j.UploadId, j.Chunk.Number,
  619. LimitReadCloser(fd, j.Chunk.Size), j.Opt)
  620. res.PartNumber = j.Chunk.Number
  621. res.Resp = resp
  622. res.err = err
  623. if err != nil {
  624. rt--
  625. if rt == 0 {
  626. results <- &res
  627. break
  628. }
  629. continue
  630. }
  631. results <- &res
  632. break
  633. }
  634. }
  635. }
  636. func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  637. for j := range jobs {
  638. opt := &RangeOptions{
  639. HasStart: true,
  640. HasEnd: true,
  641. Start: j.Chunk.OffSet,
  642. End: j.Chunk.OffSet + j.Chunk.Size - 1,
  643. }
  644. j.DownOpt.Range = FormatRangeOptions(opt)
  645. rt := j.RetryTimes
  646. for {
  647. var res Results
  648. res.PartNumber = j.Chunk.Number
  649. resp, err := s.Get(context.Background(), j.Name, j.DownOpt, j.VersionId...)
  650. res.err = err
  651. res.Resp = resp
  652. if err != nil {
  653. rt--
  654. if rt == 0 {
  655. results <- &res
  656. break
  657. }
  658. continue
  659. }
  660. defer resp.Body.Close()
  661. fd, err := os.OpenFile(j.FilePath, os.O_WRONLY, 0660)
  662. if err != nil {
  663. res.err = err
  664. results <- &res
  665. break
  666. }
  667. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  668. n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
  669. if n != j.Chunk.Size || err != nil {
  670. res.err = fmt.Errorf("io.Copy Failed, nread:%v, want:%v, err:%v", n, j.Chunk.Size, err)
  671. }
  672. fd.Close()
  673. results <- &res
  674. break
  675. }
  676. }
  677. }
  678. func DividePart(fileSize int64, last int) (int64, int64) {
  679. partSize := int64(last * 1024 * 1024)
  680. partNum := fileSize / partSize
  681. for partNum >= 10000 {
  682. partSize = partSize * 2
  683. partNum = fileSize / partSize
  684. }
  685. return partNum, partSize
  686. }
  687. func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
  688. if filePath == "" {
  689. return 0, nil, 0, errors.New("filePath invalid")
  690. }
  691. file, err := os.Open(filePath)
  692. if err != nil {
  693. return 0, nil, 0, err
  694. }
  695. defer file.Close()
  696. stat, err := file.Stat()
  697. if err != nil {
  698. return 0, nil, 0, err
  699. }
  700. var partNum int64
  701. if partSize > 0 {
  702. if partSize < 1024*1024 {
  703. return 0, nil, 0, errors.New("partSize>=1048576 is required")
  704. }
  705. partNum = stat.Size() / partSize
  706. if partNum >= 10000 {
  707. return 0, nil, 0, errors.New("Too many parts, out of 10000")
  708. }
  709. } else {
  710. partNum, partSize = DividePart(stat.Size(), 64)
  711. }
  712. var chunks []Chunk
  713. var chunk = Chunk{}
  714. for i := int64(0); i < partNum; i++ {
  715. chunk.Number = int(i + 1)
  716. chunk.OffSet = i * partSize
  717. chunk.Size = partSize
  718. chunks = append(chunks, chunk)
  719. }
  720. if stat.Size()%partSize > 0 {
  721. chunk.Number = len(chunks) + 1
  722. chunk.OffSet = int64(len(chunks)) * partSize
  723. chunk.Size = stat.Size() % partSize
  724. chunks = append(chunks, chunk)
  725. partNum++
  726. }
  727. return int64(stat.Size()), chunks, int(partNum), nil
  728. }
  729. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  730. opt := &ObjectListUploadsOptions{
  731. Prefix: name,
  732. EncodingType: "url",
  733. }
  734. res, _, err := s.ListUploads(ctx, opt)
  735. if err != nil {
  736. return "", err
  737. }
  738. if len(res.Upload) == 0 {
  739. return "", nil
  740. }
  741. last := len(res.Upload) - 1
  742. for last >= 0 {
  743. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  744. if decodeKey == name {
  745. return decodeURIComponent(res.Upload[last].UploadID)
  746. }
  747. last = last - 1
  748. }
  749. return "", nil
  750. }
  751. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  752. var uploadedParts []Object
  753. isTruncated := true
  754. opt := &ObjectListPartsOptions{
  755. EncodingType: "url",
  756. }
  757. for isTruncated {
  758. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  759. if err != nil {
  760. return err
  761. }
  762. if len(res.Parts) > 0 {
  763. uploadedParts = append(uploadedParts, res.Parts...)
  764. }
  765. isTruncated = res.IsTruncated
  766. opt.PartNumberMarker = res.NextPartNumberMarker
  767. }
  768. fd, err := os.Open(filepath)
  769. if err != nil {
  770. return err
  771. }
  772. defer fd.Close()
  773. // 某个分块出错, 重置chunks
  774. ret := func(e error) error {
  775. for i, _ := range chunks {
  776. chunks[i].Done = false
  777. chunks[i].ETag = ""
  778. }
  779. return e
  780. }
  781. for _, part := range uploadedParts {
  782. partNumber := part.PartNumber
  783. if partNumber > partNum {
  784. return ret(errors.New("Part Number is not consistent"))
  785. }
  786. partNumber = partNumber - 1
  787. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  788. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  789. if err != nil {
  790. return ret(err)
  791. }
  792. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  793. if localMD5 != part.ETag {
  794. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  795. }
  796. chunks[partNumber].Done = true
  797. chunks[partNumber].ETag = part.ETag
  798. }
  799. return nil
  800. }
  801. // MultiUpload/Upload 为高级upload接口,并发分块上传
  802. //
  803. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  804. // 由调用者指定分块大小时,请确认分块数量不超过10000
  805. //
  806. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  807. return s.Upload(ctx, name, filepath, opt)
  808. }
  809. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  810. if opt == nil {
  811. opt = &MultiUploadOptions{}
  812. }
  813. var localcrc uint64
  814. // 1.Get the file chunk
  815. totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize*1024*1024)
  816. if err != nil {
  817. return nil, nil, err
  818. }
  819. // 校验
  820. if s.client.Conf.EnableCRC {
  821. fd, err := os.Open(filepath)
  822. if err != nil {
  823. return nil, nil, err
  824. }
  825. defer fd.Close()
  826. localcrc, err = calCRC64(fd)
  827. if err != nil {
  828. return nil, nil, err
  829. }
  830. }
  831. // filesize=0 , use simple upload
  832. if partNum == 0 || partNum == 1 {
  833. var opt0 *ObjectPutOptions
  834. if opt.OptIni != nil {
  835. opt0 = &ObjectPutOptions{
  836. opt.OptIni.ACLHeaderOptions,
  837. opt.OptIni.ObjectPutHeaderOptions,
  838. }
  839. }
  840. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  841. if err != nil {
  842. return nil, rsp, err
  843. }
  844. result := &CompleteMultipartUploadResult{
  845. Location: fmt.Sprintf("%s/%s", s.client.BaseURL.BucketURL, name),
  846. Key: name,
  847. ETag: rsp.Header.Get("ETag"),
  848. }
  849. if rsp != nil && s.client.Conf.EnableCRC {
  850. scoscrc := rsp.Header.Get("x-cos-hash-crc64ecma")
  851. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  852. if icoscrc != localcrc {
  853. return result, rsp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  854. }
  855. }
  856. return result, rsp, nil
  857. }
  858. var uploadID string
  859. resumableFlag := false
  860. if opt.CheckPoint {
  861. var err error
  862. uploadID, err = s.getResumableUploadID(ctx, name)
  863. if err == nil && uploadID != "" {
  864. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  865. resumableFlag = (err == nil)
  866. }
  867. }
  868. // 2.Init
  869. optini := opt.OptIni
  870. if !resumableFlag {
  871. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  872. if err != nil {
  873. return nil, nil, err
  874. }
  875. uploadID = res.UploadID
  876. }
  877. var poolSize int
  878. if opt.ThreadPoolSize > 0 {
  879. poolSize = opt.ThreadPoolSize
  880. } else {
  881. // Default is one
  882. poolSize = 1
  883. }
  884. chjobs := make(chan *Jobs, 100)
  885. chresults := make(chan *Results, 10000)
  886. optcom := &CompleteMultipartUploadOptions{}
  887. // 3.Start worker
  888. for w := 1; w <= poolSize; w++ {
  889. go worker(s, chjobs, chresults)
  890. }
  891. // progress started event
  892. var listener ProgressListener
  893. var consumedBytes int64
  894. if opt.OptIni != nil {
  895. if opt.OptIni.ObjectPutHeaderOptions != nil {
  896. listener = opt.OptIni.Listener
  897. }
  898. optcom.XOptionHeader, _ = deliverInitOptions(opt.OptIni)
  899. }
  900. event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
  901. progressCallback(listener, event)
  902. // 4.Push jobs
  903. go func() {
  904. for _, chunk := range chunks {
  905. if chunk.Done {
  906. continue
  907. }
  908. partOpt := &ObjectUploadPartOptions{}
  909. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  910. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  911. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  912. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  913. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  914. }
  915. job := &Jobs{
  916. Name: name,
  917. RetryTimes: 3,
  918. FilePath: filepath,
  919. UploadId: uploadID,
  920. Chunk: chunk,
  921. Opt: partOpt,
  922. }
  923. chjobs <- job
  924. }
  925. close(chjobs)
  926. }()
  927. // 5.Recv the resp etag to complete
  928. err = nil
  929. for i := 0; i < partNum; i++ {
  930. if chunks[i].Done {
  931. optcom.Parts = append(optcom.Parts, Object{
  932. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  933. )
  934. if err == nil {
  935. consumedBytes += chunks[i].Size
  936. event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
  937. progressCallback(listener, event)
  938. }
  939. continue
  940. }
  941. res := <-chresults
  942. // Notice one part fail can not get the etag according.
  943. if res.Resp == nil || res.err != nil {
  944. // Some part already fail, can not to get the header inside.
  945. err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  946. continue
  947. }
  948. // Notice one part fail can not get the etag according.
  949. etag := res.Resp.Header.Get("ETag")
  950. optcom.Parts = append(optcom.Parts, Object{
  951. PartNumber: res.PartNumber, ETag: etag},
  952. )
  953. if err == nil {
  954. consumedBytes += chunks[res.PartNumber-1].Size
  955. event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
  956. progressCallback(listener, event)
  957. }
  958. }
  959. close(chresults)
  960. if err != nil {
  961. event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
  962. progressCallback(listener, event)
  963. return nil, nil, err
  964. }
  965. sort.Sort(ObjectList(optcom.Parts))
  966. event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
  967. progressCallback(listener, event)
  968. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  969. if err != nil {
  970. return v, resp, err
  971. }
  972. if resp != nil && s.client.Conf.EnableCRC {
  973. scoscrc := resp.Header.Get("x-cos-hash-crc64ecma")
  974. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  975. if icoscrc != localcrc {
  976. return v, resp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  977. }
  978. }
  979. return v, resp, err
  980. }
  981. func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
  982. var partNum int64
  983. if partSize > 0 {
  984. if partSize < 1024*1024 {
  985. return nil, 0, errors.New("partSize>=1048576 is required")
  986. }
  987. partNum = totalBytes / partSize
  988. if partNum >= 10000 {
  989. return nil, 0, errors.New("Too manry parts, out of 10000")
  990. }
  991. } else {
  992. partNum, partSize = DividePart(totalBytes, 64)
  993. }
  994. var chunks []Chunk
  995. var chunk = Chunk{}
  996. for i := int64(0); i < partNum; i++ {
  997. chunk.Number = int(i + 1)
  998. chunk.OffSet = i * partSize
  999. chunk.Size = partSize
  1000. chunks = append(chunks, chunk)
  1001. }
  1002. if totalBytes%partSize > 0 {
  1003. chunk.Number = len(chunks) + 1
  1004. chunk.OffSet = int64(len(chunks)) * partSize
  1005. chunk.Size = totalBytes % partSize
  1006. chunks = append(chunks, chunk)
  1007. partNum++
  1008. }
  1009. return chunks, int(partNum), nil
  1010. }
  1011. func (s *ObjectService) checkDownloadedParts(opt *MultiDownloadCPInfo, chfile string, chunks []Chunk) (*MultiDownloadCPInfo, bool) {
  1012. var defaultRes MultiDownloadCPInfo
  1013. defaultRes = *opt
  1014. fd, err := os.Open(chfile)
  1015. // checkpoint 文件不存在
  1016. if err != nil && os.IsNotExist(err) {
  1017. // 创建 checkpoint 文件
  1018. fd, _ = os.OpenFile(chfile, os.O_RDONLY|os.O_CREATE|os.O_TRUNC, 0660)
  1019. fd.Close()
  1020. return &defaultRes, false
  1021. }
  1022. if err != nil {
  1023. return &defaultRes, false
  1024. }
  1025. defer fd.Close()
  1026. var res MultiDownloadCPInfo
  1027. err = json.NewDecoder(fd).Decode(&res)
  1028. if err != nil {
  1029. return &defaultRes, false
  1030. }
  1031. // 与COS的文件比较
  1032. if res.CRC64 != opt.CRC64 || res.ETag != opt.ETag || res.Size != opt.Size || res.LastModified != opt.LastModified || len(res.DownloadedBlocks) == 0 {
  1033. return &defaultRes, false
  1034. }
  1035. // len(chunks) 大于1,否则为简单下载, chunks[0].Size为partSize
  1036. partSize := chunks[0].Size
  1037. for _, v := range res.DownloadedBlocks {
  1038. index := v.From / partSize
  1039. to := chunks[index].OffSet + chunks[index].Size - 1
  1040. if chunks[index].OffSet != v.From || to != v.To {
  1041. // 重置chunks
  1042. for i, _ := range chunks {
  1043. chunks[i].Done = false
  1044. }
  1045. return &defaultRes, false
  1046. }
  1047. chunks[index].Done = true
  1048. }
  1049. return &res, true
  1050. }
  1051. func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions, id ...string) (*Response, error) {
  1052. // 参数校验
  1053. if opt == nil {
  1054. opt = &MultiDownloadOptions{}
  1055. }
  1056. if opt.Opt != nil && opt.Opt.Range != "" {
  1057. return nil, fmt.Errorf("Download doesn't support Range Options")
  1058. }
  1059. // 获取文件长度和CRC
  1060. var coscrc string
  1061. resp, err := s.Head(ctx, name, nil, id...)
  1062. if err != nil {
  1063. return resp, err
  1064. }
  1065. // 如果对象不存在x-cos-hash-crc64ecma,则跳过不做校验
  1066. coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
  1067. strTotalBytes := resp.Header.Get("Content-Length")
  1068. totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
  1069. if err != nil {
  1070. return resp, err
  1071. }
  1072. // 切分
  1073. chunks, partNum, err := SplitSizeIntoChunks(totalBytes, opt.PartSize*1024*1024)
  1074. if err != nil {
  1075. return resp, err
  1076. }
  1077. // 直接下载到文件
  1078. if partNum == 0 || partNum == 1 {
  1079. rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt, id...)
  1080. if err != nil {
  1081. return rsp, err
  1082. }
  1083. if coscrc != "" && s.client.Conf.EnableCRC {
  1084. icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
  1085. fd, err := os.Open(filepath)
  1086. if err != nil {
  1087. return rsp, err
  1088. }
  1089. defer fd.Close()
  1090. localcrc, err := calCRC64(fd)
  1091. if err != nil {
  1092. return rsp, err
  1093. }
  1094. if localcrc != icoscrc {
  1095. return rsp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
  1096. }
  1097. }
  1098. return rsp, err
  1099. }
  1100. // 断点续载
  1101. var resumableFlag bool
  1102. var resumableInfo *MultiDownloadCPInfo
  1103. var cpfd *os.File
  1104. var cpfile string
  1105. if opt.CheckPoint {
  1106. cpInfo := &MultiDownloadCPInfo{
  1107. LastModified: resp.Header.Get("Last-Modified"),
  1108. ETag: resp.Header.Get("ETag"),
  1109. CRC64: coscrc,
  1110. Size: totalBytes,
  1111. }
  1112. cpfile = opt.CheckPointFile
  1113. if cpfile == "" {
  1114. cpfile = fmt.Sprintf("%s.cosresumabletask", filepath)
  1115. }
  1116. resumableInfo, resumableFlag = s.checkDownloadedParts(cpInfo, cpfile, chunks)
  1117. cpfd, err = os.OpenFile(cpfile, os.O_RDWR, 0660)
  1118. if err != nil {
  1119. return nil, fmt.Errorf("Open CheckPoint File[%v] Failed:%v", cpfile, err)
  1120. }
  1121. }
  1122. if !resumableFlag {
  1123. // 创建文件
  1124. nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  1125. if err != nil {
  1126. if cpfd != nil {
  1127. cpfd.Close()
  1128. }
  1129. return resp, err
  1130. }
  1131. nfile.Close()
  1132. }
  1133. var poolSize int
  1134. if opt.ThreadPoolSize > 0 {
  1135. poolSize = opt.ThreadPoolSize
  1136. } else {
  1137. poolSize = 1
  1138. }
  1139. chjobs := make(chan *Jobs, 100)
  1140. chresults := make(chan *Results, 10000)
  1141. for w := 1; w <= poolSize; w++ {
  1142. go downloadWorker(s, chjobs, chresults)
  1143. }
  1144. go func() {
  1145. for _, chunk := range chunks {
  1146. if chunk.Done {
  1147. continue
  1148. }
  1149. var downOpt ObjectGetOptions
  1150. if opt.Opt != nil {
  1151. downOpt = *opt.Opt
  1152. downOpt.Listener = nil // listener need to set nil
  1153. }
  1154. job := &Jobs{
  1155. Name: name,
  1156. RetryTimes: 3,
  1157. FilePath: filepath,
  1158. Chunk: chunk,
  1159. DownOpt: &downOpt,
  1160. }
  1161. if len(id) > 0 {
  1162. job.VersionId = append(job.VersionId, id...)
  1163. }
  1164. chjobs <- job
  1165. }
  1166. close(chjobs)
  1167. }()
  1168. err = nil
  1169. for i := 0; i < partNum; i++ {
  1170. if chunks[i].Done {
  1171. continue
  1172. }
  1173. res := <-chresults
  1174. if res.Resp == nil || res.err != nil {
  1175. err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
  1176. continue
  1177. }
  1178. // Dump CheckPoint Info
  1179. if opt.CheckPoint {
  1180. cpfd.Truncate(0)
  1181. cpfd.Seek(0, os.SEEK_SET)
  1182. resumableInfo.DownloadedBlocks = append(resumableInfo.DownloadedBlocks, DownloadedBlock{
  1183. From: chunks[res.PartNumber-1].OffSet,
  1184. To: chunks[res.PartNumber-1].OffSet + chunks[res.PartNumber-1].Size - 1,
  1185. })
  1186. json.NewEncoder(cpfd).Encode(resumableInfo)
  1187. }
  1188. }
  1189. close(chresults)
  1190. if cpfd != nil {
  1191. cpfd.Close()
  1192. }
  1193. if err != nil {
  1194. return nil, err
  1195. }
  1196. // 下载成功,删除checkpoint文件
  1197. if opt.CheckPoint {
  1198. os.Remove(cpfile)
  1199. }
  1200. if coscrc != "" && s.client.Conf.EnableCRC {
  1201. icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
  1202. fd, err := os.Open(filepath)
  1203. if err != nil {
  1204. return resp, err
  1205. }
  1206. defer fd.Close()
  1207. localcrc, err := calCRC64(fd)
  1208. if err != nil {
  1209. return resp, err
  1210. }
  1211. if localcrc != icoscrc {
  1212. return resp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
  1213. }
  1214. }
  1215. return resp, err
  1216. }
  1217. type ObjectPutTaggingOptions struct {
  1218. XMLName xml.Name `xml:"Tagging"`
  1219. TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
  1220. }
  1221. type ObjectTaggingTag BucketTaggingTag
  1222. type ObjectGetTaggingResult ObjectPutTaggingOptions
  1223. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  1224. var u string
  1225. if len(id) == 1 {
  1226. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1227. } else if len(id) == 0 {
  1228. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1229. } else {
  1230. return nil, errors.New("wrong params")
  1231. }
  1232. sendOpt := &sendOptions{
  1233. baseURL: s.client.BaseURL.BucketURL,
  1234. uri: u,
  1235. method: http.MethodPut,
  1236. body: opt,
  1237. }
  1238. resp, err := s.client.send(ctx, sendOpt)
  1239. return resp, err
  1240. }
  1241. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  1242. var u string
  1243. if len(id) == 1 {
  1244. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1245. } else if len(id) == 0 {
  1246. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1247. } else {
  1248. return nil, nil, errors.New("wrong params")
  1249. }
  1250. var res ObjectGetTaggingResult
  1251. sendOpt := &sendOptions{
  1252. baseURL: s.client.BaseURL.BucketURL,
  1253. uri: u,
  1254. method: http.MethodGet,
  1255. result: &res,
  1256. }
  1257. resp, err := s.client.send(ctx, sendOpt)
  1258. return &res, resp, err
  1259. }
  1260. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  1261. var u string
  1262. if len(id) == 1 {
  1263. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1264. } else if len(id) == 0 {
  1265. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1266. } else {
  1267. return nil, errors.New("wrong params")
  1268. }
  1269. sendOpt := &sendOptions{
  1270. baseURL: s.client.BaseURL.BucketURL,
  1271. uri: u,
  1272. method: http.MethodDelete,
  1273. }
  1274. resp, err := s.client.send(ctx, sendOpt)
  1275. return resp, err
  1276. }