websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
6.3 KiB

10 years ago
10 years ago
10 years ago
10 years ago
5 years ago
10 years ago
8 years ago
8 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
9 years ago
8 years ago
10 years ago
10 years ago
8 years ago
10 years ago
10 years ago
8 years ago
5 years ago
5 years ago
5 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
  9. // Session wrapper around websocket connections.
  10. type Session struct {
  11. Request *http.Request
  12. Keys map[string]interface{}
  13. conn *websocket.Conn
  14. output chan *envelope
  15. melody *Melody
  16. open bool
  17. rwmutex *sync.RWMutex
  18. // extends
  19. channel string
  20. next *Session
  21. prev *Session
  22. }
  23. func (s *Session) writeMessage(message *envelope) {
  24. if s.closed() {
  25. s.melody.errorHandler(s, errors.New("tried to write to closed a session"))
  26. return
  27. }
  28. select {
  29. case s.output <- message:
  30. default:
  31. s.melody.errorHandler(s, errors.New("session message buffer is full"))
  32. }
  33. }
  34. func (s *Session) writeRaw(message *envelope) error {
  35. if s.closed() {
  36. return errors.New("tried to write to a closed session")
  37. }
  38. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  39. err := s.conn.WriteMessage(message.t, message.msg)
  40. if err != nil {
  41. return err
  42. }
  43. return nil
  44. }
  45. func (s *Session) closed() bool {
  46. s.rwmutex.RLock()
  47. defer s.rwmutex.RUnlock()
  48. return !s.open
  49. }
  50. func (s *Session) close() {
  51. if !s.closed() {
  52. s.rwmutex.Lock()
  53. s.open = false
  54. s.conn.Close()
  55. close(s.output)
  56. s.rwmutex.Unlock()
  57. }
  58. }
  59. func (s *Session) ping() {
  60. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  61. }
  62. func (s *Session) writePump() {
  63. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  64. defer ticker.Stop()
  65. loop:
  66. for {
  67. select {
  68. case msg, ok := <-s.output:
  69. if !ok {
  70. break loop
  71. }
  72. err := s.writeRaw(msg)
  73. if err != nil {
  74. s.melody.errorHandler(s, err)
  75. break loop
  76. }
  77. if msg.t == websocket.CloseMessage {
  78. break loop
  79. }
  80. if msg.t == websocket.TextMessage {
  81. s.melody.messageSentHandler(s, msg.msg)
  82. }
  83. if msg.t == websocket.BinaryMessage {
  84. s.melody.messageSentHandlerBinary(s, msg.msg)
  85. }
  86. case <-ticker.C:
  87. s.ping()
  88. }
  89. }
  90. }
  91. func (s *Session) readPump() {
  92. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  93. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  94. s.conn.SetPongHandler(func(string) error {
  95. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  96. s.melody.pongHandler(s)
  97. return nil
  98. })
  99. if s.melody.closeHandler != nil {
  100. s.conn.SetCloseHandler(func(code int, text string) error {
  101. return s.melody.closeHandler(s, code, text)
  102. })
  103. }
  104. for {
  105. t, message, err := s.conn.ReadMessage()
  106. if err != nil {
  107. s.melody.errorHandler(s, err)
  108. break
  109. }
  110. if t == websocket.TextMessage {
  111. s.melody.messageHandler(s, message)
  112. }
  113. if t == websocket.BinaryMessage {
  114. s.melody.messageHandlerBinary(s, message)
  115. }
  116. }
  117. }
  118. // Write writes message to session.
  119. func (s *Session) Write(msg []byte) error {
  120. if s.closed() {
  121. return errors.New("session is closed")
  122. }
  123. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  124. return nil
  125. }
  126. // WriteBinary writes a binary message to session.
  127. func (s *Session) WriteBinary(msg []byte) error {
  128. if s.closed() {
  129. return errors.New("session is closed")
  130. }
  131. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  132. return nil
  133. }
  134. // Close closes session.
  135. func (s *Session) Close() error {
  136. if s.closed() {
  137. return errors.New("session is already closed")
  138. }
  139. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  140. return nil
  141. }
  142. // CloseWithMsg closes the session with the provided payload.
  143. // Use the FormatCloseMessage function to format a proper close message payload.
  144. func (s *Session) CloseWithMsg(msg []byte) error {
  145. if s.closed() {
  146. return errors.New("session is already closed")
  147. }
  148. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: msg})
  149. return nil
  150. }
  151. // Set is used to store a new key/value pair exclusivelly for this session.
  152. // It also lazy initializes s.Keys if it was not used previously.
  153. func (s *Session) Set(key string, value interface{}) {
  154. if s.Keys == nil {
  155. s.Keys = make(map[string]interface{})
  156. }
  157. s.Keys[key] = value
  158. }
  159. // Get returns the value for the given key, ie: (value, true).
  160. // If the value does not exists it returns (nil, false)
  161. func (s *Session) Get(key string) (value interface{}, exists bool) {
  162. if s.Keys != nil {
  163. value, exists = s.Keys[key]
  164. }
  165. return
  166. }
  167. // MustGet returns the value for the given key if it exists, otherwise it panics.
  168. func (s *Session) MustGet(key string) interface{} {
  169. if value, exists := s.Get(key); exists {
  170. return value
  171. }
  172. panic("Key \"" + key + "\" does not exist")
  173. }
  174. // IsClosed returns the status of the connection.
  175. func (s *Session) IsClosed() bool {
  176. return s.closed()
  177. }
  178. // extends
  179. func (s *Session) Subscribe(c string) error {
  180. //
  181. if s.closed() {
  182. return errors.New("tried to write to closed a session")
  183. }
  184. if s.channel != "" && s.channel != c { // 存在订阅 并与接下来的channel无法共存
  185. return errors.New("session already subscribe channel")
  186. } else {
  187. s.channel = c
  188. s.melody.hub.subscribe <- s
  189. }
  190. return nil
  191. }
  192. func (s *Session) Unsubscribe() error {
  193. if s.closed() {
  194. return errors.New("tried to write to closed a session")
  195. }
  196. if s.channel == "" {
  197. return errors.New("session not yet subscribe channel")
  198. } else {
  199. s.melody.hub.unsubscribe <- s
  200. }
  201. return nil
  202. }
  203. func (s *Session) Publish(msg []byte) error {
  204. return s.PublishFilter(msg, nil)
  205. }
  206. func (s *Session) PublishFilter(msg []byte, fn func(*Session) bool) error {
  207. if s.melody.hub.closed() {
  208. return errors.New("melody instance is closed")
  209. }
  210. message := &envelope{t: websocket.TextMessage, msg: msg, c: s.channel, filter: fn}
  211. s.melody.hub.publish <- message
  212. return nil
  213. }
  214. func (s *Session) PublishOthers(msg []byte) error {
  215. return s.PublishFilter(msg, func(session *Session) bool {
  216. return s != session
  217. })
  218. }
  219. func (s *Session) PublishBinary(msg []byte) error {
  220. return s.PublishBinaryFilter(msg, nil)
  221. }
  222. func (s *Session) PublishBinaryOthers(msg []byte) error {
  223. return s.PublishBinaryFilter(msg, func(session *Session) bool {
  224. return s != session
  225. })
  226. }
  227. func (s *Session) PublishBinaryFilter(msg []byte, fn func(*Session) bool) error {
  228. if s.melody.hub.closed() {
  229. return errors.New("melody instance is closed")
  230. }
  231. message := &envelope{t: websocket.BinaryMessage, msg: msg, c: s.channel, filter: fn}
  232. s.melody.hub.publish <- message
  233. return nil
  234. }
  235. func (s *Session) IsSubscribed() bool {
  236. return s.channel != ""
  237. }
  238. func (s *Session) Channel() *Channel {
  239. if c, ok := s.melody.hub.channels[s.channel]; ok {
  240. return c
  241. } else {
  242. return nil
  243. }
  244. }