You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
445 lines
12 KiB
445 lines
12 KiB
package cos
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/xml"
|
|
"fmt"
|
|
"hash/crc32"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
)
|
|
|
|
type JSONInputSerialization struct {
|
|
Type string `xml:"Type,omitempty"`
|
|
}
|
|
|
|
type CSVInputSerialization struct {
|
|
RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
|
|
FieldDelimiter string `xml:"FieldDelimiter,omitempty"`
|
|
QuoteCharacter string `xml:"QuoteCharacter,omitempty"`
|
|
QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
|
|
AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"`
|
|
FileHeaderInfo string `xml:"FileHeaderInfo,omitempty"`
|
|
Comments string `xml:"Comments,omitempty"`
|
|
}
|
|
|
|
type SelectInputSerialization struct {
|
|
CompressionType string `xml:"CompressionType,omitempty"`
|
|
CSV *CSVInputSerialization `xml:"CSV,omitempty"`
|
|
JSON *JSONInputSerialization `xml:"JSON,omitempty"`
|
|
}
|
|
|
|
type JSONOutputSerialization struct {
|
|
RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
|
|
}
|
|
|
|
type CSVOutputSerialization struct {
|
|
QuoteFields string `xml:"QuoteFields,omitempty"`
|
|
RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
|
|
FieldDelimiter string `xml:"FieldDelimiter,omitempty"`
|
|
QuoteCharacter string `xml:"QuoteCharacter,omitempty"`
|
|
QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
|
|
}
|
|
|
|
type SelectOutputSerialization struct {
|
|
CSV *CSVOutputSerialization `xml:"CSV,omitempty"`
|
|
JSON *JSONOutputSerialization `xml:"JSON,omitempty"`
|
|
}
|
|
|
|
type ObjectSelectOptions struct {
|
|
XMLName xml.Name `xml:"SelectRequest"`
|
|
Expression string `xml:"Expression"`
|
|
ExpressionType string `xml:"ExpressionType"`
|
|
InputSerialization *SelectInputSerialization `xml:"InputSerialization"`
|
|
OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"`
|
|
RequestProgress string `xml:"RequestProgress>Enabled,omitempty"`
|
|
}
|
|
|
|
func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) {
|
|
u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name))
|
|
sendOpt := sendOptions{
|
|
baseURL: s.client.BaseURL.BucketURL,
|
|
uri: u,
|
|
method: http.MethodPost,
|
|
body: opt,
|
|
disableCloseBody: true,
|
|
}
|
|
resp, err := s.client.send(ctx, &sendOpt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
result := &ObjectSelectResponse{
|
|
Headers: resp.Header,
|
|
Body: resp.Body,
|
|
StatusCode: resp.StatusCode,
|
|
Frame: &ObjectSelectResult{
|
|
NextFrame: true,
|
|
Payload: []byte{},
|
|
},
|
|
Finish: false,
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) {
|
|
resp, err := s.Select(ctx, name, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
res, _ := resp.(*ObjectSelectResponse)
|
|
defer func() {
|
|
io.Copy(ioutil.Discard, resp)
|
|
resp.Close()
|
|
}()
|
|
|
|
fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664))
|
|
if err != nil {
|
|
return res, err
|
|
}
|
|
|
|
_, err = io.Copy(fd, resp)
|
|
fd.Close()
|
|
res.Finish = true
|
|
return res, err
|
|
}
|
|
|
|
const (
|
|
kReadTimeout = 3
|
|
kMessageType = ":message-type"
|
|
kEventType = ":event-type"
|
|
kContentType = ":content-type"
|
|
|
|
kRecordsFrameType = iota
|
|
kContinuationFrameType
|
|
kProgressFrameType
|
|
kStatsFrameType
|
|
kEndFrameType
|
|
kErrorFrameType
|
|
)
|
|
|
|
type ProgressFrame struct {
|
|
XMLName xml.Name `xml:"Progress"`
|
|
BytesScanned int `xml:"BytesScanned"`
|
|
BytesProcessed int `xml:"BytesProcessed"`
|
|
BytesReturned int `xml:"BytesReturned"`
|
|
}
|
|
|
|
type StatsFrame struct {
|
|
XMLName xml.Name `xml:"Stats"`
|
|
BytesScanned int `xml:"BytesScanned"`
|
|
BytesProcessed int `xml:"BytesProcessed"`
|
|
BytesReturned int `xml:"BytesReturned"`
|
|
}
|
|
|
|
type DataFrame struct {
|
|
ContentType string
|
|
ConsumedBytesLength int32
|
|
LeftBytesLength int32
|
|
}
|
|
|
|
type ErrorFrame struct {
|
|
Code string
|
|
Message string
|
|
}
|
|
|
|
func (e *ErrorFrame) Error() string {
|
|
return fmt.Sprintf("Error Code: %s, Error Message: %s", e.Code, e.Message)
|
|
}
|
|
|
|
type ObjectSelectResult struct {
|
|
TotalFrameLength int32
|
|
TotalHeaderLength int32
|
|
NextFrame bool
|
|
FrameType int
|
|
Payload []byte
|
|
DataFrame DataFrame
|
|
ProgressFrame ProgressFrame
|
|
StatsFrame StatsFrame
|
|
ErrorFrame *ErrorFrame
|
|
}
|
|
|
|
type ObjectSelectResponse struct {
|
|
StatusCode int
|
|
Headers http.Header
|
|
Body io.ReadCloser
|
|
Frame *ObjectSelectResult
|
|
Finish bool
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) {
|
|
n, err = osr.readFrames(p)
|
|
return
|
|
}
|
|
func (osr *ObjectSelectResponse) Close() error {
|
|
return osr.Body.Close()
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) {
|
|
if osr.Finish {
|
|
return 0, io.EOF
|
|
}
|
|
if osr.Frame.ErrorFrame != nil {
|
|
return 0, osr.Frame.ErrorFrame
|
|
}
|
|
|
|
var err error
|
|
var nlen int
|
|
dlen := len(p)
|
|
|
|
for nlen < dlen {
|
|
if osr.Frame.NextFrame == true {
|
|
osr.Frame.NextFrame = false
|
|
err := osr.analysisPrelude()
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
err = osr.analysisHeader()
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
}
|
|
switch osr.Frame.FrameType {
|
|
case kRecordsFrameType:
|
|
n, err := osr.analysisRecords(p[nlen:])
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
nlen += n
|
|
case kContinuationFrameType:
|
|
err = osr.payloadChecksum("ContinuationFrame")
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
case kProgressFrameType:
|
|
err := osr.analysisXml(&osr.Frame.ProgressFrame)
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
case kStatsFrameType:
|
|
err := osr.analysisXml(&osr.Frame.StatsFrame)
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
case kEndFrameType:
|
|
err = osr.payloadChecksum("EndFrame")
|
|
if err != nil {
|
|
return nlen, err
|
|
}
|
|
osr.Finish = true
|
|
return nlen, io.EOF
|
|
case kErrorFrameType:
|
|
return nlen, osr.Frame.ErrorFrame
|
|
}
|
|
}
|
|
return nlen, err
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) analysisPrelude() error {
|
|
frame := make([]byte, 12)
|
|
_, err := osr.fixedLengthRead(frame, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var preludeCRC uint32
|
|
bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength)
|
|
bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength)
|
|
bytesToInt(frame[8:12], &preludeCRC)
|
|
osr.Frame.Payload = append(osr.Frame.Payload, frame...)
|
|
|
|
return checksum(frame[0:8], preludeCRC, "Prelude")
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) analysisHeader() error {
|
|
var nlen int32
|
|
headers := make(map[string]string)
|
|
for nlen < osr.Frame.TotalHeaderLength {
|
|
var headerNameLen int8
|
|
var headerValueLen int16
|
|
bHeaderNameLen := make([]byte, 1)
|
|
_, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nlen += 1
|
|
bytesToInt(bHeaderNameLen, &headerNameLen)
|
|
osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...)
|
|
|
|
bHeaderName := make([]byte, headerNameLen)
|
|
_, err = osr.fixedLengthRead(bHeaderName, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nlen += int32(headerNameLen)
|
|
headerName := string(bHeaderName)
|
|
osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...)
|
|
|
|
bValueTypeLen := make([]byte, 3)
|
|
_, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nlen += 3
|
|
bytesToInt(bValueTypeLen[1:], &headerValueLen)
|
|
osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...)
|
|
|
|
bHeaderValue := make([]byte, headerValueLen)
|
|
_, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
nlen += int32(headerValueLen)
|
|
headers[headerName] = string(bHeaderValue)
|
|
osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...)
|
|
}
|
|
htype, ok := headers[kMessageType]
|
|
if !ok {
|
|
return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers)
|
|
}
|
|
switch {
|
|
case htype == "error":
|
|
osr.Frame.FrameType = kErrorFrameType
|
|
osr.Frame.ErrorFrame = &ErrorFrame{}
|
|
osr.Frame.ErrorFrame.Code, _ = headers[":error-code"]
|
|
osr.Frame.ErrorFrame.Message, _ = headers[":error-message"]
|
|
case htype == "event":
|
|
hevent, ok := headers[kEventType]
|
|
if !ok {
|
|
return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers)
|
|
}
|
|
switch {
|
|
case hevent == "Records":
|
|
hContentType, ok := headers[kContentType]
|
|
if ok {
|
|
osr.Frame.DataFrame.ContentType = hContentType
|
|
}
|
|
osr.Frame.FrameType = kRecordsFrameType
|
|
case hevent == "Cont":
|
|
osr.Frame.FrameType = kContinuationFrameType
|
|
case hevent == "Progress":
|
|
osr.Frame.FrameType = kProgressFrameType
|
|
case hevent == "Stats":
|
|
osr.Frame.FrameType = kStatsFrameType
|
|
case hevent == "End":
|
|
osr.Frame.FrameType = kEndFrameType
|
|
default:
|
|
return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers)
|
|
}
|
|
default:
|
|
return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) {
|
|
var needReadLength int32
|
|
dlen := int32(len(data))
|
|
restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength
|
|
if dlen <= restLen {
|
|
needReadLength = dlen
|
|
} else {
|
|
needReadLength = restLen
|
|
}
|
|
n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout)
|
|
if err != nil {
|
|
return n, fmt.Errorf("read data frame error: %s", err.Error())
|
|
}
|
|
osr.Frame.DataFrame.ConsumedBytesLength += int32(n)
|
|
osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...)
|
|
// 读完了一帧数据并填充到data中了
|
|
if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength {
|
|
osr.Frame.DataFrame.ConsumedBytesLength = 0
|
|
err = osr.payloadChecksum("RecordFrame")
|
|
}
|
|
return n, err
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error {
|
|
payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength
|
|
bFrame := make([]byte, payloadLength)
|
|
_, err := osr.fixedLengthRead(bFrame, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = xml.Unmarshal(bFrame, frame)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
osr.Frame.Payload = append(osr.Frame.Payload, bFrame...)
|
|
return osr.payloadChecksum("XmlFrame")
|
|
}
|
|
|
|
// 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容
|
|
func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error {
|
|
bcrc := make([]byte, 4)
|
|
_, err := osr.fixedLengthRead(bcrc, kReadTimeout)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var res uint32
|
|
bytesToInt(bcrc, &res)
|
|
err = checksum(osr.Frame.Payload, res, ftype)
|
|
|
|
osr.Frame.NextFrame = true
|
|
osr.Frame.Payload = []byte{}
|
|
|
|
return err
|
|
}
|
|
|
|
type chanReadIO struct {
|
|
readLen int
|
|
err error
|
|
}
|
|
|
|
func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) {
|
|
timeout := time.Duration(read_timeout)
|
|
r := osr.Body
|
|
ch := make(chan chanReadIO, 1)
|
|
go func(p []byte) {
|
|
var needLen int
|
|
readChan := chanReadIO{}
|
|
needLen = len(p)
|
|
for {
|
|
n, err := r.Read(p[readChan.readLen:needLen])
|
|
readChan.readLen += n
|
|
if err != nil {
|
|
readChan.err = err
|
|
ch <- readChan
|
|
close(ch)
|
|
return
|
|
}
|
|
|
|
if readChan.readLen == needLen {
|
|
break
|
|
}
|
|
}
|
|
ch <- readChan
|
|
close(ch)
|
|
}(p)
|
|
|
|
select {
|
|
case <-time.After(time.Second * timeout):
|
|
return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", osr.Headers.Get("x-cos-request-id"), timeout, len(p))
|
|
case result := <-ch:
|
|
return result.readLen, result.err
|
|
}
|
|
}
|
|
|
|
func bytesToInt(b []byte, ret interface{}) {
|
|
binBuf := bytes.NewBuffer(b)
|
|
binary.Read(binBuf, binary.BigEndian, ret)
|
|
}
|
|
|
|
func checksum(b []byte, rec uint32, ftype string) error {
|
|
c := crc32.ChecksumIEEE(b)
|
|
if c != rec {
|
|
return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, c, rec)
|
|
}
|
|
return nil
|
|
}
|