You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
3.8 KiB

4 years ago
4 years ago
4 years ago
4 years ago
4 years ago
  1. package cos
  2. import (
  3. "fmt"
  4. "hash"
  5. "io"
  6. )
  7. type ProgressEventType int
  8. const (
  9. // 数据开始传输
  10. ProgressStartedEvent ProgressEventType = iota
  11. // 数据传输中
  12. ProgressDataEvent
  13. // 数据传输完成, 但不能表示对应API调用完成
  14. ProgressCompletedEvent
  15. // 只有在数据传输时发生错误才会返回
  16. ProgressFailedEvent
  17. )
  18. type ProgressEvent struct {
  19. EventType ProgressEventType
  20. RWBytes int64
  21. ConsumedBytes int64
  22. TotalBytes int64
  23. Err error
  24. }
  25. func newProgressEvent(eventType ProgressEventType, rwBytes, consumed, total int64, err ...error) *ProgressEvent {
  26. event := &ProgressEvent{
  27. EventType: eventType,
  28. RWBytes: rwBytes,
  29. ConsumedBytes: consumed,
  30. TotalBytes: total,
  31. }
  32. if len(err) > 0 {
  33. event.Err = err[0]
  34. }
  35. return event
  36. }
  37. // 用户自定义Listener需要实现该方法
  38. type ProgressListener interface {
  39. ProgressChangedCallback(event *ProgressEvent)
  40. }
  41. func progressCallback(listener ProgressListener, event *ProgressEvent) {
  42. if listener != nil && event != nil {
  43. listener.ProgressChangedCallback(event)
  44. }
  45. }
  46. type teeReader struct {
  47. reader io.Reader
  48. writer io.Writer
  49. consumedBytes int64
  50. totalBytes int64
  51. listener ProgressListener
  52. disableCheckSum bool
  53. }
  54. func (r *teeReader) Read(p []byte) (int, error) {
  55. if r.consumedBytes == 0 {
  56. event := newProgressEvent(ProgressStartedEvent, 0, r.consumedBytes, r.totalBytes)
  57. progressCallback(r.listener, event)
  58. }
  59. n, err := r.reader.Read(p)
  60. if err != nil && err != io.EOF {
  61. event := newProgressEvent(ProgressFailedEvent, 0, r.consumedBytes, r.totalBytes, err)
  62. progressCallback(r.listener, event)
  63. }
  64. if n > 0 {
  65. r.consumedBytes += int64(n)
  66. if r.writer != nil {
  67. if n, err := r.writer.Write(p[:n]); err != nil {
  68. return n, err
  69. }
  70. }
  71. if r.listener != nil {
  72. event := newProgressEvent(ProgressDataEvent, int64(n), r.consumedBytes, r.totalBytes)
  73. progressCallback(r.listener, event)
  74. }
  75. }
  76. if err == io.EOF {
  77. event := newProgressEvent(ProgressCompletedEvent, int64(n), r.consumedBytes, r.totalBytes)
  78. progressCallback(r.listener, event)
  79. }
  80. return n, err
  81. }
  82. func (r *teeReader) Close() error {
  83. if rc, ok := r.reader.(io.ReadCloser); ok {
  84. return rc.Close()
  85. }
  86. return nil
  87. }
  88. func (r *teeReader) Size() int64 {
  89. return r.totalBytes
  90. }
  91. func (r *teeReader) Crc64() uint64 {
  92. if r.writer != nil {
  93. if th, ok := r.writer.(hash.Hash64); ok {
  94. return th.Sum64()
  95. }
  96. }
  97. return 0
  98. }
  99. func (r *teeReader) Sum() []byte {
  100. if r.writer != nil {
  101. if th, ok := r.writer.(hash.Hash); ok {
  102. return th.Sum(nil)
  103. }
  104. }
  105. return []byte{}
  106. }
  107. func TeeReader(reader io.Reader, writer io.Writer, total int64, listener ProgressListener) *teeReader {
  108. return &teeReader{
  109. reader: reader,
  110. writer: writer,
  111. consumedBytes: 0,
  112. totalBytes: total,
  113. listener: listener,
  114. disableCheckSum: false,
  115. }
  116. }
  117. type FixedLengthReader interface {
  118. io.Reader
  119. Size() int64
  120. }
  121. type DefaultProgressListener struct {
  122. }
  123. func (l *DefaultProgressListener) ProgressChangedCallback(event *ProgressEvent) {
  124. switch event.EventType {
  125. case ProgressStartedEvent:
  126. fmt.Printf("Transfer Start [ConsumedBytes/TotalBytes: %d/%d]\n",
  127. event.ConsumedBytes, event.TotalBytes)
  128. case ProgressDataEvent:
  129. fmt.Printf("\rTransfer Data [ConsumedBytes/TotalBytes: %d/%d, %d%%]",
  130. event.ConsumedBytes, event.TotalBytes, event.ConsumedBytes*100/event.TotalBytes)
  131. case ProgressCompletedEvent:
  132. fmt.Printf("\nTransfer Complete [ConsumedBytes/TotalBytes: %d/%d]\n",
  133. event.ConsumedBytes, event.TotalBytes)
  134. case ProgressFailedEvent:
  135. fmt.Printf("\nTransfer Failed [ConsumedBytes/TotalBytes: %d/%d] [Err: %v]\n",
  136. event.ConsumedBytes, event.TotalBytes, event.Err)
  137. default:
  138. fmt.Printf("Progress Changed Error: unknown progress event type\n")
  139. }
  140. }