You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

445 lines
12 KiB

  1. package cos
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/binary"
  6. "encoding/xml"
  7. "fmt"
  8. "hash/crc32"
  9. "io"
  10. "io/ioutil"
  11. "net/http"
  12. "os"
  13. "time"
  14. )
  15. type JSONInputSerialization struct {
  16. Type string `xml:"Type,omitempty"`
  17. }
  18. type CSVInputSerialization struct {
  19. RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
  20. FieldDelimiter string `xml:"FieldDelimiter,omitempty"`
  21. QuoteCharacter string `xml:"QuoteCharacter,omitempty"`
  22. QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
  23. AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"`
  24. FileHeaderInfo string `xml:"FileHeaderInfo,omitempty"`
  25. Comments string `xml:"Comments,omitempty"`
  26. }
  27. type SelectInputSerialization struct {
  28. CompressionType string `xml:"CompressionType,omitempty"`
  29. CSV *CSVInputSerialization `xml:"CSV,omitempty"`
  30. JSON *JSONInputSerialization `xml:"JSON,omitempty"`
  31. }
  32. type JSONOutputSerialization struct {
  33. RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
  34. }
  35. type CSVOutputSerialization struct {
  36. QuoteFields string `xml:"QuoteFields,omitempty"`
  37. RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
  38. FieldDelimiter string `xml:"FieldDelimiter,omitempty"`
  39. QuoteCharacter string `xml:"QuoteCharacter,omitempty"`
  40. QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
  41. }
  42. type SelectOutputSerialization struct {
  43. CSV *CSVOutputSerialization `xml:"CSV,omitempty"`
  44. JSON *JSONOutputSerialization `xml:"JSON,omitempty"`
  45. }
  46. type ObjectSelectOptions struct {
  47. XMLName xml.Name `xml:"SelectRequest"`
  48. Expression string `xml:"Expression"`
  49. ExpressionType string `xml:"ExpressionType"`
  50. InputSerialization *SelectInputSerialization `xml:"InputSerialization"`
  51. OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"`
  52. RequestProgress string `xml:"RequestProgress>Enabled,omitempty"`
  53. }
  54. func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) {
  55. u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name))
  56. sendOpt := sendOptions{
  57. baseURL: s.client.BaseURL.BucketURL,
  58. uri: u,
  59. method: http.MethodPost,
  60. body: opt,
  61. disableCloseBody: true,
  62. }
  63. resp, err := s.client.send(ctx, &sendOpt)
  64. if err != nil {
  65. return nil, err
  66. }
  67. result := &ObjectSelectResponse{
  68. Headers: resp.Header,
  69. Body: resp.Body,
  70. StatusCode: resp.StatusCode,
  71. Frame: &ObjectSelectResult{
  72. NextFrame: true,
  73. Payload: []byte{},
  74. },
  75. Finish: false,
  76. }
  77. return result, nil
  78. }
  79. func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) {
  80. resp, err := s.Select(ctx, name, opt)
  81. if err != nil {
  82. return nil, err
  83. }
  84. res, _ := resp.(*ObjectSelectResponse)
  85. defer func() {
  86. io.Copy(ioutil.Discard, resp)
  87. resp.Close()
  88. }()
  89. fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664))
  90. if err != nil {
  91. return res, err
  92. }
  93. _, err = io.Copy(fd, resp)
  94. fd.Close()
  95. res.Finish = true
  96. return res, err
  97. }
  98. const (
  99. kReadTimeout = 3
  100. kMessageType = ":message-type"
  101. kEventType = ":event-type"
  102. kContentType = ":content-type"
  103. kRecordsFrameType = iota
  104. kContinuationFrameType
  105. kProgressFrameType
  106. kStatsFrameType
  107. kEndFrameType
  108. kErrorFrameType
  109. )
  110. type ProgressFrame struct {
  111. XMLName xml.Name `xml:"Progress"`
  112. BytesScanned int `xml:"BytesScanned"`
  113. BytesProcessed int `xml:"BytesProcessed"`
  114. BytesReturned int `xml:"BytesReturned"`
  115. }
  116. type StatsFrame struct {
  117. XMLName xml.Name `xml:"Stats"`
  118. BytesScanned int `xml:"BytesScanned"`
  119. BytesProcessed int `xml:"BytesProcessed"`
  120. BytesReturned int `xml:"BytesReturned"`
  121. }
  122. type DataFrame struct {
  123. ContentType string
  124. ConsumedBytesLength int32
  125. LeftBytesLength int32
  126. }
  127. type ErrorFrame struct {
  128. Code string
  129. Message string
  130. }
  131. func (e *ErrorFrame) Error() string {
  132. return fmt.Sprintf("Error Code: %s, Error Message: %s", e.Code, e.Message)
  133. }
  134. type ObjectSelectResult struct {
  135. TotalFrameLength int32
  136. TotalHeaderLength int32
  137. NextFrame bool
  138. FrameType int
  139. Payload []byte
  140. DataFrame DataFrame
  141. ProgressFrame ProgressFrame
  142. StatsFrame StatsFrame
  143. ErrorFrame *ErrorFrame
  144. }
  145. type ObjectSelectResponse struct {
  146. StatusCode int
  147. Headers http.Header
  148. Body io.ReadCloser
  149. Frame *ObjectSelectResult
  150. Finish bool
  151. }
  152. func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) {
  153. n, err = osr.readFrames(p)
  154. return
  155. }
  156. func (osr *ObjectSelectResponse) Close() error {
  157. return osr.Body.Close()
  158. }
  159. func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) {
  160. if osr.Finish {
  161. return 0, io.EOF
  162. }
  163. if osr.Frame.ErrorFrame != nil {
  164. return 0, osr.Frame.ErrorFrame
  165. }
  166. var err error
  167. var nlen int
  168. dlen := len(p)
  169. for nlen < dlen {
  170. if osr.Frame.NextFrame == true {
  171. osr.Frame.NextFrame = false
  172. err := osr.analysisPrelude()
  173. if err != nil {
  174. return nlen, err
  175. }
  176. err = osr.analysisHeader()
  177. if err != nil {
  178. return nlen, err
  179. }
  180. }
  181. switch osr.Frame.FrameType {
  182. case kRecordsFrameType:
  183. n, err := osr.analysisRecords(p[nlen:])
  184. if err != nil {
  185. return nlen, err
  186. }
  187. nlen += n
  188. case kContinuationFrameType:
  189. err = osr.payloadChecksum("ContinuationFrame")
  190. if err != nil {
  191. return nlen, err
  192. }
  193. case kProgressFrameType:
  194. err := osr.analysisXml(&osr.Frame.ProgressFrame)
  195. if err != nil {
  196. return nlen, err
  197. }
  198. case kStatsFrameType:
  199. err := osr.analysisXml(&osr.Frame.StatsFrame)
  200. if err != nil {
  201. return nlen, err
  202. }
  203. case kEndFrameType:
  204. err = osr.payloadChecksum("EndFrame")
  205. if err != nil {
  206. return nlen, err
  207. }
  208. osr.Finish = true
  209. return nlen, io.EOF
  210. case kErrorFrameType:
  211. return nlen, osr.Frame.ErrorFrame
  212. }
  213. }
  214. return nlen, err
  215. }
  216. func (osr *ObjectSelectResponse) analysisPrelude() error {
  217. frame := make([]byte, 12)
  218. _, err := osr.fixedLengthRead(frame, kReadTimeout)
  219. if err != nil {
  220. return err
  221. }
  222. var preludeCRC uint32
  223. bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength)
  224. bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength)
  225. bytesToInt(frame[8:12], &preludeCRC)
  226. osr.Frame.Payload = append(osr.Frame.Payload, frame...)
  227. return checksum(frame[0:8], preludeCRC, "Prelude")
  228. }
  229. func (osr *ObjectSelectResponse) analysisHeader() error {
  230. var nlen int32
  231. headers := make(map[string]string)
  232. for nlen < osr.Frame.TotalHeaderLength {
  233. var headerNameLen int8
  234. var headerValueLen int16
  235. bHeaderNameLen := make([]byte, 1)
  236. _, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout)
  237. if err != nil {
  238. return err
  239. }
  240. nlen += 1
  241. bytesToInt(bHeaderNameLen, &headerNameLen)
  242. osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...)
  243. bHeaderName := make([]byte, headerNameLen)
  244. _, err = osr.fixedLengthRead(bHeaderName, kReadTimeout)
  245. if err != nil {
  246. return err
  247. }
  248. nlen += int32(headerNameLen)
  249. headerName := string(bHeaderName)
  250. osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...)
  251. bValueTypeLen := make([]byte, 3)
  252. _, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout)
  253. if err != nil {
  254. return err
  255. }
  256. nlen += 3
  257. bytesToInt(bValueTypeLen[1:], &headerValueLen)
  258. osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...)
  259. bHeaderValue := make([]byte, headerValueLen)
  260. _, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout)
  261. if err != nil {
  262. return err
  263. }
  264. nlen += int32(headerValueLen)
  265. headers[headerName] = string(bHeaderValue)
  266. osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...)
  267. }
  268. htype, ok := headers[kMessageType]
  269. if !ok {
  270. return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers)
  271. }
  272. switch {
  273. case htype == "error":
  274. osr.Frame.FrameType = kErrorFrameType
  275. osr.Frame.ErrorFrame = &ErrorFrame{}
  276. osr.Frame.ErrorFrame.Code, _ = headers[":error-code"]
  277. osr.Frame.ErrorFrame.Message, _ = headers[":error-message"]
  278. case htype == "event":
  279. hevent, ok := headers[kEventType]
  280. if !ok {
  281. return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers)
  282. }
  283. switch {
  284. case hevent == "Records":
  285. hContentType, ok := headers[kContentType]
  286. if ok {
  287. osr.Frame.DataFrame.ContentType = hContentType
  288. }
  289. osr.Frame.FrameType = kRecordsFrameType
  290. case hevent == "Cont":
  291. osr.Frame.FrameType = kContinuationFrameType
  292. case hevent == "Progress":
  293. osr.Frame.FrameType = kProgressFrameType
  294. case hevent == "Stats":
  295. osr.Frame.FrameType = kStatsFrameType
  296. case hevent == "End":
  297. osr.Frame.FrameType = kEndFrameType
  298. default:
  299. return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers)
  300. }
  301. default:
  302. return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers)
  303. }
  304. return nil
  305. }
  306. func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) {
  307. var needReadLength int32
  308. dlen := int32(len(data))
  309. restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength
  310. if dlen <= restLen {
  311. needReadLength = dlen
  312. } else {
  313. needReadLength = restLen
  314. }
  315. n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout)
  316. if err != nil {
  317. return n, fmt.Errorf("read data frame error: %s", err.Error())
  318. }
  319. osr.Frame.DataFrame.ConsumedBytesLength += int32(n)
  320. osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...)
  321. // 读完了一帧数据并填充到data中了
  322. if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength {
  323. osr.Frame.DataFrame.ConsumedBytesLength = 0
  324. err = osr.payloadChecksum("RecordFrame")
  325. }
  326. return n, err
  327. }
  328. func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error {
  329. payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength
  330. bFrame := make([]byte, payloadLength)
  331. _, err := osr.fixedLengthRead(bFrame, kReadTimeout)
  332. if err != nil {
  333. return err
  334. }
  335. err = xml.Unmarshal(bFrame, frame)
  336. if err != nil {
  337. return err
  338. }
  339. osr.Frame.Payload = append(osr.Frame.Payload, bFrame...)
  340. return osr.payloadChecksum("XmlFrame")
  341. }
  342. // 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容
  343. func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error {
  344. bcrc := make([]byte, 4)
  345. _, err := osr.fixedLengthRead(bcrc, kReadTimeout)
  346. if err != nil {
  347. return err
  348. }
  349. var res uint32
  350. bytesToInt(bcrc, &res)
  351. err = checksum(osr.Frame.Payload, res, ftype)
  352. osr.Frame.NextFrame = true
  353. osr.Frame.Payload = []byte{}
  354. return err
  355. }
  356. type chanReadIO struct {
  357. readLen int
  358. err error
  359. }
  360. func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) {
  361. timeout := time.Duration(read_timeout)
  362. r := osr.Body
  363. ch := make(chan chanReadIO, 1)
  364. go func(p []byte) {
  365. var needLen int
  366. readChan := chanReadIO{}
  367. needLen = len(p)
  368. for {
  369. n, err := r.Read(p[readChan.readLen:needLen])
  370. readChan.readLen += n
  371. if err != nil {
  372. readChan.err = err
  373. ch <- readChan
  374. close(ch)
  375. return
  376. }
  377. if readChan.readLen == needLen {
  378. break
  379. }
  380. }
  381. ch <- readChan
  382. close(ch)
  383. }(p)
  384. select {
  385. case <-time.After(time.Second * timeout):
  386. return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", osr.Headers.Get("x-cos-request-id"), timeout, len(p))
  387. case result := <-ch:
  388. return result.readLen, result.err
  389. }
  390. }
  391. func bytesToInt(b []byte, ret interface{}) {
  392. binBuf := bytes.NewBuffer(b)
  393. binary.Read(binBuf, binary.BigEndian, ret)
  394. }
  395. func checksum(b []byte, rec uint32, ftype string) error {
  396. c := crc32.ChecksumIEEE(b)
  397. if c != rec {
  398. return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, c, rec)
  399. }
  400. return nil
  401. }