websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
2.9 KiB

10 years ago
10 years ago
5 years ago
5 years ago
5 years ago
5 years ago
10 years ago
5 years ago
5 years ago
5 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
10 years ago
  1. package melody
  2. import (
  3. "sync"
  4. )
  5. type hub struct {
  6. sessions map[*Session]bool
  7. broadcast chan *envelope
  8. register chan *Session
  9. unregister chan *Session
  10. exit chan *envelope
  11. open bool
  12. rwmutex *sync.RWMutex
  13. // extends
  14. channels map[string]*Channel
  15. subscribe chan *Session
  16. publish chan *envelope
  17. unsubscribe chan *Session
  18. }
  19. func newHub() *hub {
  20. return &hub{
  21. sessions: make(map[*Session]bool),
  22. broadcast: make(chan *envelope),
  23. register: make(chan *Session),
  24. unregister: make(chan *Session),
  25. exit: make(chan *envelope),
  26. open: true,
  27. rwmutex: &sync.RWMutex{},
  28. // extends
  29. channels: make(map[string]*Channel),
  30. subscribe: make(chan *Session),
  31. publish: make(chan *envelope),
  32. unsubscribe: make(chan *Session),
  33. }
  34. }
  35. func (h *hub) run() {
  36. loop:
  37. for {
  38. select {
  39. case s := <-h.register:
  40. h.rwmutex.Lock()
  41. h.sessions[s] = true
  42. h.rwmutex.Unlock()
  43. case s := <-h.unregister:
  44. if _, ok := h.sessions[s]; ok {
  45. h.rwmutex.Lock()
  46. delete(h.sessions, s)
  47. h.rwmutex.Unlock()
  48. }
  49. case m := <-h.broadcast:
  50. h.rwmutex.RLock()
  51. for s := range h.sessions {
  52. if m.filter != nil {
  53. if m.filter(s) {
  54. s.writeMessage(m)
  55. }
  56. } else {
  57. s.writeMessage(m)
  58. }
  59. }
  60. h.rwmutex.RUnlock()
  61. case m := <-h.exit:
  62. h.rwmutex.Lock()
  63. for s := range h.sessions {
  64. s.writeMessage(m)
  65. delete(h.sessions, s)
  66. s.Close()
  67. }
  68. h.open = false
  69. h.rwmutex.Unlock()
  70. break loop
  71. case s := <-h.subscribe:
  72. // extends
  73. h.rwmutex.Lock()
  74. if _, ok := h.channels[s.channel]; !ok {
  75. h.channels[s.channel] = &Channel{
  76. session: s,
  77. online: 1,
  78. name: s.channel,
  79. }
  80. } else {
  81. h.channels[s.channel].session.prev = s // 原来的上一个指向现在的
  82. s.next = h.channels[s.channel].session // 现在的下一个是原来的
  83. s.prev = nil // 成为队头
  84. h.channels[s.channel].session = s // 现在的替代原来的位置
  85. h.channels[s.channel].online++ // 增加人数
  86. }
  87. h.rwmutex.Unlock()
  88. case s := <-h.unsubscribe:
  89. if _, ok := h.channels[s.channel]; ok {
  90. h.rwmutex.Lock()
  91. if s.next != nil {
  92. s.next.prev = s.prev
  93. }
  94. if s.prev != nil {
  95. s.prev.next = s.next
  96. } else {
  97. h.channels[s.channel].session = s.next
  98. }
  99. h.channels[s.channel].online--
  100. s.channel = "" // 置空
  101. h.rwmutex.Unlock()
  102. }
  103. case m := <-h.publish:
  104. h.rwmutex.RLock()
  105. if _, ok := h.channels[m.c]; ok {
  106. for s := h.channels[m.c].session; s != nil; s = s.next {
  107. if m.filter != nil {
  108. if m.filter(s) {
  109. s.writeMessage(m)
  110. }
  111. } else {
  112. s.writeMessage(m)
  113. }
  114. }
  115. }
  116. h.rwmutex.RUnlock()
  117. }
  118. }
  119. }
  120. func (h *hub) closed() bool {
  121. h.rwmutex.RLock()
  122. defer h.rwmutex.RUnlock()
  123. return !h.open
  124. }
  125. func (h *hub) len() int {
  126. h.rwmutex.RLock()
  127. defer h.rwmutex.RUnlock()
  128. return len(h.sessions)
  129. }