websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

218 lines
4.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
  9. // Session wrapper around websocket connections.
  10. type Session struct {
  11. Request *http.Request
  12. Keys map[string]interface{}
  13. conn *websocket.Conn
  14. output chan *envelope
  15. melody *Melody
  16. open bool
  17. rwmutex *sync.RWMutex
  18. }
  19. func (s *Session) writeMessage(message *envelope) {
  20. if s.closed() {
  21. s.melody.errorHandler(s, errors.New("tried to write to closed a session"))
  22. return
  23. }
  24. select {
  25. case s.output <- message:
  26. default:
  27. s.melody.errorHandler(s, errors.New("session message buffer is full"))
  28. }
  29. }
  30. func (s *Session) writeRaw(message *envelope) error {
  31. if s.closed() {
  32. return errors.New("tried to write to a closed session")
  33. }
  34. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  35. err := s.conn.WriteMessage(message.t, message.msg)
  36. if err != nil {
  37. return err
  38. }
  39. return nil
  40. }
  41. func (s *Session) closed() bool {
  42. s.rwmutex.RLock()
  43. defer s.rwmutex.RUnlock()
  44. return !s.open
  45. }
  46. func (s *Session) close() {
  47. if !s.closed() {
  48. s.rwmutex.Lock()
  49. s.open = false
  50. s.conn.Close()
  51. close(s.output)
  52. s.rwmutex.Unlock()
  53. }
  54. }
  55. func (s *Session) ping() {
  56. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  57. }
  58. func (s *Session) writePump() {
  59. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  60. defer ticker.Stop()
  61. loop:
  62. for {
  63. select {
  64. case msg, ok := <-s.output:
  65. if !ok {
  66. break loop
  67. }
  68. err := s.writeRaw(msg)
  69. if err != nil {
  70. s.melody.errorHandler(s, err)
  71. break loop
  72. }
  73. if msg.t == websocket.CloseMessage {
  74. break loop
  75. }
  76. if msg.t == websocket.TextMessage {
  77. s.melody.messageSentHandler(s, msg.msg)
  78. }
  79. if msg.t == websocket.BinaryMessage {
  80. s.melody.messageSentHandlerBinary(s, msg.msg)
  81. }
  82. case <-ticker.C:
  83. s.ping()
  84. }
  85. }
  86. }
  87. func (s *Session) readPump() {
  88. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  89. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  90. s.conn.SetPongHandler(func(string) error {
  91. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  92. s.melody.pongHandler(s)
  93. return nil
  94. })
  95. s.conn.SetCloseHandler(func(code int, text string) error {
  96. s.melody.closeHandler(s, code, text)
  97. return nil
  98. })
  99. for {
  100. t, message, err := s.conn.ReadMessage()
  101. if err != nil {
  102. s.melody.errorHandler(s, err)
  103. break
  104. }
  105. if t == websocket.TextMessage {
  106. s.melody.messageHandler(s, message)
  107. }
  108. if t == websocket.BinaryMessage {
  109. s.melody.messageHandlerBinary(s, message)
  110. }
  111. }
  112. }
  113. // Write writes message to session.
  114. func (s *Session) Write(msg []byte) error {
  115. if s.closed() {
  116. return errors.New("session is closed")
  117. }
  118. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  119. return nil
  120. }
  121. // WriteBinary writes a binary message to session.
  122. func (s *Session) WriteBinary(msg []byte) error {
  123. if s.closed() {
  124. return errors.New("session is closed")
  125. }
  126. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  127. return nil
  128. }
  129. // Close closes session.
  130. func (s *Session) Close() error {
  131. if s.closed() {
  132. return errors.New("session is already closed")
  133. }
  134. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  135. return nil
  136. }
  137. // CloseWithMsg closes the session with the provided payload.
  138. // Use the FormatCloseMessage function to format a proper close message payload.
  139. func (s *Session) CloseWithMsg(msg []byte) error {
  140. if s.closed() {
  141. return errors.New("session is already closed")
  142. }
  143. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: msg})
  144. return nil
  145. }
  146. // Set is used to store a new key/value pair exclusivelly for this session.
  147. // It also lazy initializes s.Keys if it was not used previously.
  148. func (s *Session) Set(key string, value interface{}) {
  149. if s.Keys == nil {
  150. s.Keys = make(map[string]interface{})
  151. }
  152. s.Keys[key] = value
  153. }
  154. // Get returns the value for the given key, ie: (value, true).
  155. // If the value does not exists it returns (nil, false)
  156. func (s *Session) Get(key string) (value interface{}, exists bool) {
  157. if s.Keys != nil {
  158. value, exists = s.Keys[key]
  159. }
  160. return
  161. }
  162. // MustGet returns the value for the given key if it exists, otherwise it panics.
  163. func (s *Session) MustGet(key string) interface{} {
  164. if value, exists := s.Get(key); exists {
  165. return value
  166. }
  167. panic("Key \"" + key + "\" does not exist")
  168. }
  169. // IsClosed returns the status of the connection.
  170. func (s *Session) IsClosed() bool {
  171. return s.closed()
  172. }