websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

188 lines
3.6 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "github.com/gorilla/websocket"
  5. "net/http"
  6. "sync"
  7. "time"
  8. )
  9. // Session wrapper around websocket connections.
  10. type Session struct {
  11. Request *http.Request
  12. Keys map[string]interface{}
  13. conn *websocket.Conn
  14. output chan *envelope
  15. melody *Melody
  16. open bool
  17. rwmutex *sync.RWMutex
  18. }
  19. func (s *Session) writeMessage(message *envelope) {
  20. if s.closed() {
  21. s.melody.errorHandler(s, errors.New("Tried to write to closed a session."))
  22. return
  23. }
  24. select {
  25. case s.output <- message:
  26. default:
  27. s.melody.errorHandler(s, errors.New("Session message buffer is full."))
  28. }
  29. }
  30. func (s *Session) writeRaw(message *envelope) error {
  31. if s.closed() {
  32. return errors.New("Trie to write to a closed session.")
  33. }
  34. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  35. err := s.conn.WriteMessage(message.t, message.msg)
  36. if err != nil {
  37. return err
  38. }
  39. return nil
  40. }
  41. func (s *Session) closed() bool {
  42. s.rwmutex.RLock()
  43. defer s.rwmutex.RUnlock()
  44. return !s.open
  45. }
  46. func (s *Session) close() {
  47. if !s.closed() {
  48. s.rwmutex.Lock()
  49. s.open = false
  50. s.conn.Close()
  51. close(s.output)
  52. s.rwmutex.Unlock()
  53. }
  54. }
  55. func (s *Session) ping() {
  56. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  57. }
  58. func (s *Session) writePump() {
  59. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  60. defer ticker.Stop()
  61. loop:
  62. for {
  63. select {
  64. case msg, ok := <-s.output:
  65. if !ok {
  66. break loop
  67. }
  68. err := s.writeRaw(msg)
  69. if err != nil {
  70. s.melody.errorHandler(s, err)
  71. break loop
  72. }
  73. if msg.t == websocket.CloseMessage {
  74. break loop
  75. }
  76. case <-ticker.C:
  77. s.ping()
  78. }
  79. }
  80. }
  81. func (s *Session) readPump() {
  82. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  83. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  84. s.conn.SetPongHandler(func(string) error {
  85. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  86. s.melody.pongHandler(s)
  87. return nil
  88. })
  89. for {
  90. t, message, err := s.conn.ReadMessage()
  91. if err != nil {
  92. s.melody.errorHandler(s, err)
  93. break
  94. }
  95. if t == websocket.TextMessage {
  96. s.melody.messageHandler(s, message)
  97. }
  98. if t == websocket.BinaryMessage {
  99. s.melody.messageHandlerBinary(s, message)
  100. }
  101. }
  102. }
  103. // Write writes message to session.
  104. func (s *Session) Write(msg []byte) error {
  105. if s.closed() {
  106. return errors.New("Session is closed.")
  107. }
  108. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  109. return nil
  110. }
  111. // WriteBinary writes a binary message to session.
  112. func (s *Session) WriteBinary(msg []byte) error {
  113. if s.closed() {
  114. return errors.New("Session is closed.")
  115. }
  116. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  117. return nil
  118. }
  119. // Close closes session.
  120. func (s *Session) Close() error {
  121. if s.closed() {
  122. return errors.New("Session is already closed.")
  123. }
  124. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  125. return nil
  126. }
  127. // Set is used to store a new key/value pair exclusivelly for this session.
  128. // It also lazy initializes s.Keys if it was not used previously.
  129. func (s *Session) Set(key string, value interface{}) {
  130. if s.Keys == nil {
  131. s.Keys = make(map[string]interface{})
  132. }
  133. s.Keys[key] = value
  134. }
  135. // Get returns the value for the given key, ie: (value, true).
  136. // If the value does not exists it returns (nil, false)
  137. func (s *Session) Get(key string) (value interface{}, exists bool) {
  138. if s.Keys != nil {
  139. value, exists = s.Keys[key]
  140. }
  141. return
  142. }
  143. // MustGet returns the value for the given key if it exists, otherwise it panics.
  144. func (s *Session) MustGet(key string) interface{} {
  145. if value, exists := s.Get(key); exists {
  146. return value
  147. }
  148. panic("Key \"" + key + "\" does not exist")
  149. }