From 1223dc084d44e45d2d27db3f0e75aa01c65a8844 Mon Sep 17 00:00:00 2001 From: tommy <3405129587@qq.com> Date: Fri, 6 Mar 2020 20:30:15 +0800 Subject: [PATCH] update --- channel.go | 12 +++++++++++- examples/channel/main.go | 14 ++++++++++++-- hub.go | 25 ++++++++++++++++--------- session.go | 8 ++++++-- 4 files changed, 45 insertions(+), 14 deletions(-) diff --git a/channel.go b/channel.go index d9e4a3e..b224eab 100644 --- a/channel.go +++ b/channel.go @@ -1,5 +1,15 @@ package melody type Channel struct { + name string + online int + session *Session +} -} \ No newline at end of file +func (c *Channel) Online() int { + return c.online +} + +func (c *Channel) Name() string { + return c.name +} diff --git a/examples/channel/main.go b/examples/channel/main.go index 9de8265..eb592de 100644 --- a/examples/channel/main.go +++ b/examples/channel/main.go @@ -21,15 +21,25 @@ func main() { if err := session.Subscribe(ch); err != nil { log.Println(err) } + session.Set("channel", ch) }) m.HandleMessage(func(session *melody.Session, msg []byte) { - if err := session.Publish(msg); err != nil { - log.Println(err) + channel, ok1 := session.Get("channel") + if ok1 { + _ = m.BroadcastFilter(msg, func(s *melody.Session) bool { + ch, ok2 := s.Get("channel") + if ok2 && ch.(string) == channel.(string) { + return true + } + return false + }) } + }) m.HandleSentMessage(func(session *melody.Session, bytes []byte) { + log.Printf("%+v", session.Channel().Online()) log.Printf("%+v", string(bytes)) }) router.Run(":8080") diff --git a/hub.go b/hub.go index ea2d194..039d212 100644 --- a/hub.go +++ b/hub.go @@ -12,8 +12,9 @@ type hub struct { exit chan *envelope open bool rwmutex *sync.RWMutex + // extends - channels map[string]*Session + channels map[string]*Channel subscribe chan *Session publish chan *envelope unsubscribe chan *Session @@ -29,7 +30,7 @@ func newHub() *hub { open: true, rwmutex: &sync.RWMutex{}, // extends - channels: make(map[string]*Session), + channels: make(map[string]*Channel), subscribe: make(chan *Session), publish: make(chan *envelope), unsubscribe: make(chan *Session), @@ -77,12 +78,17 @@ loop: // extends h.rwmutex.Lock() if _, ok := h.channels[s.channel]; !ok { - h.channels[s.channel] = s + h.channels[s.channel] = &Channel{ + session: s, + online: 1, + name: s.channel, + } } else { - h.channels[s.channel].prev = s // 原来的上一个指向现在的 - s.next = h.channels[s.channel] // 现在的下一个是原来的 - s.prev = nil // 成为队头 - h.channels[s.channel] = s // 现在的替代原来的位置 + h.channels[s.channel].session.prev = s // 原来的上一个指向现在的 + s.next = h.channels[s.channel].session // 现在的下一个是原来的 + s.prev = nil // 成为队头 + h.channels[s.channel].session = s // 现在的替代原来的位置 + h.channels[s.channel].online++ // 增加人数 } h.rwmutex.Unlock() case s := <-h.unsubscribe: @@ -94,15 +100,16 @@ loop: if s.prev != nil { s.prev.next = s.next } else { - h.channels[s.channel] = s.next + h.channels[s.channel].session = s.next } + h.channels[s.channel].online-- s.channel = "" // 置空 h.rwmutex.Unlock() } case m := <-h.publish: h.rwmutex.RLock() if _, ok := h.channels[m.c]; ok { - for s := h.channels[m.c]; s != nil; s = s.next { + for s := h.channels[m.c].session; s != nil; s = s.next { if m.filter != nil { if m.filter(s) { s.writeMessage(m) diff --git a/session.go b/session.go index 7ed6a1c..42f2662 100644 --- a/session.go +++ b/session.go @@ -293,6 +293,10 @@ func (s *Session) IsSubscribed() bool { return s.channel != "" } -func (s *Session) SubscribeName() string { - return s.channel +func (s *Session) Channel() *Channel { + if c, ok := s.melody.hub.channels[s.channel]; ok { + return c + } else { + return nil + } }