You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1385 lines
42 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "context"
  4. "crypto/md5"
  5. "encoding/json"
  6. "encoding/xml"
  7. "errors"
  8. "fmt"
  9. "hash/crc64"
  10. "io"
  11. "io/ioutil"
  12. "net/http"
  13. "net/url"
  14. "os"
  15. "sort"
  16. "strconv"
  17. "strings"
  18. "time"
  19. )
  20. // ObjectService 相关 API
  21. type ObjectService service
  22. // ObjectGetOptions is the option of GetObject
  23. type ObjectGetOptions struct {
  24. ResponseContentType string `url:"response-content-type,omitempty" header:"-"`
  25. ResponseContentLanguage string `url:"response-content-language,omitempty" header:"-"`
  26. ResponseExpires string `url:"response-expires,omitempty" header:"-"`
  27. ResponseCacheControl string `url:"response-cache-control,omitempty" header:"-"`
  28. ResponseContentDisposition string `url:"response-content-disposition,omitempty" header:"-"`
  29. ResponseContentEncoding string `url:"response-content-encoding,omitempty" header:"-"`
  30. Range string `url:"-" header:"Range,omitempty"`
  31. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  32. // SSE-C
  33. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  34. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  35. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  36. //兼容其他自定义头部
  37. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  38. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  39. // 下载进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  40. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  41. }
  42. // presignedURLTestingOptions is the opt of presigned url
  43. type presignedURLTestingOptions struct {
  44. authTime *AuthTime
  45. }
  46. // Get Object 请求可以将一个文件(Object)下载至本地。
  47. // 该操作需要对目标 Object 具有读权限或目标 Object 对所有人都开放了读权限(公有读)。
  48. //
  49. // https://www.qcloud.com/document/product/436/7753
  50. func (s *ObjectService) Get(ctx context.Context, name string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  51. var u string
  52. if len(id) == 1 {
  53. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  54. } else if len(id) == 0 {
  55. u = "/" + encodeURIComponent(name)
  56. } else {
  57. return nil, errors.New("wrong params")
  58. }
  59. sendOpt := sendOptions{
  60. baseURL: s.client.BaseURL.BucketURL,
  61. uri: u,
  62. method: http.MethodGet,
  63. optQuery: opt,
  64. optHeader: opt,
  65. disableCloseBody: true,
  66. }
  67. resp, err := s.client.send(ctx, &sendOpt)
  68. if opt != nil && opt.Listener != nil {
  69. if err == nil && resp != nil {
  70. if totalBytes, e := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64); e == nil {
  71. resp.Body = TeeReader(resp.Body, nil, totalBytes, opt.Listener)
  72. }
  73. }
  74. }
  75. return resp, err
  76. }
  77. // GetToFile download the object to local file
  78. func (s *ObjectService) GetToFile(ctx context.Context, name, localpath string, opt *ObjectGetOptions, id ...string) (*Response, error) {
  79. resp, err := s.Get(ctx, name, opt, id...)
  80. if err != nil {
  81. return resp, err
  82. }
  83. defer resp.Body.Close()
  84. // If file exist, overwrite it
  85. fd, err := os.OpenFile(localpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  86. if err != nil {
  87. return resp, err
  88. }
  89. _, err = io.Copy(fd, resp.Body)
  90. fd.Close()
  91. if err != nil {
  92. return resp, err
  93. }
  94. return resp, nil
  95. }
  96. func (s *ObjectService) GetObjectURL(name string) *url.URL {
  97. uri, _ := url.Parse("/" + encodeURIComponent(name, []byte{'/'}))
  98. return s.client.BaseURL.BucketURL.ResolveReference(uri)
  99. }
  100. type PresignedURLOptions struct {
  101. Query *url.Values `xml:"-" url:"-" header:"-"`
  102. Header *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  103. }
  104. // GetPresignedURL get the object presigned to down or upload file by url
  105. func (s *ObjectService) GetPresignedURL(ctx context.Context, httpMethod, name, ak, sk string, expired time.Duration, opt interface{}) (*url.URL, error) {
  106. sendOpt := sendOptions{
  107. baseURL: s.client.BaseURL.BucketURL,
  108. uri: "/" + encodeURIComponent(name),
  109. method: httpMethod,
  110. optQuery: opt,
  111. optHeader: opt,
  112. }
  113. if popt, ok := opt.(*PresignedURLOptions); ok {
  114. qs := popt.Query.Encode()
  115. if qs != "" {
  116. sendOpt.uri = fmt.Sprintf("%s?%s", sendOpt.uri, qs)
  117. }
  118. }
  119. req, err := s.client.newRequest(ctx, sendOpt.baseURL, sendOpt.uri, sendOpt.method, sendOpt.body, sendOpt.optQuery, sendOpt.optHeader)
  120. if err != nil {
  121. return nil, err
  122. }
  123. var authTime *AuthTime
  124. if opt != nil {
  125. if opt, ok := opt.(*presignedURLTestingOptions); ok {
  126. authTime = opt.authTime
  127. }
  128. }
  129. if authTime == nil {
  130. authTime = NewAuthTime(expired)
  131. }
  132. authorization := newAuthorization(ak, sk, req, authTime)
  133. sign := encodeURIComponent(authorization, []byte{'&', '='})
  134. if req.URL.RawQuery == "" {
  135. req.URL.RawQuery = fmt.Sprintf("%s", sign)
  136. } else {
  137. req.URL.RawQuery = fmt.Sprintf("%s&%s", req.URL.RawQuery, sign)
  138. }
  139. return req.URL, nil
  140. }
  141. // ObjectPutHeaderOptions the options of header of the put object
  142. type ObjectPutHeaderOptions struct {
  143. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  144. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  145. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  146. ContentType string `header:"Content-Type,omitempty" url:"-"`
  147. ContentMD5 string `header:"Content-MD5,omitempty" url:"-"`
  148. ContentLength int64 `header:"Content-Length,omitempty" url:"-"`
  149. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  150. Expect string `header:"Expect,omitempty" url:"-"`
  151. Expires string `header:"Expires,omitempty" url:"-"`
  152. XCosContentSHA1 string `header:"x-cos-content-sha1,omitempty" url:"-"`
  153. // 自定义的 x-cos-meta-* header
  154. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  155. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-"`
  156. // 可选值: Normal, Appendable
  157. //XCosObjectType string `header:"x-cos-object-type,omitempty" url:"-"`
  158. // Enable Server Side Encryption, Only supported: AES256
  159. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  160. // SSE-C
  161. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  162. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  163. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  164. //兼容其他自定义头部
  165. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  166. XCosTrafficLimit int `header:"x-cos-traffic-limit,omitempty" url:"-" xml:"-"`
  167. // 上传进度, ProgressCompleteEvent不能表示对应API调用成功,API是否调用成功的判断标准为返回err==nil
  168. Listener ProgressListener `header:"-" url:"-" xml:"-"`
  169. }
  170. // ObjectPutOptions the options of put object
  171. type ObjectPutOptions struct {
  172. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  173. *ObjectPutHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  174. }
  175. // Put Object请求可以将一个文件(Oject)上传至指定Bucket。
  176. //
  177. // https://www.qcloud.com/document/product/436/7749
  178. func (s *ObjectService) Put(ctx context.Context, name string, r io.Reader, uopt *ObjectPutOptions) (*Response, error) {
  179. if r == nil {
  180. return nil, fmt.Errorf("reader is nil")
  181. }
  182. if err := CheckReaderLen(r); err != nil {
  183. return nil, err
  184. }
  185. opt := CloneObjectPutOptions(uopt)
  186. totalBytes, err := GetReaderLen(r)
  187. if err != nil && opt != nil && opt.Listener != nil {
  188. if opt.ContentLength == 0 {
  189. return nil, err
  190. }
  191. totalBytes = opt.ContentLength
  192. }
  193. if err == nil {
  194. // 与 go http 保持一致, 非bytes.Buffer/bytes.Reader/strings.Reader由用户指定ContentLength, 或使用 Chunk 上传
  195. if opt != nil && opt.ContentLength == 0 && IsLenReader(r) {
  196. opt.ContentLength = totalBytes
  197. }
  198. }
  199. reader := TeeReader(r, nil, totalBytes, nil)
  200. if s.client.Conf.EnableCRC {
  201. reader.writer = crc64.New(crc64.MakeTable(crc64.ECMA))
  202. }
  203. if opt != nil && opt.Listener != nil {
  204. reader.listener = opt.Listener
  205. }
  206. sendOpt := sendOptions{
  207. baseURL: s.client.BaseURL.BucketURL,
  208. uri: "/" + encodeURIComponent(name),
  209. method: http.MethodPut,
  210. body: reader,
  211. optHeader: opt,
  212. }
  213. resp, err := s.client.send(ctx, &sendOpt)
  214. return resp, err
  215. }
  216. // PutFromFile put object from local file
  217. func (s *ObjectService) PutFromFile(ctx context.Context, name string, filePath string, opt *ObjectPutOptions) (resp *Response, err error) {
  218. nr := 0
  219. for nr < 3 {
  220. fd, e := os.Open(filePath)
  221. if e != nil {
  222. err = e
  223. return
  224. }
  225. resp, err = s.Put(ctx, name, fd, opt)
  226. if err != nil {
  227. nr++
  228. fd.Close()
  229. continue
  230. }
  231. fd.Close()
  232. break
  233. }
  234. return
  235. }
  236. // ObjectCopyHeaderOptions is the head option of the Copy
  237. type ObjectCopyHeaderOptions struct {
  238. // When use replace directive to update meta infos
  239. CacheControl string `header:"Cache-Control,omitempty" url:"-"`
  240. ContentDisposition string `header:"Content-Disposition,omitempty" url:"-"`
  241. ContentEncoding string `header:"Content-Encoding,omitempty" url:"-"`
  242. ContentLanguage string `header:"Content-Language,omitempty" url:"-"`
  243. ContentType string `header:"Content-Type,omitempty" url:"-"`
  244. Expires string `header:"Expires,omitempty" url:"-"`
  245. Expect string `header:"Expect,omitempty" url:"-"`
  246. XCosMetadataDirective string `header:"x-cos-metadata-directive,omitempty" url:"-" xml:"-"`
  247. XCosCopySourceIfModifiedSince string `header:"x-cos-copy-source-If-Modified-Since,omitempty" url:"-" xml:"-"`
  248. XCosCopySourceIfUnmodifiedSince string `header:"x-cos-copy-source-If-Unmodified-Since,omitempty" url:"-" xml:"-"`
  249. XCosCopySourceIfMatch string `header:"x-cos-copy-source-If-Match,omitempty" url:"-" xml:"-"`
  250. XCosCopySourceIfNoneMatch string `header:"x-cos-copy-source-If-None-Match,omitempty" url:"-" xml:"-"`
  251. XCosStorageClass string `header:"x-cos-storage-class,omitempty" url:"-" xml:"-"`
  252. // 自定义的 x-cos-meta-* header
  253. XCosMetaXXX *http.Header `header:"x-cos-meta-*,omitempty" url:"-"`
  254. XCosCopySource string `header:"x-cos-copy-source" url:"-" xml:"-"`
  255. XCosServerSideEncryption string `header:"x-cos-server-side-encryption,omitempty" url:"-" xml:"-"`
  256. // SSE-C
  257. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  258. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  259. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  260. XCosCopySourceSSECustomerAglo string `header:"x-cos-copy-source-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  261. XCosCopySourceSSECustomerKey string `header:"x-cos-copy-source-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  262. XCosCopySourceSSECustomerKeyMD5 string `header:"x-cos-copy-source-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  263. //兼容其他自定义头部
  264. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  265. }
  266. // ObjectCopyOptions is the option of Copy, choose header or body
  267. type ObjectCopyOptions struct {
  268. *ObjectCopyHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  269. *ACLHeaderOptions `header:",omitempty" url:"-" xml:"-"`
  270. }
  271. // ObjectCopyResult is the result of Copy
  272. type ObjectCopyResult struct {
  273. XMLName xml.Name `xml:"CopyObjectResult"`
  274. ETag string `xml:"ETag,omitempty"`
  275. LastModified string `xml:"LastModified,omitempty"`
  276. CRC64 string `xml:"CRC64,omitempty"`
  277. VersionId string `xml:"VersionId,omitempty"`
  278. }
  279. // Copy 调用 PutObjectCopy 请求实现将一个文件从源路径复制到目标路径。建议文件大小 1M 到 5G,
  280. // 超过 5G 的文件请使用分块上传 Upload - Copy。在拷贝的过程中,文件元属性和 ACL 可以被修改。
  281. //
  282. // 用户可以通过该接口实现文件移动,文件重命名,修改文件属性和创建副本。
  283. //
  284. // 注意:在跨帐号复制的时候,需要先设置被复制文件的权限为公有读,或者对目标帐号赋权,同帐号则不需要。
  285. //
  286. // https://cloud.tencent.com/document/product/436/10881
  287. func (s *ObjectService) Copy(ctx context.Context, name, sourceURL string, opt *ObjectCopyOptions, id ...string) (*ObjectCopyResult, *Response, error) {
  288. surl := strings.SplitN(sourceURL, "/", 2)
  289. if len(surl) < 2 {
  290. return nil, nil, errors.New(fmt.Sprintf("x-cos-copy-source format error: %s", sourceURL))
  291. }
  292. var u string
  293. if len(id) == 1 {
  294. u = fmt.Sprintf("%s/%s?versionId=%s", surl[0], encodeURIComponent(surl[1]), id[0])
  295. } else if len(id) == 0 {
  296. u = fmt.Sprintf("%s/%s", surl[0], encodeURIComponent(surl[1]))
  297. } else {
  298. return nil, nil, errors.New("wrong params")
  299. }
  300. var res ObjectCopyResult
  301. copyOpt := &ObjectCopyOptions{
  302. &ObjectCopyHeaderOptions{},
  303. &ACLHeaderOptions{},
  304. }
  305. if opt != nil {
  306. if opt.ObjectCopyHeaderOptions != nil {
  307. *copyOpt.ObjectCopyHeaderOptions = *opt.ObjectCopyHeaderOptions
  308. }
  309. if opt.ACLHeaderOptions != nil {
  310. *copyOpt.ACLHeaderOptions = *opt.ACLHeaderOptions
  311. }
  312. }
  313. copyOpt.XCosCopySource = u
  314. sendOpt := sendOptions{
  315. baseURL: s.client.BaseURL.BucketURL,
  316. uri: "/" + encodeURIComponent(name),
  317. method: http.MethodPut,
  318. body: nil,
  319. optHeader: copyOpt,
  320. result: &res,
  321. }
  322. resp, err := s.client.send(ctx, &sendOpt)
  323. // If the error occurs during the copy operation, the error response is embedded in the 200 OK response. This means that a 200 OK response can contain either a success or an error.
  324. if err == nil && resp.StatusCode == 200 {
  325. if res.ETag == "" {
  326. return &res, resp, errors.New("response 200 OK, but body contains an error")
  327. }
  328. }
  329. return &res, resp, err
  330. }
  331. type ObjectDeleteOptions struct {
  332. // SSE-C
  333. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  334. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  335. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  336. //兼容其他自定义头部
  337. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  338. VersionId string `header:"-" url:"VersionId,omitempty" xml:"-"`
  339. }
  340. // Delete Object请求可以将一个文件(Object)删除。
  341. //
  342. // https://www.qcloud.com/document/product/436/7743
  343. func (s *ObjectService) Delete(ctx context.Context, name string, opt ...*ObjectDeleteOptions) (*Response, error) {
  344. var optHeader *ObjectDeleteOptions
  345. // When use "" string might call the delete bucket interface
  346. if len(name) == 0 {
  347. return nil, errors.New("empty object name")
  348. }
  349. if len(opt) > 0 {
  350. optHeader = opt[0]
  351. }
  352. sendOpt := sendOptions{
  353. baseURL: s.client.BaseURL.BucketURL,
  354. uri: "/" + encodeURIComponent(name),
  355. method: http.MethodDelete,
  356. optHeader: optHeader,
  357. optQuery: optHeader,
  358. }
  359. resp, err := s.client.send(ctx, &sendOpt)
  360. return resp, err
  361. }
  362. // ObjectHeadOptions is the option of HeadObject
  363. type ObjectHeadOptions struct {
  364. IfModifiedSince string `url:"-" header:"If-Modified-Since,omitempty"`
  365. // SSE-C
  366. XCosSSECustomerAglo string `header:"x-cos-server-side-encryption-customer-algorithm,omitempty" url:"-" xml:"-"`
  367. XCosSSECustomerKey string `header:"x-cos-server-side-encryption-customer-key,omitempty" url:"-" xml:"-"`
  368. XCosSSECustomerKeyMD5 string `header:"x-cos-server-side-encryption-customer-key-MD5,omitempty" url:"-" xml:"-"`
  369. XOptionHeader *http.Header `header:"-,omitempty" url:"-" xml:"-"`
  370. }
  371. // Head Object请求可以取回对应Object的元数据,Head的权限与Get的权限一致
  372. //
  373. // https://www.qcloud.com/document/product/436/7745
  374. func (s *ObjectService) Head(ctx context.Context, name string, opt *ObjectHeadOptions, id ...string) (*Response, error) {
  375. var u string
  376. if len(id) == 1 {
  377. u = fmt.Sprintf("/%s?versionId=%s", encodeURIComponent(name), id[0])
  378. } else if len(id) == 0 {
  379. u = "/" + encodeURIComponent(name)
  380. } else {
  381. return nil, errors.New("wrong params")
  382. }
  383. sendOpt := sendOptions{
  384. baseURL: s.client.BaseURL.BucketURL,
  385. uri: u,
  386. method: http.MethodHead,
  387. optHeader: opt,
  388. }
  389. resp, err := s.client.send(ctx, &sendOpt)
  390. if resp != nil && resp.Header["X-Cos-Object-Type"] != nil && resp.Header["X-Cos-Object-Type"][0] == "appendable" {
  391. resp.Header.Add("x-cos-next-append-position", resp.Header["Content-Length"][0])
  392. }
  393. return resp, err
  394. }
  395. // ObjectOptionsOptions is the option of object options
  396. type ObjectOptionsOptions struct {
  397. Origin string `url:"-" header:"Origin"`
  398. AccessControlRequestMethod string `url:"-" header:"Access-Control-Request-Method"`
  399. AccessControlRequestHeaders string `url:"-" header:"Access-Control-Request-Headers,omitempty"`
  400. }
  401. // Options Object请求实现跨域访问的预请求。即发出一个 OPTIONS 请求给服务器以确认是否可以进行跨域操作。
  402. //
  403. // 当CORS配置不存在时,请求返回403 Forbidden。
  404. //
  405. // https://www.qcloud.com/document/product/436/8288
  406. func (s *ObjectService) Options(ctx context.Context, name string, opt *ObjectOptionsOptions) (*Response, error) {
  407. sendOpt := sendOptions{
  408. baseURL: s.client.BaseURL.BucketURL,
  409. uri: "/" + encodeURIComponent(name),
  410. method: http.MethodOptions,
  411. optHeader: opt,
  412. }
  413. resp, err := s.client.send(ctx, &sendOpt)
  414. return resp, err
  415. }
  416. // CASJobParameters support three way: Standard(in 35 hours), Expedited(quick way, in 15 mins), Bulk(in 5-12 hours_
  417. type CASJobParameters struct {
  418. Tier string `xml:"Tier" header:"-" url:"-"`
  419. }
  420. // ObjectRestoreOptions is the option of object restore
  421. type ObjectRestoreOptions struct {
  422. XMLName xml.Name `xml:"RestoreRequest" header:"-" url:"-"`
  423. Days int `xml:"Days" header:"-" url:"-"`
  424. Tier *CASJobParameters `xml:"CASJobParameters" header:"-" url:"-"`
  425. XOptionHeader *http.Header `xml:"-" header:",omitempty" url:"-"`
  426. }
  427. // PutRestore API can recover an object of type archived by COS archive.
  428. //
  429. // https://cloud.tencent.com/document/product/436/12633
  430. func (s *ObjectService) PostRestore(ctx context.Context, name string, opt *ObjectRestoreOptions) (*Response, error) {
  431. u := fmt.Sprintf("/%s?restore", encodeURIComponent(name))
  432. sendOpt := sendOptions{
  433. baseURL: s.client.BaseURL.BucketURL,
  434. uri: u,
  435. method: http.MethodPost,
  436. body: opt,
  437. optHeader: opt,
  438. }
  439. resp, err := s.client.send(ctx, &sendOpt)
  440. return resp, err
  441. }
  442. // TODO Append 接口在优化未开放使用
  443. //
  444. // Append请求可以将一个文件(Object)以分块追加的方式上传至 Bucket 中。使用Append Upload的文件必须事前被设定为Appendable。
  445. // 当Appendable的文件被执行Put Object的操作以后,文件被覆盖,属性改变为Normal。
  446. //
  447. // 文件属性可以在Head Object操作中被查询到,当您发起Head Object请求时,会返回自定义Header『x-cos-object-type』,该Header只有两个枚举值:Normal或者Appendable。
  448. //
  449. // 追加上传建议文件大小1M - 5G。如果position的值和当前Object的长度不致,COS会返回409错误。
  450. // 如果Append一个Normal的Object,COS会返回409 ObjectNotAppendable。
  451. //
  452. // Appendable的文件不可以被复制,不参与版本管理,不参与生命周期管理,不可跨区域复制。
  453. //
  454. // 当 r 不是 bytes.Buffer/bytes.Reader/strings.Reader 时,必须指定 opt.ObjectPutHeaderOptions.ContentLength
  455. //
  456. // https://www.qcloud.com/document/product/436/7741
  457. // func (s *ObjectService) Append(ctx context.Context, name string, position int, r io.Reader, opt *ObjectPutOptions) (*Response, error) {
  458. // u := fmt.Sprintf("/%s?append&position=%d", encodeURIComponent(name), position)
  459. // if position != 0{
  460. // opt = nil
  461. // }
  462. // sendOpt := sendOptions{
  463. // baseURL: s.client.BaseURL.BucketURL,
  464. // uri: u,
  465. // method: http.MethodPost,
  466. // optHeader: opt,
  467. // body: r,
  468. // }
  469. // resp, err := s.client.send(ctx, &sendOpt)
  470. // return resp, err
  471. // }
  472. // ObjectDeleteMultiOptions is the option of DeleteMulti
  473. type ObjectDeleteMultiOptions struct {
  474. XMLName xml.Name `xml:"Delete" header:"-"`
  475. Quiet bool `xml:"Quiet" header:"-"`
  476. Objects []Object `xml:"Object" header:"-"`
  477. //XCosSha1 string `xml:"-" header:"x-cos-sha1"`
  478. }
  479. // ObjectDeleteMultiResult is the result of DeleteMulti
  480. type ObjectDeleteMultiResult struct {
  481. XMLName xml.Name `xml:"DeleteResult"`
  482. DeletedObjects []Object `xml:"Deleted,omitempty"`
  483. Errors []struct {
  484. Key string `xml:",omitempty"`
  485. Code string `xml:",omitempty"`
  486. Message string `xml:",omitempty"`
  487. VersionId string `xml:",omitempty"`
  488. } `xml:"Error,omitempty"`
  489. }
  490. // DeleteMulti 请求实现批量删除文件,最大支持单次删除1000个文件。
  491. // 对于返回结果,COS提供Verbose和Quiet两种结果模式。Verbose模式将返回每个Object的删除结果;
  492. // Quiet模式只返回报错的Object信息。
  493. // https://www.qcloud.com/document/product/436/8289
  494. func (s *ObjectService) DeleteMulti(ctx context.Context, opt *ObjectDeleteMultiOptions) (*ObjectDeleteMultiResult, *Response, error) {
  495. var res ObjectDeleteMultiResult
  496. sendOpt := sendOptions{
  497. baseURL: s.client.BaseURL.BucketURL,
  498. uri: "/?delete",
  499. method: http.MethodPost,
  500. body: opt,
  501. result: &res,
  502. }
  503. resp, err := s.client.send(ctx, &sendOpt)
  504. return &res, resp, err
  505. }
  506. // Object is the meta info of the object
  507. type Object struct {
  508. Key string `xml:",omitempty"`
  509. ETag string `xml:",omitempty"`
  510. Size int64 `xml:",omitempty"`
  511. PartNumber int `xml:",omitempty"`
  512. LastModified string `xml:",omitempty"`
  513. StorageClass string `xml:",omitempty"`
  514. Owner *Owner `xml:",omitempty"`
  515. VersionId string `xml:",omitempty"`
  516. }
  517. // MultiUploadOptions is the option of the multiupload,
  518. // ThreadPoolSize default is one
  519. type MultiUploadOptions struct {
  520. OptIni *InitiateMultipartUploadOptions
  521. PartSize int64
  522. ThreadPoolSize int
  523. CheckPoint bool
  524. EnableVerification bool
  525. }
  526. type MultiDownloadOptions struct {
  527. Opt *ObjectGetOptions
  528. PartSize int64
  529. ThreadPoolSize int
  530. CheckPoint bool
  531. CheckPointFile string
  532. }
  533. type MultiDownloadCPInfo struct {
  534. Size int64 `json:"contentLength,omitempty"`
  535. ETag string `json:"eTag,omitempty"`
  536. CRC64 string `json:"crc64ecma,omitempty"`
  537. LastModified string `json:"lastModified,omitempty"`
  538. DownloadedBlocks []DownloadedBlock `json:"downloadedBlocks,omitempty"`
  539. }
  540. type DownloadedBlock struct {
  541. From int64 `json:"from,omitempty"`
  542. To int64 `json:"to,omitempty"`
  543. }
  544. type Chunk struct {
  545. Number int
  546. OffSet int64
  547. Size int64
  548. Done bool
  549. ETag string
  550. }
  551. // jobs
  552. type Jobs struct {
  553. Name string
  554. UploadId string
  555. FilePath string
  556. RetryTimes int
  557. VersionId []string
  558. Chunk Chunk
  559. Data io.Reader
  560. Opt *ObjectUploadPartOptions
  561. DownOpt *ObjectGetOptions
  562. }
  563. type Results struct {
  564. PartNumber int
  565. Resp *Response
  566. err error
  567. }
  568. func LimitReadCloser(r io.Reader, n int64) io.Reader {
  569. var lc LimitedReadCloser
  570. lc.R = r
  571. lc.N = n
  572. return &lc
  573. }
  574. type LimitedReadCloser struct {
  575. io.LimitedReader
  576. }
  577. func (lc *LimitedReadCloser) Close() error {
  578. if r, ok := lc.R.(io.ReadCloser); ok {
  579. return r.Close()
  580. }
  581. return nil
  582. }
  583. type DiscardReadCloser struct {
  584. RC io.ReadCloser
  585. Discard int
  586. }
  587. func (drc *DiscardReadCloser) Read(data []byte) (int, error) {
  588. n, err := drc.RC.Read(data)
  589. if drc.Discard == 0 || n <= 0 {
  590. return n, err
  591. }
  592. if n <= drc.Discard {
  593. drc.Discard -= n
  594. return 0, err
  595. }
  596. realLen := n - drc.Discard
  597. copy(data[0:realLen], data[drc.Discard:n])
  598. drc.Discard = 0
  599. return realLen, err
  600. }
  601. func (drc *DiscardReadCloser) Close() error {
  602. if rc, ok := drc.RC.(io.ReadCloser); ok {
  603. return rc.Close()
  604. }
  605. return nil
  606. }
  607. func worker(ctx context.Context, s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  608. for j := range jobs {
  609. j.Opt.ContentLength = j.Chunk.Size
  610. rt := j.RetryTimes
  611. for {
  612. // http.Request.Body can be Closed in request
  613. fd, err := os.Open(j.FilePath)
  614. var res Results
  615. if err != nil {
  616. res.err = err
  617. res.PartNumber = j.Chunk.Number
  618. res.Resp = nil
  619. results <- &res
  620. break
  621. }
  622. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  623. resp, err := s.UploadPart(ctx, j.Name, j.UploadId, j.Chunk.Number,
  624. LimitReadCloser(fd, j.Chunk.Size), j.Opt)
  625. res.PartNumber = j.Chunk.Number
  626. res.Resp = resp
  627. res.err = err
  628. if err != nil {
  629. rt--
  630. if rt == 0 {
  631. results <- &res
  632. break
  633. }
  634. continue
  635. }
  636. results <- &res
  637. break
  638. }
  639. }
  640. }
  641. func downloadWorker(ctx context.Context, s *ObjectService, jobs <-chan *Jobs, results chan<- *Results) {
  642. for j := range jobs {
  643. opt := &RangeOptions{
  644. HasStart: true,
  645. HasEnd: true,
  646. Start: j.Chunk.OffSet,
  647. End: j.Chunk.OffSet + j.Chunk.Size - 1,
  648. }
  649. j.DownOpt.Range = FormatRangeOptions(opt)
  650. rt := j.RetryTimes
  651. for {
  652. var res Results
  653. res.PartNumber = j.Chunk.Number
  654. resp, err := s.Get(ctx, j.Name, j.DownOpt, j.VersionId...)
  655. res.err = err
  656. res.Resp = resp
  657. if err != nil {
  658. rt--
  659. if rt == 0 {
  660. results <- &res
  661. break
  662. }
  663. continue
  664. }
  665. defer resp.Body.Close()
  666. fd, err := os.OpenFile(j.FilePath, os.O_WRONLY, 0660)
  667. if err != nil {
  668. res.err = err
  669. results <- &res
  670. break
  671. }
  672. fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
  673. n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
  674. if n != j.Chunk.Size || err != nil {
  675. res.err = fmt.Errorf("io.Copy Failed, nread:%v, want:%v, err:%v", n, j.Chunk.Size, err)
  676. }
  677. fd.Close()
  678. results <- &res
  679. break
  680. }
  681. }
  682. }
  683. func DividePart(fileSize int64, last int) (int64, int64) {
  684. partSize := int64(last * 1024 * 1024)
  685. partNum := fileSize / partSize
  686. for partNum >= 10000 {
  687. partSize = partSize * 2
  688. partNum = fileSize / partSize
  689. }
  690. return partNum, partSize
  691. }
  692. func SplitFileIntoChunks(filePath string, partSize int64) (int64, []Chunk, int, error) {
  693. if filePath == "" {
  694. return 0, nil, 0, errors.New("filePath invalid")
  695. }
  696. file, err := os.Open(filePath)
  697. if err != nil {
  698. return 0, nil, 0, err
  699. }
  700. defer file.Close()
  701. stat, err := file.Stat()
  702. if err != nil {
  703. return 0, nil, 0, err
  704. }
  705. var partNum int64
  706. if partSize > 0 {
  707. if partSize < 1024*1024 {
  708. return 0, nil, 0, errors.New("partSize>=1048576 is required")
  709. }
  710. partNum = stat.Size() / partSize
  711. if partNum >= 10000 {
  712. return 0, nil, 0, errors.New("Too many parts, out of 10000")
  713. }
  714. } else {
  715. partNum, partSize = DividePart(stat.Size(), 64)
  716. }
  717. var chunks []Chunk
  718. var chunk = Chunk{}
  719. for i := int64(0); i < partNum; i++ {
  720. chunk.Number = int(i + 1)
  721. chunk.OffSet = i * partSize
  722. chunk.Size = partSize
  723. chunks = append(chunks, chunk)
  724. }
  725. if stat.Size()%partSize > 0 {
  726. chunk.Number = len(chunks) + 1
  727. chunk.OffSet = int64(len(chunks)) * partSize
  728. chunk.Size = stat.Size() % partSize
  729. chunks = append(chunks, chunk)
  730. partNum++
  731. }
  732. return int64(stat.Size()), chunks, int(partNum), nil
  733. }
  734. func (s *ObjectService) getResumableUploadID(ctx context.Context, name string) (string, error) {
  735. opt := &ObjectListUploadsOptions{
  736. Prefix: name,
  737. EncodingType: "url",
  738. }
  739. res, _, err := s.ListUploads(ctx, opt)
  740. if err != nil {
  741. return "", err
  742. }
  743. if len(res.Upload) == 0 {
  744. return "", nil
  745. }
  746. last := len(res.Upload) - 1
  747. for last >= 0 {
  748. decodeKey, _ := decodeURIComponent(res.Upload[last].Key)
  749. if decodeKey == name {
  750. return decodeURIComponent(res.Upload[last].UploadID)
  751. }
  752. last = last - 1
  753. }
  754. return "", nil
  755. }
  756. func (s *ObjectService) checkUploadedParts(ctx context.Context, name, UploadID, filepath string, chunks []Chunk, partNum int) error {
  757. var uploadedParts []Object
  758. isTruncated := true
  759. opt := &ObjectListPartsOptions{
  760. EncodingType: "url",
  761. }
  762. for isTruncated {
  763. res, _, err := s.ListParts(ctx, name, UploadID, opt)
  764. if err != nil {
  765. return err
  766. }
  767. if len(res.Parts) > 0 {
  768. uploadedParts = append(uploadedParts, res.Parts...)
  769. }
  770. isTruncated = res.IsTruncated
  771. opt.PartNumberMarker = res.NextPartNumberMarker
  772. }
  773. fd, err := os.Open(filepath)
  774. if err != nil {
  775. return err
  776. }
  777. defer fd.Close()
  778. // 某个分块出错, 重置chunks
  779. ret := func(e error) error {
  780. for i, _ := range chunks {
  781. chunks[i].Done = false
  782. chunks[i].ETag = ""
  783. }
  784. return e
  785. }
  786. for _, part := range uploadedParts {
  787. partNumber := part.PartNumber
  788. if partNumber > partNum {
  789. return ret(errors.New("Part Number is not consistent"))
  790. }
  791. partNumber = partNumber - 1
  792. fd.Seek(chunks[partNumber].OffSet, os.SEEK_SET)
  793. bs, err := ioutil.ReadAll(io.LimitReader(fd, chunks[partNumber].Size))
  794. if err != nil {
  795. return ret(err)
  796. }
  797. localMD5 := fmt.Sprintf("\"%x\"", md5.Sum(bs))
  798. if localMD5 != part.ETag {
  799. return ret(errors.New(fmt.Sprintf("CheckSum Failed in Part[%d]", part.PartNumber)))
  800. }
  801. chunks[partNumber].Done = true
  802. chunks[partNumber].ETag = part.ETag
  803. }
  804. return nil
  805. }
  806. // MultiUpload/Upload 为高级upload接口,并发分块上传
  807. //
  808. // 当 partSize > 0 时,由调用者指定分块大小,否则由 SDK 自动切分,单位为MB
  809. // 由调用者指定分块大小时,请确认分块数量不超过10000
  810. //
  811. func (s *ObjectService) MultiUpload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  812. return s.Upload(ctx, name, filepath, opt)
  813. }
  814. func (s *ObjectService) Upload(ctx context.Context, name string, filepath string, opt *MultiUploadOptions) (*CompleteMultipartUploadResult, *Response, error) {
  815. if opt == nil {
  816. opt = &MultiUploadOptions{}
  817. }
  818. var localcrc uint64
  819. // 1.Get the file chunk
  820. totalBytes, chunks, partNum, err := SplitFileIntoChunks(filepath, opt.PartSize*1024*1024)
  821. if err != nil {
  822. return nil, nil, err
  823. }
  824. // 校验
  825. if s.client.Conf.EnableCRC {
  826. fd, err := os.Open(filepath)
  827. if err != nil {
  828. return nil, nil, err
  829. }
  830. defer fd.Close()
  831. localcrc, err = calCRC64(fd)
  832. if err != nil {
  833. return nil, nil, err
  834. }
  835. }
  836. // filesize=0 , use simple upload
  837. if partNum == 0 || partNum == 1 {
  838. var opt0 *ObjectPutOptions
  839. if opt.OptIni != nil {
  840. opt0 = &ObjectPutOptions{
  841. opt.OptIni.ACLHeaderOptions,
  842. opt.OptIni.ObjectPutHeaderOptions,
  843. }
  844. }
  845. rsp, err := s.PutFromFile(ctx, name, filepath, opt0)
  846. if err != nil {
  847. return nil, rsp, err
  848. }
  849. result := &CompleteMultipartUploadResult{
  850. Location: fmt.Sprintf("%s/%s", s.client.BaseURL.BucketURL, name),
  851. Key: name,
  852. ETag: rsp.Header.Get("ETag"),
  853. }
  854. if rsp != nil && s.client.Conf.EnableCRC {
  855. scoscrc := rsp.Header.Get("x-cos-hash-crc64ecma")
  856. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  857. if icoscrc != localcrc {
  858. return result, rsp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  859. }
  860. }
  861. return result, rsp, nil
  862. }
  863. var uploadID string
  864. resumableFlag := false
  865. if opt.CheckPoint {
  866. var err error
  867. uploadID, err = s.getResumableUploadID(ctx, name)
  868. if err == nil && uploadID != "" {
  869. err = s.checkUploadedParts(ctx, name, uploadID, filepath, chunks, partNum)
  870. resumableFlag = (err == nil)
  871. }
  872. }
  873. // 2.Init
  874. optini := opt.OptIni
  875. if !resumableFlag {
  876. res, _, err := s.InitiateMultipartUpload(ctx, name, optini)
  877. if err != nil {
  878. return nil, nil, err
  879. }
  880. uploadID = res.UploadID
  881. }
  882. var poolSize int
  883. if opt.ThreadPoolSize > 0 {
  884. poolSize = opt.ThreadPoolSize
  885. } else {
  886. // Default is one
  887. poolSize = 1
  888. }
  889. chjobs := make(chan *Jobs, 100)
  890. chresults := make(chan *Results, 10000)
  891. optcom := &CompleteMultipartUploadOptions{}
  892. // 3.Start worker
  893. for w := 1; w <= poolSize; w++ {
  894. go worker(ctx, s, chjobs, chresults)
  895. }
  896. // progress started event
  897. var listener ProgressListener
  898. var consumedBytes int64
  899. if opt.OptIni != nil {
  900. if opt.OptIni.ObjectPutHeaderOptions != nil {
  901. listener = opt.OptIni.Listener
  902. }
  903. optcom.XOptionHeader, _ = deliverInitOptions(opt.OptIni)
  904. }
  905. event := newProgressEvent(ProgressStartedEvent, 0, 0, totalBytes)
  906. progressCallback(listener, event)
  907. // 4.Push jobs
  908. go func() {
  909. for _, chunk := range chunks {
  910. if chunk.Done {
  911. continue
  912. }
  913. partOpt := &ObjectUploadPartOptions{}
  914. if optini != nil && optini.ObjectPutHeaderOptions != nil {
  915. partOpt.XCosSSECustomerAglo = optini.XCosSSECustomerAglo
  916. partOpt.XCosSSECustomerKey = optini.XCosSSECustomerKey
  917. partOpt.XCosSSECustomerKeyMD5 = optini.XCosSSECustomerKeyMD5
  918. partOpt.XCosTrafficLimit = optini.XCosTrafficLimit
  919. }
  920. job := &Jobs{
  921. Name: name,
  922. RetryTimes: 3,
  923. FilePath: filepath,
  924. UploadId: uploadID,
  925. Chunk: chunk,
  926. Opt: partOpt,
  927. }
  928. chjobs <- job
  929. }
  930. close(chjobs)
  931. }()
  932. // 5.Recv the resp etag to complete
  933. err = nil
  934. for i := 0; i < partNum; i++ {
  935. if chunks[i].Done {
  936. optcom.Parts = append(optcom.Parts, Object{
  937. PartNumber: chunks[i].Number, ETag: chunks[i].ETag},
  938. )
  939. if err == nil {
  940. consumedBytes += chunks[i].Size
  941. event = newProgressEvent(ProgressDataEvent, chunks[i].Size, consumedBytes, totalBytes)
  942. progressCallback(listener, event)
  943. }
  944. continue
  945. }
  946. res := <-chresults
  947. // Notice one part fail can not get the etag according.
  948. if res.Resp == nil || res.err != nil {
  949. // Some part already fail, can not to get the header inside.
  950. err = fmt.Errorf("UploadID %s, part %d failed to get resp content. error: %s", uploadID, res.PartNumber, res.err.Error())
  951. continue
  952. }
  953. // Notice one part fail can not get the etag according.
  954. etag := res.Resp.Header.Get("ETag")
  955. optcom.Parts = append(optcom.Parts, Object{
  956. PartNumber: res.PartNumber, ETag: etag},
  957. )
  958. if err == nil {
  959. consumedBytes += chunks[res.PartNumber-1].Size
  960. event = newProgressEvent(ProgressDataEvent, chunks[res.PartNumber-1].Size, consumedBytes, totalBytes)
  961. progressCallback(listener, event)
  962. }
  963. }
  964. close(chresults)
  965. if err != nil {
  966. event = newProgressEvent(ProgressFailedEvent, 0, consumedBytes, totalBytes, err)
  967. progressCallback(listener, event)
  968. return nil, nil, err
  969. }
  970. sort.Sort(ObjectList(optcom.Parts))
  971. event = newProgressEvent(ProgressCompletedEvent, 0, consumedBytes, totalBytes)
  972. progressCallback(listener, event)
  973. v, resp, err := s.CompleteMultipartUpload(context.Background(), name, uploadID, optcom)
  974. if err != nil {
  975. return v, resp, err
  976. }
  977. if resp != nil && s.client.Conf.EnableCRC {
  978. scoscrc := resp.Header.Get("x-cos-hash-crc64ecma")
  979. icoscrc, _ := strconv.ParseUint(scoscrc, 10, 64)
  980. if icoscrc != localcrc {
  981. return v, resp, fmt.Errorf("verification failed, want:%v, return:%v", localcrc, icoscrc)
  982. }
  983. }
  984. return v, resp, err
  985. }
  986. func SplitSizeIntoChunks(totalBytes int64, partSize int64) ([]Chunk, int, error) {
  987. var partNum int64
  988. if partSize > 0 {
  989. if partSize < 1024*1024 {
  990. return nil, 0, errors.New("partSize>=1048576 is required")
  991. }
  992. partNum = totalBytes / partSize
  993. if partNum >= 10000 {
  994. return nil, 0, errors.New("Too manry parts, out of 10000")
  995. }
  996. } else {
  997. partNum, partSize = DividePart(totalBytes, 64)
  998. }
  999. var chunks []Chunk
  1000. var chunk = Chunk{}
  1001. for i := int64(0); i < partNum; i++ {
  1002. chunk.Number = int(i + 1)
  1003. chunk.OffSet = i * partSize
  1004. chunk.Size = partSize
  1005. chunks = append(chunks, chunk)
  1006. }
  1007. if totalBytes%partSize > 0 {
  1008. chunk.Number = len(chunks) + 1
  1009. chunk.OffSet = int64(len(chunks)) * partSize
  1010. chunk.Size = totalBytes % partSize
  1011. chunks = append(chunks, chunk)
  1012. partNum++
  1013. }
  1014. return chunks, int(partNum), nil
  1015. }
  1016. func (s *ObjectService) checkDownloadedParts(opt *MultiDownloadCPInfo, chfile string, chunks []Chunk) (*MultiDownloadCPInfo, bool) {
  1017. var defaultRes MultiDownloadCPInfo
  1018. defaultRes = *opt
  1019. fd, err := os.Open(chfile)
  1020. // checkpoint 文件不存在
  1021. if err != nil && os.IsNotExist(err) {
  1022. // 创建 checkpoint 文件
  1023. fd, _ = os.OpenFile(chfile, os.O_RDONLY|os.O_CREATE|os.O_TRUNC, 0660)
  1024. fd.Close()
  1025. return &defaultRes, false
  1026. }
  1027. if err != nil {
  1028. return &defaultRes, false
  1029. }
  1030. defer fd.Close()
  1031. var res MultiDownloadCPInfo
  1032. err = json.NewDecoder(fd).Decode(&res)
  1033. if err != nil {
  1034. return &defaultRes, false
  1035. }
  1036. // 与COS的文件比较
  1037. if res.CRC64 != opt.CRC64 || res.ETag != opt.ETag || res.Size != opt.Size || res.LastModified != opt.LastModified || len(res.DownloadedBlocks) == 0 {
  1038. return &defaultRes, false
  1039. }
  1040. // len(chunks) 大于1,否则为简单下载, chunks[0].Size为partSize
  1041. partSize := chunks[0].Size
  1042. for _, v := range res.DownloadedBlocks {
  1043. index := v.From / partSize
  1044. to := chunks[index].OffSet + chunks[index].Size - 1
  1045. if chunks[index].OffSet != v.From || to != v.To {
  1046. // 重置chunks
  1047. for i, _ := range chunks {
  1048. chunks[i].Done = false
  1049. }
  1050. return &defaultRes, false
  1051. }
  1052. chunks[index].Done = true
  1053. }
  1054. return &res, true
  1055. }
  1056. func (s *ObjectService) Download(ctx context.Context, name string, filepath string, opt *MultiDownloadOptions, id ...string) (*Response, error) {
  1057. // 参数校验
  1058. if opt == nil {
  1059. opt = &MultiDownloadOptions{}
  1060. }
  1061. if opt.Opt != nil && opt.Opt.Range != "" {
  1062. return nil, fmt.Errorf("Download doesn't support Range Options")
  1063. }
  1064. // 获取文件长度和CRC
  1065. var coscrc string
  1066. resp, err := s.Head(ctx, name, nil, id...)
  1067. if err != nil {
  1068. return resp, err
  1069. }
  1070. // 如果对象不存在x-cos-hash-crc64ecma,则跳过不做校验
  1071. coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
  1072. strTotalBytes := resp.Header.Get("Content-Length")
  1073. totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
  1074. if err != nil {
  1075. return resp, err
  1076. }
  1077. // 切分
  1078. chunks, partNum, err := SplitSizeIntoChunks(totalBytes, opt.PartSize*1024*1024)
  1079. if err != nil {
  1080. return resp, err
  1081. }
  1082. // 直接下载到文件
  1083. if partNum == 0 || partNum == 1 {
  1084. rsp, err := s.GetToFile(ctx, name, filepath, opt.Opt, id...)
  1085. if err != nil {
  1086. return rsp, err
  1087. }
  1088. if coscrc != "" && s.client.Conf.EnableCRC {
  1089. icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
  1090. fd, err := os.Open(filepath)
  1091. if err != nil {
  1092. return rsp, err
  1093. }
  1094. defer fd.Close()
  1095. localcrc, err := calCRC64(fd)
  1096. if err != nil {
  1097. return rsp, err
  1098. }
  1099. if localcrc != icoscrc {
  1100. return rsp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
  1101. }
  1102. }
  1103. return rsp, err
  1104. }
  1105. // 断点续载
  1106. var resumableFlag bool
  1107. var resumableInfo *MultiDownloadCPInfo
  1108. var cpfd *os.File
  1109. var cpfile string
  1110. if opt.CheckPoint {
  1111. cpInfo := &MultiDownloadCPInfo{
  1112. LastModified: resp.Header.Get("Last-Modified"),
  1113. ETag: resp.Header.Get("ETag"),
  1114. CRC64: coscrc,
  1115. Size: totalBytes,
  1116. }
  1117. cpfile = opt.CheckPointFile
  1118. if cpfile == "" {
  1119. cpfile = fmt.Sprintf("%s.cosresumabletask", filepath)
  1120. }
  1121. resumableInfo, resumableFlag = s.checkDownloadedParts(cpInfo, cpfile, chunks)
  1122. cpfd, err = os.OpenFile(cpfile, os.O_RDWR, 0660)
  1123. if err != nil {
  1124. return nil, fmt.Errorf("Open CheckPoint File[%v] Failed:%v", cpfile, err)
  1125. }
  1126. }
  1127. if !resumableFlag {
  1128. // 创建文件
  1129. nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
  1130. if err != nil {
  1131. if cpfd != nil {
  1132. cpfd.Close()
  1133. }
  1134. return resp, err
  1135. }
  1136. nfile.Close()
  1137. }
  1138. var poolSize int
  1139. if opt.ThreadPoolSize > 0 {
  1140. poolSize = opt.ThreadPoolSize
  1141. } else {
  1142. poolSize = 1
  1143. }
  1144. chjobs := make(chan *Jobs, 100)
  1145. chresults := make(chan *Results, 10000)
  1146. for w := 1; w <= poolSize; w++ {
  1147. go downloadWorker(ctx, s, chjobs, chresults)
  1148. }
  1149. go func() {
  1150. for _, chunk := range chunks {
  1151. if chunk.Done {
  1152. continue
  1153. }
  1154. var downOpt ObjectGetOptions
  1155. if opt.Opt != nil {
  1156. downOpt = *opt.Opt
  1157. downOpt.Listener = nil // listener need to set nil
  1158. }
  1159. job := &Jobs{
  1160. Name: name,
  1161. RetryTimes: 3,
  1162. FilePath: filepath,
  1163. Chunk: chunk,
  1164. DownOpt: &downOpt,
  1165. }
  1166. if len(id) > 0 {
  1167. job.VersionId = append(job.VersionId, id...)
  1168. }
  1169. chjobs <- job
  1170. }
  1171. close(chjobs)
  1172. }()
  1173. err = nil
  1174. for i := 0; i < partNum; i++ {
  1175. if chunks[i].Done {
  1176. continue
  1177. }
  1178. res := <-chresults
  1179. if res.Resp == nil || res.err != nil {
  1180. err = fmt.Errorf("part %d get resp Content. error: %s", res.PartNumber, res.err.Error())
  1181. continue
  1182. }
  1183. // Dump CheckPoint Info
  1184. if opt.CheckPoint {
  1185. cpfd.Truncate(0)
  1186. cpfd.Seek(0, os.SEEK_SET)
  1187. resumableInfo.DownloadedBlocks = append(resumableInfo.DownloadedBlocks, DownloadedBlock{
  1188. From: chunks[res.PartNumber-1].OffSet,
  1189. To: chunks[res.PartNumber-1].OffSet + chunks[res.PartNumber-1].Size - 1,
  1190. })
  1191. json.NewEncoder(cpfd).Encode(resumableInfo)
  1192. }
  1193. }
  1194. close(chresults)
  1195. if cpfd != nil {
  1196. cpfd.Close()
  1197. }
  1198. if err != nil {
  1199. return nil, err
  1200. }
  1201. // 下载成功,删除checkpoint文件
  1202. if opt.CheckPoint {
  1203. os.Remove(cpfile)
  1204. }
  1205. if coscrc != "" && s.client.Conf.EnableCRC {
  1206. icoscrc, _ := strconv.ParseUint(coscrc, 10, 64)
  1207. fd, err := os.Open(filepath)
  1208. if err != nil {
  1209. return resp, err
  1210. }
  1211. defer fd.Close()
  1212. localcrc, err := calCRC64(fd)
  1213. if err != nil {
  1214. return resp, err
  1215. }
  1216. if localcrc != icoscrc {
  1217. return resp, fmt.Errorf("verification failed, want:%v, return:%v", icoscrc, localcrc)
  1218. }
  1219. }
  1220. return resp, err
  1221. }
  1222. type ObjectPutTaggingOptions struct {
  1223. XMLName xml.Name `xml:"Tagging"`
  1224. TagSet []ObjectTaggingTag `xml:"TagSet>Tag,omitempty"`
  1225. }
  1226. type ObjectTaggingTag BucketTaggingTag
  1227. type ObjectGetTaggingResult ObjectPutTaggingOptions
  1228. func (s *ObjectService) PutTagging(ctx context.Context, name string, opt *ObjectPutTaggingOptions, id ...string) (*Response, error) {
  1229. var u string
  1230. if len(id) == 1 {
  1231. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1232. } else if len(id) == 0 {
  1233. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1234. } else {
  1235. return nil, errors.New("wrong params")
  1236. }
  1237. sendOpt := &sendOptions{
  1238. baseURL: s.client.BaseURL.BucketURL,
  1239. uri: u,
  1240. method: http.MethodPut,
  1241. body: opt,
  1242. }
  1243. resp, err := s.client.send(ctx, sendOpt)
  1244. return resp, err
  1245. }
  1246. func (s *ObjectService) GetTagging(ctx context.Context, name string, id ...string) (*ObjectGetTaggingResult, *Response, error) {
  1247. var u string
  1248. if len(id) == 1 {
  1249. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1250. } else if len(id) == 0 {
  1251. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1252. } else {
  1253. return nil, nil, errors.New("wrong params")
  1254. }
  1255. var res ObjectGetTaggingResult
  1256. sendOpt := &sendOptions{
  1257. baseURL: s.client.BaseURL.BucketURL,
  1258. uri: u,
  1259. method: http.MethodGet,
  1260. result: &res,
  1261. }
  1262. resp, err := s.client.send(ctx, sendOpt)
  1263. return &res, resp, err
  1264. }
  1265. func (s *ObjectService) DeleteTagging(ctx context.Context, name string, id ...string) (*Response, error) {
  1266. var u string
  1267. if len(id) == 1 {
  1268. u = fmt.Sprintf("/%s?tagging&versionId=%s", encodeURIComponent(name), id[0])
  1269. } else if len(id) == 0 {
  1270. u = fmt.Sprintf("/%s?tagging", encodeURIComponent(name))
  1271. } else {
  1272. return nil, errors.New("wrong params")
  1273. }
  1274. sendOpt := &sendOptions{
  1275. baseURL: s.client.BaseURL.BucketURL,
  1276. uri: u,
  1277. method: http.MethodDelete,
  1278. }
  1279. resp, err := s.client.send(ctx, sendOpt)
  1280. return resp, err
  1281. }