websocket 增加多分组 fork https://github.com/olahol/melody
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
3.2 KiB

10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
9 years ago
10 years ago
9 years ago
10 years ago
9 years ago
10 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "net/http"
  5. "sync"
  6. "time"
  7. "github.com/gorilla/websocket"
  8. )
  9. // Session is wrapper around websocket connections.
  10. type Session struct {
  11. Request *http.Request
  12. Keys map[string]interface{}
  13. conn *websocket.Conn
  14. output chan *envelope
  15. melody *Melody
  16. lock *sync.Mutex
  17. }
  18. func (s *Session) writeMessage(message *envelope) {
  19. select {
  20. case s.output <- message:
  21. default:
  22. s.melody.errorHandler(s, errors.New("Message buffer full"))
  23. }
  24. }
  25. func (s *Session) writeRaw(message *envelope) error {
  26. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  27. err := s.conn.WriteMessage(message.t, message.msg)
  28. if err != nil {
  29. return err
  30. }
  31. if message.t == websocket.CloseMessage {
  32. err := s.conn.Close()
  33. if err != nil {
  34. return err
  35. }
  36. }
  37. return nil
  38. }
  39. func (s *Session) close() {
  40. s.writeRaw(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  41. }
  42. func (s *Session) ping() {
  43. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  44. }
  45. func (s *Session) writePump() {
  46. defer s.conn.Close()
  47. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  48. defer ticker.Stop()
  49. loop:
  50. for {
  51. select {
  52. case msg, ok := <-s.output:
  53. if !ok {
  54. s.close()
  55. break loop
  56. }
  57. if err := s.writeRaw(msg); err != nil {
  58. s.melody.errorHandler(s, err)
  59. break loop
  60. }
  61. s.melody.messageSentHandler(s, msg.msg)
  62. case <-ticker.C:
  63. s.ping()
  64. }
  65. }
  66. }
  67. func (s *Session) readPump() {
  68. defer s.conn.Close()
  69. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  70. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  71. s.conn.SetPongHandler(func(string) error {
  72. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  73. s.melody.pongHandler(s)
  74. return nil
  75. })
  76. for {
  77. t, message, err := s.conn.ReadMessage()
  78. if err != nil {
  79. s.melody.errorHandler(s, err)
  80. break
  81. }
  82. if t == websocket.TextMessage {
  83. s.melody.messageHandler(s, message)
  84. }
  85. if t == websocket.BinaryMessage {
  86. s.melody.messageHandlerBinary(s, message)
  87. }
  88. }
  89. }
  90. // Write writes message to session.
  91. func (s *Session) Write(msg []byte) {
  92. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  93. }
  94. // WriteBinary writes a binary message to session.
  95. func (s *Session) WriteBinary(msg []byte) {
  96. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  97. }
  98. // Close closes a session.
  99. func (s *Session) Close() {
  100. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  101. }
  102. // Set is used to store a new key/value pair exclusivelly for this session.
  103. // It also lazy initializes s.Keys if it was not used previously.
  104. func (s *Session) Set(key string, value interface{}) {
  105. s.lock.Lock()
  106. defer s.lock.Unlock()
  107. if s.Keys == nil {
  108. s.Keys = make(map[string]interface{})
  109. }
  110. s.Keys[key] = value
  111. }
  112. // Get returns the value for the given key, ie: (value, true).
  113. // If the value does not exists it returns (nil, false)
  114. func (s *Session) Get(key string) (value interface{}, exists bool) {
  115. s.lock.Lock()
  116. defer s.lock.Unlock()
  117. if s.Keys != nil {
  118. value, exists = s.Keys[key]
  119. }
  120. return
  121. }
  122. // MustGet returns the value for the given key if it exists, otherwise it panics.
  123. func (s *Session) MustGet(key string) interface{} {
  124. if value, exists := s.Get(key); exists {
  125. return value
  126. }
  127. panic("Key \"" + key + "\" does not exist")
  128. }