websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
4.2 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
8 years ago
8 years ago
10 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
  9. // Session wrapper around websocket connections.
  10. type Session struct {
  11. Request *http.Request
  12. Keys map[string]interface{}
  13. conn *websocket.Conn
  14. output chan *envelope
  15. melody *Melody
  16. open bool
  17. rwmutex *sync.RWMutex
  18. }
  19. func (s *Session) writeMessage(message *envelope) {
  20. if s.closed() {
  21. s.melody.errorHandler(s, errors.New("tried to write to closed a session"))
  22. return
  23. }
  24. select {
  25. case s.output <- message:
  26. default:
  27. s.melody.errorHandler(s, errors.New("session message buffer is full"))
  28. }
  29. }
  30. func (s *Session) writeRaw(message *envelope) error {
  31. if s.closed() {
  32. return errors.New("trie to write to a closed session")
  33. }
  34. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  35. err := s.conn.WriteMessage(message.t, message.msg)
  36. if err != nil {
  37. return err
  38. }
  39. return nil
  40. }
  41. func (s *Session) closed() bool {
  42. s.rwmutex.RLock()
  43. defer s.rwmutex.RUnlock()
  44. return !s.open
  45. }
  46. func (s *Session) close() {
  47. if !s.closed() {
  48. s.rwmutex.Lock()
  49. s.open = false
  50. s.conn.Close()
  51. close(s.output)
  52. s.rwmutex.Unlock()
  53. }
  54. }
  55. func (s *Session) ping() {
  56. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  57. }
  58. func (s *Session) writePump() {
  59. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  60. defer ticker.Stop()
  61. loop:
  62. for {
  63. select {
  64. case msg, ok := <-s.output:
  65. if !ok {
  66. break loop
  67. }
  68. err := s.writeRaw(msg)
  69. if err != nil {
  70. s.melody.errorHandler(s, err)
  71. break loop
  72. }
  73. if msg.t == websocket.CloseMessage {
  74. break loop
  75. }
  76. if msg.t == websocket.TextMessage {
  77. s.melody.messageSentHandler(s, msg.msg)
  78. }
  79. if msg.t == websocket.BinaryMessage {
  80. s.melody.messageSentHandlerBinary(s, msg.msg)
  81. }
  82. case <-ticker.C:
  83. s.ping()
  84. }
  85. }
  86. }
  87. func (s *Session) readPump() {
  88. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  89. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  90. s.conn.SetPongHandler(func(string) error {
  91. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  92. s.melody.pongHandler(s)
  93. return nil
  94. })
  95. s.conn.SetCloseHandler(func(code int, text string) error {
  96. s.melody.closeHandler(s, code, text)
  97. return nil
  98. })
  99. for {
  100. t, message, err := s.conn.ReadMessage()
  101. if err != nil {
  102. s.melody.errorHandler(s, err)
  103. break
  104. }
  105. if t == websocket.TextMessage {
  106. s.melody.messageHandler(s, message)
  107. }
  108. if t == websocket.BinaryMessage {
  109. s.melody.messageHandlerBinary(s, message)
  110. }
  111. }
  112. }
  113. // Write writes message to session.
  114. func (s *Session) Write(msg []byte) error {
  115. if s.closed() {
  116. return errors.New("session is closed")
  117. }
  118. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  119. return nil
  120. }
  121. // WriteBinary writes a binary message to session.
  122. func (s *Session) WriteBinary(msg []byte) error {
  123. if s.closed() {
  124. return errors.New("session is closed")
  125. }
  126. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  127. return nil
  128. }
  129. // Close closes session.
  130. func (s *Session) Close() error {
  131. if s.closed() {
  132. return errors.New("session is already closed")
  133. }
  134. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  135. return nil
  136. }
  137. // CloseWithMsg closes the session with the provided payload.
  138. // Use the FormatCloseMessage function to format a proper close message payload.
  139. func (s *Session) CloseWithMsg(msg []byte) error {
  140. if s.closed() {
  141. return errors.New("session is already closed")
  142. }
  143. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: msg})
  144. return nil
  145. }
  146. // Set is used to store a new key/value pair exclusivelly for this session.
  147. // It also lazy initializes s.Keys if it was not used previously.
  148. func (s *Session) Set(key string, value interface{}) {
  149. if s.Keys == nil {
  150. s.Keys = make(map[string]interface{})
  151. }
  152. s.Keys[key] = value
  153. }
  154. // Get returns the value for the given key, ie: (value, true).
  155. // If the value does not exists it returns (nil, false)
  156. func (s *Session) Get(key string) (value interface{}, exists bool) {
  157. if s.Keys != nil {
  158. value, exists = s.Keys[key]
  159. }
  160. return
  161. }
  162. // MustGet returns the value for the given key if it exists, otherwise it panics.
  163. func (s *Session) MustGet(key string) interface{} {
  164. if value, exists := s.Get(key); exists {
  165. return value
  166. }
  167. panic("Key \"" + key + "\" does not exist")
  168. }