package cos

import (
	"bytes"
	"context"
	"encoding/binary"
	"encoding/xml"
	"fmt"
	"hash/crc32"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"time"
)

type JSONInputSerialization struct {
	Type string `xml:"Type"`
}

type CSVInputSerialization struct {
	RecordDelimiter            string `xml:"RecordDelimiter,omitempty"`
	FieldDelimiter             string `xml:"FieldDelimiter,omitempty"`
	QuoteCharacter             string `xml:"QuoteCharacter,omitempty"`
	QuoteEscapeCharacter       string `xml:"QuoteEscapeCharacter,omitempty"`
	AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"`
	FileHeaderInfo             string `xml:"FileHeaderInfo,omitempty"`
	Comments                   string `xml:"Comments,omitempty"`
}

type SelectInputSerialization struct {
	CompressionType string                  `xml:"CompressionType,omitempty"`
	CSV             *CSVInputSerialization  `xml:"CSV,omitempty"`
	JSON            *JSONInputSerialization `xml:"JSON,omitempty"`
}

type JSONOutputSerialization struct {
	RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
}

type CSVOutputSerialization struct {
	QuoteFileds          string `xml:"QuoteFileds,omitempty"`
	RecordDelimiter      string `xml:"RecordDelimiter,omitempty"`
	FieldDelimiter       string `xml:"FieldDelimiter,omitempty"`
	QuoteCharacter       string `xml:"QuoteCharacter,omitempty"`
	QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
}

type SelectOutputSerialization struct {
	CSV  *CSVOutputSerialization  `xml:"CSV,omitempty"`
	JSON *JSONOutputSerialization `xml:"JSON,omitempty"`
}

type ObjectSelectOptions struct {
	XMLName             xml.Name                   `xml:"SelectRequest"`
	Expression          string                     `xml:"Expression"`
	ExpressionType      string                     `xml:"ExpressionType"`
	InputSerialization  *SelectInputSerialization  `xml:"InputSerialization"`
	OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"`
	RequestProgress     string                     `xml:"RequestProgress>Enabled,omitempty"`
}

func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) {
	u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name))
	sendOpt := sendOptions{
		baseURL:          s.client.BaseURL.BucketURL,
		uri:              u,
		method:           http.MethodPost,
		body:             opt,
		disableCloseBody: true,
	}
	resp, err := s.client.send(ctx, &sendOpt)
	if err != nil {
		return nil, err
	}
	result := &ObjectSelectResponse{
		Headers:    resp.Header,
		Body:       resp.Body,
		StatusCode: resp.StatusCode,
		Frame: &ObjectSelectResult{
			NextFrame: true,
			Payload:   []byte{},
		},
		Finish: false,
	}

	return result, nil
}

func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) {
	resp, err := s.Select(ctx, name, opt)
	if err != nil {
		return nil, err
	}
	res, _ := resp.(*ObjectSelectResponse)
	defer func() {
		io.Copy(ioutil.Discard, resp)
		resp.Close()
	}()

	fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664))
	if err != nil {
		return res, err
	}

	_, err = io.Copy(fd, resp)
	fd.Close()
	res.Finish = true
	return res, err
}

const (
	kReadTimeout = 3
	kMessageType = ":message-type"
	kEventType   = ":event-type"
	kContentType = ":content-type"

	kRecordsFrameType = iota
	kContinuationFrameType
	kProgressFrameType
	kStatsFrameType
	kEndFrameType
	kErrorFrameType
)

type ProgressFrame struct {
	XMLName        xml.Name `xml:"Progress"`
	BytesScanned   int      `xml:"BytesScanned"`
	BytesProcessed int      `xml:"BytesProcessed"`
	BytesReturned  int      `xml:"BytesReturned"`
}

type StatsFrame struct {
	XMLName        xml.Name `xml:"Stats"`
	BytesScanned   int      `xml:"BytesScanned"`
	BytesProcessed int      `xml:"BytesProcessed"`
	BytesReturned  int      `xml:"BytesReturned"`
}

type DataFrame struct {
	ContentType         string
	ConsumedBytesLength int32
	LeftBytesLength     int32
}

type ErrorFrame struct {
	Code    string
	Message string
}

func (e *ErrorFrame) Error() string {
	return fmt.Sprintf("Error Code: %s, Error Message: %s", e.Code, e.Message)
}

type ObjectSelectResult struct {
	TotalFrameLength  int32
	TotalHeaderLength int32
	NextFrame         bool
	FrameType         int
	Payload           []byte
	DataFrame         DataFrame
	ProgressFrame     ProgressFrame
	StatsFrame        StatsFrame
	ErrorFrame        *ErrorFrame
}

type ObjectSelectResponse struct {
	StatusCode int
	Headers    http.Header
	Body       io.ReadCloser
	Frame      *ObjectSelectResult
	Finish     bool
}

func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) {
	n, err = osr.readFrames(p)
	return
}
func (osr *ObjectSelectResponse) Close() error {
	return osr.Body.Close()
}

func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) {
	if osr.Finish {
		return 0, io.EOF
	}
	if osr.Frame.ErrorFrame != nil {
		return 0, osr.Frame.ErrorFrame
	}

	var err error
	var nlen int
	dlen := len(p)

	for nlen < dlen {
		if osr.Frame.NextFrame == true {
			osr.Frame.NextFrame = false
			err := osr.analysisPrelude()
			if err != nil {
				return nlen, err
			}
			err = osr.analysisHeader()
			if err != nil {
				return nlen, err
			}
		}
		switch osr.Frame.FrameType {
		case kRecordsFrameType:
			n, err := osr.analysisRecords(p[nlen:])
			if err != nil {
				return nlen, err
			}
			nlen += n
		case kContinuationFrameType:
			err = osr.payloadChecksum("ContinuationFrame")
			if err != nil {
				return nlen, err
			}
		case kProgressFrameType:
			err := osr.analysisXml(&osr.Frame.ProgressFrame)
			if err != nil {
				return nlen, err
			}
		case kStatsFrameType:
			err := osr.analysisXml(&osr.Frame.StatsFrame)
			if err != nil {
				return nlen, err
			}
		case kEndFrameType:
			err = osr.payloadChecksum("EndFrame")
			if err != nil {
				return nlen, err
			}
			osr.Finish = true
			return nlen, io.EOF
		case kErrorFrameType:
			return nlen, osr.Frame.ErrorFrame
		}
	}
	return nlen, err
}

func (osr *ObjectSelectResponse) analysisPrelude() error {
	frame := make([]byte, 12)
	_, err := osr.fixedLengthRead(frame, kReadTimeout)
	if err != nil {
		return err
	}

	var preludeCRC uint32
	bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength)
	bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength)
	bytesToInt(frame[8:12], &preludeCRC)
	osr.Frame.Payload = append(osr.Frame.Payload, frame...)

	return checksum(frame[0:8], preludeCRC, "Prelude")
}

func (osr *ObjectSelectResponse) analysisHeader() error {
	var nlen int32
	headers := make(map[string]string)
	for nlen < osr.Frame.TotalHeaderLength {
		var headerNameLen int8
		var headerValueLen int16
		bHeaderNameLen := make([]byte, 1)
		_, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout)
		if err != nil {
			return err
		}
		nlen += 1
		bytesToInt(bHeaderNameLen, &headerNameLen)
		osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...)

		bHeaderName := make([]byte, headerNameLen)
		_, err = osr.fixedLengthRead(bHeaderName, kReadTimeout)
		if err != nil {
			return err
		}
		nlen += int32(headerNameLen)
		headerName := string(bHeaderName)
		osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...)

		bValueTypeLen := make([]byte, 3)
		_, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout)
		if err != nil {
			return err
		}
		nlen += 3
		bytesToInt(bValueTypeLen[1:], &headerValueLen)
		osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...)

		bHeaderValue := make([]byte, headerValueLen)
		_, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout)
		if err != nil {
			return err
		}
		nlen += int32(headerValueLen)
		headers[headerName] = string(bHeaderValue)
		osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...)
	}
	htype, ok := headers[kMessageType]
	if !ok {
		return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers)
	}
	switch {
	case htype == "error":
		osr.Frame.FrameType = kErrorFrameType
		osr.Frame.ErrorFrame = &ErrorFrame{}
		osr.Frame.ErrorFrame.Code, _ = headers[":error-code"]
		osr.Frame.ErrorFrame.Message, _ = headers[":error-message"]
	case htype == "event":
		hevent, ok := headers[kEventType]
		if !ok {
			return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers)
		}
		switch {
		case hevent == "Records":
			hContentType, ok := headers[kContentType]
			if ok {
				osr.Frame.DataFrame.ContentType = hContentType
			}
			osr.Frame.FrameType = kRecordsFrameType
		case hevent == "Cont":
			osr.Frame.FrameType = kContinuationFrameType
		case hevent == "Progress":
			osr.Frame.FrameType = kProgressFrameType
		case hevent == "Stats":
			osr.Frame.FrameType = kStatsFrameType
		case hevent == "End":
			osr.Frame.FrameType = kEndFrameType
		default:
			return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers)
		}
	default:
		return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers)
	}
	return nil
}

func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) {
	var needReadLength int32
	dlen := int32(len(data))
	restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength
	if dlen <= restLen {
		needReadLength = dlen
	} else {
		needReadLength = restLen
	}
	n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout)
	if err != nil {
		return n, fmt.Errorf("read data frame error: %s", err.Error())
	}
	osr.Frame.DataFrame.ConsumedBytesLength += int32(n)
	osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...)
	// 读完了一帧数据并填充到data中了
	if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength {
		osr.Frame.DataFrame.ConsumedBytesLength = 0
		err = osr.payloadChecksum("RecordFrame")
	}
	return n, err
}

func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error {
	payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength
	bFrame := make([]byte, payloadLength)
	_, err := osr.fixedLengthRead(bFrame, kReadTimeout)
	if err != nil {
		return err
	}
	err = xml.Unmarshal(bFrame, frame)
	if err != nil {
		return err
	}
	osr.Frame.Payload = append(osr.Frame.Payload, bFrame...)
	return osr.payloadChecksum("XmlFrame")
}

// 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容
func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error {
	bcrc := make([]byte, 4)
	_, err := osr.fixedLengthRead(bcrc, kReadTimeout)
	if err != nil {
		return err
	}
	var res uint32
	bytesToInt(bcrc, &res)
	err = checksum(osr.Frame.Payload, res, ftype)

	osr.Frame.NextFrame = true
	osr.Frame.Payload = []byte{}

	return err
}

type chanReadIO struct {
	readLen int
	err     error
}

func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) {
	timeout := time.Duration(read_timeout)
	r := osr.Body
	ch := make(chan chanReadIO, 1)
	defer close(ch)
	go func(p []byte) {
		var needLen int
		readChan := chanReadIO{}
		needLen = len(p)
		for {
			n, err := r.Read(p[readChan.readLen:needLen])
			readChan.readLen += n
			if err != nil {
				readChan.err = err
				ch <- readChan
				return
			}

			if readChan.readLen == needLen {
				break
			}
		}
		ch <- readChan
	}(p)

	select {
	case <-time.After(time.Second * timeout):
		return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", "sr.Headers.Get(HTTPHeaderOssRequestID)", timeout, len(p))
	case result := <-ch:
		return result.readLen, result.err
	}
}

func bytesToInt(b []byte, ret interface{}) {
	binBuf := bytes.NewBuffer(b)
	binary.Read(binBuf, binary.BigEndian, ret)
}

func checksum(b []byte, rec uint32, ftype string) error {
	c := crc32.ChecksumIEEE(b)
	if c != rec {
		return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, c, rec)
	}
	return nil
}