jojoliang
5 years ago
5 changed files with 582 additions and 0 deletions
-
66example/object/select.go
-
66example/object/select_csv.go
-
4object.go
-
2object_part.go
-
444object_select.go
@ -0,0 +1,66 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"net/url" |
|||
"os" |
|||
|
|||
"io/ioutil" |
|||
"net/http" |
|||
|
|||
"github.com/tencentyun/cos-go-sdk-v5" |
|||
"github.com/tencentyun/cos-go-sdk-v5/debug" |
|||
) |
|||
|
|||
func main() { |
|||
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") |
|||
b := &cos.BaseURL{BucketURL: u} |
|||
c := cos.NewClient(b, &http.Client{ |
|||
Transport: &cos.AuthorizationTransport{ |
|||
SecretID: os.Getenv("COS_SECRETID"), |
|||
SecretKey: os.Getenv("COS_SECRETKEY"), |
|||
Transport: &debug.DebugRequestTransport{ |
|||
RequestHeader: true, |
|||
// Notice when put a large file and set need the request body, might happend out of memory error.
|
|||
RequestBody: false, |
|||
ResponseHeader: true, |
|||
ResponseBody: false, |
|||
}, |
|||
}, |
|||
}) |
|||
|
|||
opt := &cos.ObjectSelectOptions{ |
|||
Expression: "Select * from COSObject", |
|||
ExpressionType: "SQL", |
|||
InputSerialization: &cos.SelectInputSerialization{ |
|||
JSON: &cos.JSONInputSerialization{ |
|||
Type: "DOCUMENT", |
|||
}, |
|||
}, |
|||
OutputSerialization: &cos.SelectOutputSerialization{ |
|||
JSON: &cos.JSONOutputSerialization{ |
|||
RecordDelimiter: "\n", |
|||
}, |
|||
}, |
|||
RequestProgress: "TRUE", |
|||
} |
|||
res, err := c.Object.Select(context.Background(), "test.json", opt) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
defer res.Close() |
|||
data, err := ioutil.ReadAll(res) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
fmt.Printf("data: %v\n", string(data)) |
|||
resp, _ := res.(*cos.ObjectSelectResponse) |
|||
fmt.Printf("data: %+v\n", resp.Frame) |
|||
|
|||
// Select to File
|
|||
_, err = c.Object.SelectToFile(context.Background(), "test.json", "./test.json", opt) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
} |
@ -0,0 +1,66 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"context" |
|||
"fmt" |
|||
"net/url" |
|||
"os" |
|||
|
|||
"io/ioutil" |
|||
"net/http" |
|||
|
|||
"github.com/tencentyun/cos-go-sdk-v5" |
|||
"github.com/tencentyun/cos-go-sdk-v5/debug" |
|||
) |
|||
|
|||
func main() { |
|||
u, _ := url.Parse("https://test-1259654469.cos.ap-guangzhou.myqcloud.com") |
|||
b := &cos.BaseURL{BucketURL: u} |
|||
c := cos.NewClient(b, &http.Client{ |
|||
Transport: &cos.AuthorizationTransport{ |
|||
SecretID: os.Getenv("COS_SECRETID"), |
|||
SecretKey: os.Getenv("COS_SECRETKEY"), |
|||
Transport: &debug.DebugRequestTransport{ |
|||
RequestHeader: true, |
|||
// Notice when put a large file and set need the request body, might happend out of memory error.
|
|||
RequestBody: false, |
|||
ResponseHeader: true, |
|||
ResponseBody: false, |
|||
}, |
|||
}, |
|||
}) |
|||
|
|||
opt := &cos.ObjectSelectOptions{ |
|||
Expression: "Select * from COSObject", |
|||
ExpressionType: "SQL", |
|||
InputSerialization: &cos.SelectInputSerialization{ |
|||
CSV: &cos.CSVInputSerialization{ |
|||
FileHeaderInfo: "IGNORE", |
|||
}, |
|||
}, |
|||
OutputSerialization: &cos.SelectOutputSerialization{ |
|||
CSV: &cos.CSVOutputSerialization{ |
|||
RecordDelimiter: "\n", |
|||
}, |
|||
}, |
|||
RequestProgress: "TRUE", |
|||
} |
|||
res, err := c.Object.Select(context.Background(), "test.csv", opt) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
defer res.Close() |
|||
data, err := ioutil.ReadAll(res) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
fmt.Printf("data: %v\n", string(data)) |
|||
resp, _ := res.(*cos.ObjectSelectResponse) |
|||
fmt.Printf("data: %+v\n", resp.Frame) |
|||
|
|||
// Select To File
|
|||
_, err = c.Object.SelectToFile(context.Background(), "test.csv", "./test.csv", opt) |
|||
if err != nil { |
|||
panic(err) |
|||
} |
|||
} |
@ -0,0 +1,444 @@ |
|||
package cos |
|||
|
|||
import ( |
|||
"bytes" |
|||
"context" |
|||
"encoding/binary" |
|||
"encoding/xml" |
|||
"fmt" |
|||
"hash/crc32" |
|||
"io" |
|||
"io/ioutil" |
|||
"net/http" |
|||
"os" |
|||
"time" |
|||
) |
|||
|
|||
type JSONInputSerialization struct { |
|||
Type string `xml:"Type"` |
|||
} |
|||
|
|||
type CSVInputSerialization struct { |
|||
RecordDelimiter string `xml:"RecordDelimiter,omitempty"` |
|||
FieldDelimiter string `xml:"FieldDelimiter,omitempty"` |
|||
QuoteCharacter string `xml:"QuoteCharacter,omitempty"` |
|||
QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"` |
|||
AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"` |
|||
FileHeaderInfo string `xml:"FileHeaderInfo,omitempty"` |
|||
Comments string `xml:"Comments,omitempty"` |
|||
} |
|||
|
|||
type SelectInputSerialization struct { |
|||
CompressionType string `xml:"CompressionType,omitempty"` |
|||
CSV *CSVInputSerialization `xml:"CSV,omitempty"` |
|||
JSON *JSONInputSerialization `xml:"JSON,omitempty"` |
|||
} |
|||
|
|||
type JSONOutputSerialization struct { |
|||
RecordDelimiter string `xml:"RecordDelimiter,omitempty"` |
|||
} |
|||
|
|||
type CSVOutputSerialization struct { |
|||
QuoteFileds string `xml:"QuoteFileds,omitempty"` |
|||
RecordDelimiter string `xml:"RecordDelimiter,omitempty"` |
|||
FieldDelimiter string `xml:"FieldDelimiter,omitempty"` |
|||
QuoteCharacter string `xml:"QuoteCharacter,omitempty"` |
|||
QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"` |
|||
} |
|||
|
|||
type SelectOutputSerialization struct { |
|||
CSV *CSVOutputSerialization `xml:"CSV,omitempty"` |
|||
JSON *JSONOutputSerialization `xml:"JSON,omitempty"` |
|||
} |
|||
|
|||
type ObjectSelectOptions struct { |
|||
XMLName xml.Name `xml:"SelectRequest"` |
|||
Expression string `xml:"Expression"` |
|||
ExpressionType string `xml:"ExpressionType"` |
|||
InputSerialization *SelectInputSerialization `xml:"InputSerialization"` |
|||
OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"` |
|||
RequestProgress string `xml:"RequestProgress>Enabled,omitempty"` |
|||
} |
|||
|
|||
func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) { |
|||
u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name)) |
|||
sendOpt := sendOptions{ |
|||
baseURL: s.client.BaseURL.BucketURL, |
|||
uri: u, |
|||
method: http.MethodPost, |
|||
body: opt, |
|||
disableCloseBody: true, |
|||
} |
|||
resp, err := s.client.send(ctx, &sendOpt) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
result := &ObjectSelectResponse{ |
|||
Headers: resp.Header, |
|||
Body: resp.Body, |
|||
StatusCode: resp.StatusCode, |
|||
Frame: &ObjectSelectResult{ |
|||
NextFrame: true, |
|||
Payload: []byte{}, |
|||
}, |
|||
Finish: false, |
|||
} |
|||
|
|||
return result, nil |
|||
} |
|||
|
|||
func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) { |
|||
resp, err := s.Select(ctx, name, opt) |
|||
if err != nil { |
|||
return nil, err |
|||
} |
|||
res, _ := resp.(*ObjectSelectResponse) |
|||
defer func() { |
|||
io.Copy(ioutil.Discard, resp) |
|||
resp.Close() |
|||
}() |
|||
|
|||
fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664)) |
|||
if err != nil { |
|||
return res, err |
|||
} |
|||
|
|||
_, err = io.Copy(fd, resp) |
|||
fd.Close() |
|||
res.Finish = true |
|||
return res, err |
|||
} |
|||
|
|||
const ( |
|||
kReadTimeout = 3 |
|||
kMessageType = ":message-type" |
|||
kEventType = ":event-type" |
|||
kContentType = ":content-type" |
|||
|
|||
kRecordsFrameType = iota |
|||
kContinuationFrameType |
|||
kProgressFrameType |
|||
kStatsFrameType |
|||
kEndFrameType |
|||
kErrorFrameType |
|||
) |
|||
|
|||
type ProgressFrame struct { |
|||
XMLName xml.Name `xml:"Progress"` |
|||
BytesScanned int `xml:"BytesScanned"` |
|||
BytesProcessed int `xml:"BytesProcessed"` |
|||
BytesReturned int `xml:"BytesReturned"` |
|||
} |
|||
|
|||
type StatsFrame struct { |
|||
XMLName xml.Name `xml:"Stats"` |
|||
BytesScanned int `xml:"BytesScanned"` |
|||
BytesProcessed int `xml:"BytesProcessed"` |
|||
BytesReturned int `xml:"BytesReturned"` |
|||
} |
|||
|
|||
type DataFrame struct { |
|||
ContentType string |
|||
ConsumedBytesLength int32 |
|||
LeftBytesLength int32 |
|||
} |
|||
|
|||
type ErrorFrame struct { |
|||
Code string |
|||
Message string |
|||
} |
|||
|
|||
func (e *ErrorFrame) Error() string { |
|||
return fmt.Sprintf("Error Code: %s, Error Message: %s", e.Code, e.Message) |
|||
} |
|||
|
|||
type ObjectSelectResult struct { |
|||
TotalFrameLength int32 |
|||
TotalHeaderLength int32 |
|||
NextFrame bool |
|||
FrameType int |
|||
Payload []byte |
|||
DataFrame DataFrame |
|||
ProgressFrame ProgressFrame |
|||
StatsFrame StatsFrame |
|||
ErrorFrame *ErrorFrame |
|||
} |
|||
|
|||
type ObjectSelectResponse struct { |
|||
StatusCode int |
|||
Headers http.Header |
|||
Body io.ReadCloser |
|||
Frame *ObjectSelectResult |
|||
Finish bool |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) { |
|||
n, err = osr.readFrames(p) |
|||
return |
|||
} |
|||
func (osr *ObjectSelectResponse) Close() error { |
|||
return osr.Body.Close() |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) { |
|||
if osr.Finish { |
|||
return 0, io.EOF |
|||
} |
|||
if osr.Frame.ErrorFrame != nil { |
|||
return 0, osr.Frame.ErrorFrame |
|||
} |
|||
|
|||
var err error |
|||
var nlen int |
|||
dlen := len(p) |
|||
|
|||
for nlen < dlen { |
|||
if osr.Frame.NextFrame == true { |
|||
osr.Frame.NextFrame = false |
|||
err := osr.analysisPrelude() |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
err = osr.analysisHeader() |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
} |
|||
switch osr.Frame.FrameType { |
|||
case kRecordsFrameType: |
|||
n, err := osr.analysisRecords(p[nlen:]) |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
nlen += n |
|||
case kContinuationFrameType: |
|||
err = osr.payloadChecksum("ContinuationFrame") |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
case kProgressFrameType: |
|||
err := osr.analysisXml(&osr.Frame.ProgressFrame) |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
case kStatsFrameType: |
|||
err := osr.analysisXml(&osr.Frame.StatsFrame) |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
case kEndFrameType: |
|||
err = osr.payloadChecksum("EndFrame") |
|||
if err != nil { |
|||
return nlen, err |
|||
} |
|||
osr.Finish = true |
|||
return nlen, io.EOF |
|||
case kErrorFrameType: |
|||
return nlen, osr.Frame.ErrorFrame |
|||
} |
|||
} |
|||
return nlen, err |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) analysisPrelude() error { |
|||
frame := make([]byte, 12) |
|||
_, err := osr.fixedLengthRead(frame, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
|
|||
var preludeCRC uint32 |
|||
bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength) |
|||
bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength) |
|||
bytesToInt(frame[8:12], &preludeCRC) |
|||
osr.Frame.Payload = append(osr.Frame.Payload, frame...) |
|||
|
|||
return checksum(frame[0:8], preludeCRC, "Prelude") |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) analysisHeader() error { |
|||
var nlen int32 |
|||
headers := make(map[string]string) |
|||
for nlen < osr.Frame.TotalHeaderLength { |
|||
var headerNameLen int8 |
|||
var headerValueLen int16 |
|||
bHeaderNameLen := make([]byte, 1) |
|||
_, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
nlen += 1 |
|||
bytesToInt(bHeaderNameLen, &headerNameLen) |
|||
osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...) |
|||
|
|||
bHeaderName := make([]byte, headerNameLen) |
|||
_, err = osr.fixedLengthRead(bHeaderName, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
nlen += int32(headerNameLen) |
|||
headerName := string(bHeaderName) |
|||
osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...) |
|||
|
|||
bValueTypeLen := make([]byte, 3) |
|||
_, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
nlen += 3 |
|||
bytesToInt(bValueTypeLen[1:], &headerValueLen) |
|||
osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...) |
|||
|
|||
bHeaderValue := make([]byte, headerValueLen) |
|||
_, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
nlen += int32(headerValueLen) |
|||
headers[headerName] = string(bHeaderValue) |
|||
osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...) |
|||
} |
|||
htype, ok := headers[kMessageType] |
|||
if !ok { |
|||
return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers) |
|||
} |
|||
switch { |
|||
case htype == "error": |
|||
osr.Frame.FrameType = kErrorFrameType |
|||
osr.Frame.ErrorFrame = &ErrorFrame{} |
|||
osr.Frame.ErrorFrame.Code, _ = headers[":error-code"] |
|||
osr.Frame.ErrorFrame.Message, _ = headers[":error-message"] |
|||
case htype == "event": |
|||
hevent, ok := headers[kEventType] |
|||
if !ok { |
|||
return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers) |
|||
} |
|||
switch { |
|||
case hevent == "Records": |
|||
hContentType, ok := headers[kContentType] |
|||
if ok { |
|||
osr.Frame.DataFrame.ContentType = hContentType |
|||
} |
|||
osr.Frame.FrameType = kRecordsFrameType |
|||
case hevent == "Cont": |
|||
osr.Frame.FrameType = kContinuationFrameType |
|||
case hevent == "Progress": |
|||
osr.Frame.FrameType = kProgressFrameType |
|||
case hevent == "Stats": |
|||
osr.Frame.FrameType = kStatsFrameType |
|||
case hevent == "End": |
|||
osr.Frame.FrameType = kEndFrameType |
|||
default: |
|||
return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers) |
|||
} |
|||
default: |
|||
return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers) |
|||
} |
|||
return nil |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) { |
|||
var needReadLength int32 |
|||
dlen := int32(len(data)) |
|||
restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength |
|||
if dlen <= restLen { |
|||
needReadLength = dlen |
|||
} else { |
|||
needReadLength = restLen |
|||
} |
|||
n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout) |
|||
if err != nil { |
|||
return n, fmt.Errorf("read data frame error: %s", err.Error()) |
|||
} |
|||
osr.Frame.DataFrame.ConsumedBytesLength += int32(n) |
|||
osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...) |
|||
// 读完了一帧数据并填充到data中了
|
|||
if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength { |
|||
osr.Frame.DataFrame.ConsumedBytesLength = 0 |
|||
err = osr.payloadChecksum("RecordFrame") |
|||
} |
|||
return n, err |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error { |
|||
payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength |
|||
bFrame := make([]byte, payloadLength) |
|||
_, err := osr.fixedLengthRead(bFrame, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
err = xml.Unmarshal(bFrame, frame) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
osr.Frame.Payload = append(osr.Frame.Payload, bFrame...) |
|||
return osr.payloadChecksum("XmlFrame") |
|||
} |
|||
|
|||
// 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容
|
|||
func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error { |
|||
bcrc := make([]byte, 4) |
|||
_, err := osr.fixedLengthRead(bcrc, kReadTimeout) |
|||
if err != nil { |
|||
return err |
|||
} |
|||
var res uint32 |
|||
bytesToInt(bcrc, &res) |
|||
err = checksum(osr.Frame.Payload, res, ftype) |
|||
|
|||
osr.Frame.NextFrame = true |
|||
osr.Frame.Payload = []byte{} |
|||
|
|||
return err |
|||
} |
|||
|
|||
type chanReadIO struct { |
|||
readLen int |
|||
err error |
|||
} |
|||
|
|||
func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) { |
|||
timeout := time.Duration(read_timeout) |
|||
r := osr.Body |
|||
ch := make(chan chanReadIO, 1) |
|||
defer close(ch) |
|||
go func(p []byte) { |
|||
var needLen int |
|||
readChan := chanReadIO{} |
|||
needLen = len(p) |
|||
for { |
|||
n, err := r.Read(p[readChan.readLen:needLen]) |
|||
readChan.readLen += n |
|||
if err != nil { |
|||
readChan.err = err |
|||
ch <- readChan |
|||
return |
|||
} |
|||
|
|||
if readChan.readLen == needLen { |
|||
break |
|||
} |
|||
} |
|||
ch <- readChan |
|||
}(p) |
|||
|
|||
select { |
|||
case <-time.After(time.Second * timeout): |
|||
return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", "sr.Headers.Get(HTTPHeaderOssRequestID)", timeout, len(p)) |
|||
case result := <-ch: |
|||
return result.readLen, result.err |
|||
} |
|||
} |
|||
|
|||
func bytesToInt(b []byte, ret interface{}) { |
|||
binBuf := bytes.NewBuffer(b) |
|||
binary.Read(binBuf, binary.BigEndian, ret) |
|||
} |
|||
|
|||
func checksum(b []byte, rec uint32, ftype string) error { |
|||
c := crc32.ChecksumIEEE(b) |
|||
if c != rec { |
|||
return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, c, rec) |
|||
} |
|||
return nil |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue