Browse Source

add sub pub

master
黄梓健 5 years ago
parent
commit
95895044a2
  1. 4
      .gitignore
  2. 5
      channel.go
  3. 2
      envelope.go
  4. 36
      examples/channel/main.go
  5. 51
      hub.go
  6. 57
      melody.go
  7. 79
      session.go

4
.gitignore

@ -3,3 +3,7 @@ benchmark
*.swp
coverage.out
Makefile
go.sum
go.mod
todo.md
.idea

5
channel.go

@ -0,0 +1,5 @@
package melody
type Channel struct {
}

2
envelope.go

@ -4,4 +4,6 @@ type envelope struct {
t int
msg []byte
filter filterFunc
// extends
c string
}

36
examples/channel/main.go

@ -0,0 +1,36 @@
package main
import (
"log"
"melody"
"github.com/gin-gonic/gin"
)
func main() {
m := melody.New()
router := gin.Default()
router.GET("/chat", func(context *gin.Context) {
if err := m.HandleRequest(context.Writer, context.Request); err != nil {
log.Println(err)
}
})
m.HandleConnect(func(session *melody.Session) {
ch := session.Request.URL.Query().Get("channel")
if err := session.Subscribe(ch); err != nil {
log.Println(err)
}
})
m.HandleMessage(func(session *melody.Session, msg []byte) {
if err := session.Publish(msg); err != nil {
log.Println(err)
}
})
m.HandleSentMessage(func(session *melody.Session, bytes []byte) {
log.Printf("%+v", string(bytes))
})
router.Run(":8080")
}

51
hub.go

@ -12,6 +12,11 @@ type hub struct {
exit chan *envelope
open bool
rwmutex *sync.RWMutex
// extends
channels map[string]*Session
subscribe chan *Session
publish chan *envelope
unsubscribe chan *Session
}
func newHub() *hub {
@ -23,6 +28,11 @@ func newHub() *hub {
exit: make(chan *envelope),
open: true,
rwmutex: &sync.RWMutex{},
// extends
channels: make(map[string]*Session),
subscribe: make(chan *Session),
publish: make(chan *envelope),
unsubscribe: make(chan *Session),
}
}
@ -62,6 +72,47 @@ loop:
h.open = false
h.rwmutex.Unlock()
break loop
case s := <-h.subscribe:
// extends
h.rwmutex.Lock()
if _, ok := h.channels[s.channel]; !ok {
h.channels[s.channel] = s
} else {
h.channels[s.channel].prev = s // 原来的上一个指向现在的
s.next = h.channels[s.channel] // 现在的下一个是原来的
s.prev = nil // 成为队头
h.channels[s.channel] = s // 现在的替代原来的位置
}
h.rwmutex.Unlock()
case s := <-h.unsubscribe:
if _, ok := h.channels[s.channel]; ok {
h.rwmutex.Lock()
if s.next != nil {
s.next.prev = s.prev
}
if s.prev != nil {
s.prev.next = s.next
} else {
h.channels[s.channel] = s.next
}
s.channel = "" // 置空
h.rwmutex.Unlock()
}
case m := <-h.publish:
h.rwmutex.RLock()
if _, ok := h.channels[m.c]; ok {
for s := h.channels[m.c]; s != nil; s = s.next {
if m.filter != nil {
if m.filter(s) {
s.writeMessage(m)
}
} else {
s.writeMessage(m)
}
}
}
h.rwmutex.RUnlock()
}
}
}

57
melody.go

@ -181,6 +181,10 @@ func (m *Melody) HandleRequestWithKeys(w http.ResponseWriter, r *http.Request, k
melody: m,
open: true,
rwmutex: &sync.RWMutex{},
// extends
next: nil,
prev: nil,
channel: "",
}
m.hub.register <- session
@ -311,3 +315,56 @@ func (m *Melody) IsClosed() bool {
func FormatCloseMessage(closeCode int, text string) []byte {
return websocket.FormatCloseMessage(closeCode, text)
}
// extends
func (m *Melody) Subscribe(s *Session, c string) error {
if m.hub.closed() {
return errors.New("melody instance is already closed")
}
return s.Subscribe(c)
}
func (m *Melody) Unsubscribe(s *Session, c string) error {
if m.hub.closed() {
return errors.New("melody instance is already closed")
}
return s.Subscribe(c)
}
func (m *Melody) Publish(msg []byte, c string) error {
return m.PublishFilter(msg, c, nil)
}
func (m *Melody) PublishOthers(msg []byte, c string, s *Session) error {
return m.PublishFilter(msg, c, func(session *Session) bool {
return s != session
})
}
func (m *Melody) PublishFilter(msg []byte, c string, fn func(*Session) bool) error {
if m.hub.closed() {
return errors.New("melody instance is already closed")
}
message := &envelope{t: websocket.TextMessage, msg: msg, c: c, filter: fn}
m.hub.publish <- message
return nil
}
func (m *Melody) PublishBinary(msg []byte, c string) error {
return m.PublishBinaryFilter(msg, c, nil)
}
func (m *Melody) PublishBinaryOthers(msg []byte, c string, s *Session) error {
return m.PublishFilter(msg, c, func(session *Session) bool {
return s != session
})
}
func (m *Melody) PublishBinaryFilter(msg []byte, c string, fn func(*Session) bool) error {
if m.hub.closed() {
return errors.New("melody instance is already closed")
}
message := &envelope{t: websocket.BinaryMessage, msg: msg, c: c, filter: fn}
m.hub.publish <- message
return nil
}

79
session.go

@ -18,6 +18,10 @@ type Session struct {
melody *Melody
open bool
rwmutex *sync.RWMutex
// extends
channel string
next *Session
prev *Session
}
func (s *Session) writeMessage(message *envelope) {
@ -217,3 +221,78 @@ func (s *Session) MustGet(key string) interface{} {
func (s *Session) IsClosed() bool {
return s.closed()
}
// extends
func (s *Session) Subscribe(c string) error {
//
if s.closed() {
return errors.New("tried to write to closed a session")
}
if s.channel != "" && s.channel != c { // 存在订阅 并与接下来的channel无法共存
return errors.New("session already subscribe channel")
} else {
s.channel = c
s.melody.hub.subscribe <- s
}
return nil
}
func (s *Session) Unsubscribe() error {
if s.closed() {
return errors.New("tried to write to closed a session")
}
if s.channel == "" {
return errors.New("session not yet subscribe channel")
} else {
s.melody.hub.unsubscribe <- s
}
return nil
}
func (s *Session) Publish(msg []byte) error {
return s.PublishFilter(msg, nil)
}
func (s *Session) PublishFilter(msg []byte, fn func(*Session) bool) error {
if s.melody.hub.closed() {
return errors.New("melody instance is closed")
}
message := &envelope{t: websocket.TextMessage, msg: msg, c: s.channel, filter: fn}
s.melody.hub.publish <- message
return nil
}
func (s *Session) PublishOthers(msg []byte) error {
return s.PublishFilter(msg, func(session *Session) bool {
return s != session
})
}
func (s *Session) PublishBinary(msg []byte) error {
return s.PublishBinaryFilter(msg, nil)
}
func (s *Session) PublishBinaryOthers(msg []byte) error {
return s.PublishBinaryFilter(msg, func(session *Session) bool {
return s != session
})
}
func (s *Session) PublishBinaryFilter(msg []byte, fn func(*Session) bool) error {
if s.melody.hub.closed() {
return errors.New("melody instance is closed")
}
message := &envelope{t: websocket.BinaryMessage, msg: msg, c: s.channel, filter: fn}
s.melody.hub.publish <- message
return nil
}
func (s *Session) IsSubscribed() bool {
return s.channel != ""
}
func (s *Session) SubscribeName() string {
return s.channel
}
Loading…
Cancel
Save