websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
2.5 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. package melody
  2. import (
  3. "errors"
  4. "github.com/gorilla/websocket"
  5. "net/http"
  6. "time"
  7. )
  8. // A melody session.
  9. type Session struct {
  10. Request *http.Request
  11. params map[string]interface{}
  12. conn *websocket.Conn
  13. output chan *envelope
  14. melody *Melody
  15. }
  16. func (s *Session) writeMessage(message *envelope) {
  17. select {
  18. case s.output <- message:
  19. default:
  20. s.melody.errorHandler(s, errors.New("Message buffer full"))
  21. }
  22. }
  23. func (s *Session) writeRaw(message *envelope) error {
  24. s.conn.SetWriteDeadline(time.Now().Add(s.melody.Config.WriteWait))
  25. err := s.conn.WriteMessage(message.t, message.msg)
  26. if err != nil {
  27. return err
  28. }
  29. if message.t == websocket.CloseMessage {
  30. err := s.conn.Close()
  31. if err != nil {
  32. return err
  33. }
  34. }
  35. return nil
  36. }
  37. func (s *Session) close() {
  38. s.writeRaw(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  39. }
  40. func (s *Session) ping() {
  41. s.writeRaw(&envelope{t: websocket.PingMessage, msg: []byte{}})
  42. }
  43. func (s *Session) writePump() {
  44. defer s.conn.Close()
  45. ticker := time.NewTicker(s.melody.Config.PingPeriod)
  46. defer ticker.Stop()
  47. loop:
  48. for {
  49. select {
  50. case msg, ok := <-s.output:
  51. if !ok {
  52. s.close()
  53. break loop
  54. }
  55. if err := s.writeRaw(msg); err != nil {
  56. s.melody.errorHandler(s, err)
  57. break loop
  58. }
  59. case <-ticker.C:
  60. s.ping()
  61. }
  62. }
  63. }
  64. func (s *Session) readPump() {
  65. defer s.conn.Close()
  66. s.conn.SetReadLimit(s.melody.Config.MaxMessageSize)
  67. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  68. s.conn.SetPongHandler(func(string) error {
  69. s.conn.SetReadDeadline(time.Now().Add(s.melody.Config.PongWait))
  70. return nil
  71. })
  72. for {
  73. t, message, err := s.conn.ReadMessage()
  74. if err != nil {
  75. s.melody.errorHandler(s, err)
  76. break
  77. }
  78. if t == websocket.TextMessage {
  79. s.melody.messageHandler(s, message)
  80. }
  81. if t == websocket.BinaryMessage {
  82. s.melody.messageHandlerBinary(s, message)
  83. }
  84. }
  85. }
  86. // Write message to session.
  87. func (s *Session) Write(msg []byte) {
  88. s.writeMessage(&envelope{t: websocket.TextMessage, msg: msg})
  89. }
  90. // Write binary message to session.
  91. func (s *Session) WriteBinary(msg []byte) {
  92. s.writeMessage(&envelope{t: websocket.BinaryMessage, msg: msg})
  93. }
  94. // Close session.
  95. func (s *Session) Close() {
  96. s.writeMessage(&envelope{t: websocket.CloseMessage, msg: []byte{}})
  97. }
  98. // Set session param
  99. func (s *Session) SetParam(key string, value interface{}) {
  100. s.params[key] = value
  101. }
  102. // Get session param
  103. func (s *Session) GetParam(key string) (bool, interface{}) {
  104. return s.params[key]
  105. }