WebSocket framework with added multi-group support; a fork of https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

219 lines
4.4 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
// Session wraps a single websocket connection together with the state the
// package needs to read from it, write to it and close it.
type Session struct {
	// Request is an HTTP request associated with the session.
	// NOTE(review): presumably the upgrade request that opened the
	// connection — confirm where Session is constructed.
	Request *http.Request
	// Keys holds per-session key/value data; Set creates it lazily and
	// Get/MustGet read from it.
	Keys map[string]interface{}
	// conn is the underlying websocket connection.
	conn *websocket.Conn
	// output carries outgoing envelopes: writeMessage sends on it, writePump
	// receives from it, and close closes it.
	output chan *envelope
	// melody is the owning Melody instance; it supplies Config and the
	// handler callbacks invoked by this session.
	melody *Melody
	// open is true while the session is usable; close sets it to false.
	open bool
	// rwmutex guards open.
	rwmutex *sync.RWMutex
}
  19. func (s *Session) writeMessage(message *envelope) {
  20. if s.closed() {
  21. s.melody.errorHandler(s, errors.New("tried to write to closed a session"))
  22. return
  23. }
  24. select {
  25. case s.output <- message:
  26. default:
  27. s.melody.errorHandler(s, errors.New("session message buffer is full"))
  28. }
  29. }
  30. func (s *Session) writeRaw(message *envelope) error {
  31. if s.closed() {
  32. return errors.New("tried to write to a closed session")
  33. }
  34. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  35. err := s.conn.WriteMessage(message.t, message.msg)
  36. if err != nil {
  37. return err
  38. }
  39. return nil
  40. }
  41. func (s *Session) closed() bool {
  42. s.rwmutex.RLock()
  43. defer s.rwmutex.RUnlock()
  44. return !s.open
  45. }
  46. func (s *Session) close() {
  47. if !s.closed() {
  48. s.rwmutex.Lock()
  49. s.open = false
  50. s.conn.Close()
  51. close(s.output)
  52. s.rwmutex.Unlock()
  53. }
  54. }
  55. func (s *Session) ping() {
  56. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  57. }
  58. func (s *Session) writePump() {
  59. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  60. defer ticker.Stop()
  61. loop:
  62. for {
  63. select {
  64. case msg, ok := <-s.output:
  65. if !ok {
  66. break loop
  67. }
  68. err := s.writeRaw(msg)
  69. if err != nil {
  70. s.melody.errorHandler(s, err)
  71. break loop
  72. }
  73. if msg.t == websocket.CloseMessage {
  74. break loop
  75. }
  76. if msg.t == websocket.TextMessage {
  77. s.melody.messageSentHandler(s, msg.msg)
  78. }
  79. if msg.t == websocket.BinaryMessage {
  80. s.melody.messageSentHandlerBinary(s, msg.msg)
  81. }
  82. case <-ticker.C:
  83. s.ping()
  84. }
  85. }
  86. }
  87. func (s *Session) readPump() {
  88. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  89. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  90. s.conn.SetPongHandler(func(string) error {
  91. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  92. s.melody.pongHandler(s)
  93. return nil
  94. })
  95. if s.melody.closeHandler != nil {
  96. s.conn.SetCloseHandler(func(code int, text string) error {
  97. return s.melody.closeHandler(s, code, text)
  98. })
  99. }
  100. for {
  101. t, message, err := s.conn.ReadMessage()
  102. if err != nil {
  103. s.melody.errorHandler(s, err)
  104. break
  105. }
  106. if t == websocket.TextMessage {
  107. s.melody.messageHandler(s, message)
  108. }
  109. if t == websocket.BinaryMessage {
  110. s.melody.messageHandlerBinary(s, message)
  111. }
  112. }
  113. }
  114. // Write writes message to session.
  115. func (s *Session) Write(msg []byte) error {
  116. if s.closed() {
  117. return errors.New("session is closed")
  118. }
  119. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  120. return nil
  121. }
  122. // WriteBinary writes a binary message to session.
  123. func (s *Session) WriteBinary(msg []byte) error {
  124. if s.closed() {
  125. return errors.New("session is closed")
  126. }
  127. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  128. return nil
  129. }
  130. // Close closes session.
  131. func (s *Session) Close() error {
  132. if s.closed() {
  133. return errors.New("session is already closed")
  134. }
  135. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  136. return nil
  137. }
  138. // CloseWithMsg closes the session with the provided payload.
  139. // Use the FormatCloseMessage function to format a proper close message payload.
  140. func (s *Session) CloseWithMsg(msg []byte) error {
  141. if s.closed() {
  142. return errors.New("session is already closed")
  143. }
  144. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: msg})
  145. return nil
  146. }
  147. // Set is used to store a new key/value pair exclusivelly for this session.
  148. // It also lazy initializes s.Keys if it was not used previously.
  149. func (s *Session) Set(key string, value interface{}) {
  150. if s.Keys == nil {
  151. s.Keys = make(map[string]interface{})
  152. }
  153. s.Keys[key] = value
  154. }
  155. // Get returns the value for the given key, ie: (value, true).
  156. // If the value does not exists it returns (nil, false)
  157. func (s *Session) Get(key string) (value interface{}, exists bool) {
  158. if s.Keys != nil {
  159. value, exists = s.Keys[key]
  160. }
  161. return
  162. }
  163. // MustGet returns the value for the given key if it exists, otherwise it panics.
  164. func (s *Session) MustGet(key string) interface{} {
  165. if value, exists := s.Get(key); exists {
  166. return value
  167. }
  168. panic("Key \"" + key + "\" does not exist")
  169. }
  170. // IsClosed returns the status of the connection.
  171. func (s *Session) IsClosed() bool {
  172. return s.closed()
  173. }