websocket 增加多分组 fork https://github.com/olahol/melody
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

138 lines
2.9 KiB

package melody
import (
"sync"
)
type hub struct {
sessions map[*Session]bool
broadcast chan *envelope
register chan *Session
unregister chan *Session
exit chan *envelope
open bool
rwmutex *sync.RWMutex
// extends
channels map[string]*Channel
subscribe chan *Session
publish chan *envelope
unsubscribe chan *Session
}
func newHub() *hub {
return &hub{
sessions: make(map[*Session]bool),
broadcast: make(chan *envelope),
register: make(chan *Session),
unregister: make(chan *Session),
exit: make(chan *envelope),
open: true,
rwmutex: &sync.RWMutex{},
// extends
channels: make(map[string]*Channel),
subscribe: make(chan *Session),
publish: make(chan *envelope),
unsubscribe: make(chan *Session),
}
}
func (h *hub) run() {
loop:
for {
select {
case s := <-h.register:
h.rwmutex.Lock()
h.sessions[s] = true
h.rwmutex.Unlock()
case s := <-h.unregister:
if _, ok := h.sessions[s]; ok {
h.rwmutex.Lock()
delete(h.sessions, s)
h.rwmutex.Unlock()
}
case m := <-h.broadcast:
h.rwmutex.RLock()
for s := range h.sessions {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)
}
} else {
s.writeMessage(m)
}
}
h.rwmutex.RUnlock()
case m := <-h.exit:
h.rwmutex.Lock()
for s := range h.sessions {
s.writeMessage(m)
delete(h.sessions, s)
s.Close()
}
h.open = false
h.rwmutex.Unlock()
break loop
case s := <-h.subscribe:
// extends
h.rwmutex.Lock()
if _, ok := h.channels[s.channel]; !ok {
h.channels[s.channel] = &Channel{
session: s,
online: 1,
name: s.channel,
}
} else {
h.channels[s.channel].session.prev = s // 原来的上一个指向现在的
s.next = h.channels[s.channel].session // 现在的下一个是原来的
s.prev = nil // 成为队头
h.channels[s.channel].session = s // 现在的替代原来的位置
h.channels[s.channel].online++ // 增加人数
}
h.rwmutex.Unlock()
case s := <-h.unsubscribe:
if _, ok := h.channels[s.channel]; ok {
h.rwmutex.Lock()
if s.next != nil {
s.next.prev = s.prev
}
if s.prev != nil {
s.prev.next = s.next
} else {
h.channels[s.channel].session = s.next
}
h.channels[s.channel].online--
s.channel = "" // 置空
h.rwmutex.Unlock()
}
case m := <-h.publish:
h.rwmutex.RLock()
if _, ok := h.channels[m.c]; ok {
for s := h.channels[m.c].session; s != nil; s = s.next {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)
}
} else {
s.writeMessage(m)
}
}
}
h.rwmutex.RUnlock()
}
}
}
func (h *hub) closed() bool {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
return !h.open
}
func (h *hub) len() int {
h.rwmutex.RLock()
defer h.rwmutex.RUnlock()
return len(h.sessions)
}