Browse Source

update

master
黄梓健 5 years ago
parent
commit
1223dc084d
  1. 10
      channel.go
  2. 14
      examples/channel/main.go
  3. 23
      hub.go
  4. 8
      session.go

10
channel.go

@ -1,5 +1,15 @@
package melody
type Channel struct {
name string
online int
session *Session
}
func (c *Channel) Online() int {
return c.online
}
func (c *Channel) Name() string {
return c.name
}

14
examples/channel/main.go

@ -21,15 +21,25 @@ func main() {
if err := session.Subscribe(ch); err != nil {
log.Println(err)
}
session.Set("channel", ch)
})
m.HandleMessage(func(session *melody.Session, msg []byte) {
if err := session.Publish(msg); err != nil {
log.Println(err)
channel, ok1 := session.Get("channel")
if ok1 {
_ = m.BroadcastFilter(msg, func(s *melody.Session) bool {
ch, ok2 := s.Get("channel")
if ok2 && ch.(string) == channel.(string) {
return true
}
return false
})
}
})
m.HandleSentMessage(func(session *melody.Session, bytes []byte) {
log.Printf("%+v", session.Channel().Online())
log.Printf("%+v", string(bytes))
})
router.Run(":8080")

23
hub.go

@ -12,8 +12,9 @@ type hub struct {
exit chan *envelope
open bool
rwmutex *sync.RWMutex
// extends
channels map[string]*Session
channels map[string]*Channel
subscribe chan *Session
publish chan *envelope
unsubscribe chan *Session
@ -29,7 +30,7 @@ func newHub() *hub {
open: true,
rwmutex: &sync.RWMutex{},
// extends
channels: make(map[string]*Session),
channels: make(map[string]*Channel),
subscribe: make(chan *Session),
publish: make(chan *envelope),
unsubscribe: make(chan *Session),
@ -77,12 +78,17 @@ loop:
// extends
h.rwmutex.Lock()
if _, ok := h.channels[s.channel]; !ok {
h.channels[s.channel] = s
h.channels[s.channel] = &Channel{
session: s,
online: 1,
name: s.channel,
}
} else {
h.channels[s.channel].prev = s // 原来的上一个指向现在的
s.next = h.channels[s.channel] // 现在的下一个是原来的
h.channels[s.channel].session.prev = s // 原来的上一个指向现在的
s.next = h.channels[s.channel].session // 现在的下一个是原来的
s.prev = nil // 成为队头
h.channels[s.channel] = s // 现在的替代原来的位置
h.channels[s.channel].session = s // 现在的替代原来的位置
h.channels[s.channel].online++ // 增加人数
}
h.rwmutex.Unlock()
case s := <-h.unsubscribe:
@ -94,15 +100,16 @@ loop:
if s.prev != nil {
s.prev.next = s.next
} else {
h.channels[s.channel] = s.next
h.channels[s.channel].session = s.next
}
h.channels[s.channel].online--
s.channel = "" // 置空
h.rwmutex.Unlock()
}
case m := <-h.publish:
h.rwmutex.RLock()
if _, ok := h.channels[m.c]; ok {
for s := h.channels[m.c]; s != nil; s = s.next {
for s := h.channels[m.c].session; s != nil; s = s.next {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)

8
session.go

@ -293,6 +293,10 @@ func (s *Session) IsSubscribed() bool {
return s.channel != ""
}
func (s *Session) SubscribeName() string {
return s.channel
func (s *Session) Channel() *Channel {
if c, ok := s.melody.hub.channels[s.channel]; ok {
return c
} else {
return nil
}
}
Loading…
Cancel
Save