Browse Source

update multidownload

master
jojoliang 4 years ago
parent
commit
2a64ad473b
  1. 39
      costesting/ci_test.go
  2. 11
      object.go
  3. 55
      object_test.go

39
costesting/ci_test.go

@ -100,7 +100,7 @@ func (s *CosTestSuite) SetupSuite() {
XCosACL: "public-read",
}
r, err := s.Client.Bucket.Put(context.Background(), opt)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
@ -142,7 +142,7 @@ func (s *CosTestSuite) TestPutHeadDeleteBucket() {
},
})
r, err := client.Bucket.Put(context.Background(), nil)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
@ -507,6 +507,37 @@ func (s *CosTestSuite) TestPutGetDeleteObjectByUpload_10MB() {
assert.Nil(s.T(), err, "remove local file Failed")
}
func (s *CosTestSuite) TestPutGetDeleteObjectByUploadAndDownload_10MB() {
// Create tmp file
filePath := "tmpfile" + time.Now().Format(time.RFC3339)
newfile, err := os.Create(filePath)
assert.Nil(s.T(), err, "create tmp file Failed")
defer newfile.Close()
name := "test/objectUpload" + time.Now().Format(time.RFC3339)
b := make([]byte, 1024*1024*10)
_, err = rand.Read(b)
newfile.Write(b)
opt := &cos.MultiUploadOptions{
PartSize: 1,
ThreadPoolSize: 3,
}
_, _, err = s.Client.Object.Upload(context.Background(), name, filePath, opt)
assert.Nil(s.T(), err, "PutObject Failed")
// Over write tmp file
_, err = s.Client.Object.Download(context.Background(), name, filePath, nil)
assert.Nil(s.T(), err, "DownloadObject Failed")
_, err = s.Client.Object.Delete(context.Background(), name)
assert.Nil(s.T(), err, "DeleteObject Failed")
// remove the local tmp file
err = os.Remove(filePath)
assert.Nil(s.T(), err, "remove local file Failed")
}
func (s *CosTestSuite) TestPutGetDeleteObjectSpecialName() {
f := strings.NewReader("test")
name := s.SepFileName + time.Now().Format(time.RFC3339)
@ -606,7 +637,7 @@ func (s *CosTestSuite) TestCopyObject() {
// Notice in intranet the bucket host sometimes has i/o timeout problem
r, err := c.Bucket.Put(context.Background(), opt)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")
@ -971,7 +1002,7 @@ func (s *CosTestSuite) TestMultiCopy() {
// Notice in intranet the bucket host sometimes has i/o timeout problem
r, err := c.Bucket.Put(context.Background(), opt)
if err != nil && r.StatusCode == 409 {
if err != nil && r != nil && r.StatusCode == 409 {
fmt.Println("BucketAlreadyOwnedByYou")
} else if err != nil {
assert.Nil(s.T(), err, "PutBucket Failed")

11
object.go

@ -673,8 +673,9 @@ func downloadWorker(s *ObjectService, jobs <-chan *Jobs, results chan<- *Results
fd.Seek(j.Chunk.OffSet, os.SEEK_SET)
n, err := io.Copy(fd, LimitReadCloser(resp.Body, j.Chunk.Size))
if n != j.Chunk.Size || err != nil {
res.err = fmt.Errorf("io.Copy Failed, read:%v, size:%v, err:%v", n, j.Chunk.Size, err)
res.err = fmt.Errorf("io.Copy Failed, nread:%v, want:%v, err:%v", n, j.Chunk.Size, err)
}
fd.Close()
results <- &res
break
}
@ -1040,7 +1041,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
opt = &MultiDownloadOptions{}
}
if opt.Opt != nil && opt.Opt.Range != "" {
return nil, fmt.Errorf("does not supported Range Get")
return nil, fmt.Errorf("Download doesn't support Range Options")
}
// 获取文件长度和CRC
var coscrc string
@ -1048,6 +1049,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
if err != nil {
return resp, err
}
// 如果对象不存在x-cos-hash-crc64ecma,则跳过不做校验
coscrc = resp.Header.Get("x-cos-hash-crc64ecma")
strTotalBytes := resp.Header.Get("Content-Length")
totalBytes, err := strconv.ParseInt(strTotalBytes, 10, 64)
@ -1072,6 +1074,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
if err != nil {
return rsp, err
}
defer fd.Close()
localcrc, err := calCRC64(fd)
if err != nil {
return rsp, err
@ -1082,11 +1085,13 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
}
return rsp, err
}
// 创建文件
nfile, err := os.OpenFile(filepath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0660)
if err != nil {
return resp, err
}
nfile.Close()
var poolSize int
if opt.ThreadPoolSize > 0 {
poolSize = opt.ThreadPoolSize
@ -1104,6 +1109,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
var downOpt ObjectGetOptions
if opt.Opt != nil {
downOpt = *opt.Opt
downOpt.Listener = nil // listener need to set nil
}
job := &Jobs{
Name: name,
@ -1135,6 +1141,7 @@ func (s *ObjectService) Download(ctx context.Context, name string, filepath stri
if err != nil {
return resp, err
}
defer fd.Close()
localcrc, err := calCRC64(fd)
if err != nil {
return resp, err

55
object_test.go

@ -10,6 +10,7 @@ import (
"hash/crc64"
"io"
"io/ioutil"
math_rand "math/rand"
"net/http"
"net/url"
"os"
@ -528,19 +529,19 @@ func TestObjectService_Download(t *testing.T) {
}
defer os.Remove(filePath)
// 源文件内容
totalBytes := int64(1024 * 1024 * 10)
totalBytes := int64(1024*1024*9 + 123)
b := make([]byte, totalBytes)
_, err = rand.Read(b)
newfile.Write(b)
newfile.Close()
tb := crc64.MakeTable(crc64.ECMA)
localcrc := crc64.Update(0, tb, b)
localcrc := strconv.FormatUint(crc64.Update(0, tb, b), 10)
retryMap := make(map[int64]int)
mux.HandleFunc("/test.go.download", func(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodHead {
w.Header().Add("Content-Length", strconv.FormatInt(totalBytes, 10))
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
w.Header().Add("x-cos-hash-crc64ecma", localcrc)
return
}
strRange := r.Header.Get("Range")
@ -549,36 +550,21 @@ func TestObjectService_Download(t *testing.T) {
start, _ := strconv.ParseInt(slice2[0], 10, 64)
end, _ := strconv.ParseInt(slice2[1], 10, 64)
if retryMap[start] == 0 {
// 重试校验1
retryMap[start]++
w.WriteHeader(http.StatusGatewayTimeout)
} else if retryMap[start] == 1 {
// 重试检验2
retryMap[start]++
w.WriteHeader(http.StatusGatewayTimeout)
return
fd, err := os.Open(filePath)
if err != nil {
t.Fatalf("open file failed: %v", err)
}
defer fd.Close()
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
fd.Seek(start, os.SEEK_SET)
n, err := io.Copy(w, LimitReadCloser(fd, (end-start)/2))
if err != nil || int64(n) != (end-start)/2 {
t.Fatalf("write file failed:%v, n:%v", err, n)
}
io.Copy(w, bytes.NewBuffer(b[start:end]))
} else if retryMap[start] == 2 {
// 重试检验3
retryMap[start]++
st := math_rand.Int63n(totalBytes - 1024*1024)
et := st + end - start
io.Copy(w, bytes.NewBuffer(b[st:et+1]))
} else {
fd, err := os.Open(filePath)
if err != nil {
t.Fatalf("open file failed: %v", err)
}
defer fd.Close()
w.Header().Add("x-cos-hash-crc64ecma", strconv.FormatUint(localcrc, 10))
fd.Seek(start, os.SEEK_SET)
n, err := io.Copy(w, LimitReadCloser(fd, end-start+1))
if err != nil || int64(n) != end-start+1 {
t.Fatalf("write file failed:%v, n:%v", err, n)
}
io.Copy(w, bytes.NewBuffer(b[start:end+1]))
}
})
@ -587,9 +573,20 @@ func TestObjectService_Download(t *testing.T) {
PartSize: 1,
}
downPath := "down.file" + time.Now().Format(time.RFC3339)
defer os.Remove(downPath)
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err == nil {
// 长度不一致 Failed
t.Fatalf("Object.Upload returned error: %v", err)
}
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err == nil {
// CRC不一致
t.Fatalf("Object.Upload returned error: %v", err)
}
_, err = client.Object.Download(context.Background(), "test.go.download", downPath, opt)
if err != nil {
// 正确
t.Fatalf("Object.Upload returned error: %v", err)
}
os.Remove(downPath)
}
Loading…
Cancel
Save