Browse Source

Issue #3: Add close method

master
Ola Holmström 9 years ago
parent
commit
ff425ac175
  1. 2
      .travis.yml
  2. 4
      CHANGELOG.md
  3. 15
      hub.go
  4. 7
      melody.go
  5. 15
      melody_test.go
  6. 5
      session.go

2
.travis.yml

@ -1,6 +1,8 @@
language: go language: go
sudo: required
go: go:
- 1.4 - 1.4
- 1.5
before_install: before_install:
- go get github.com/axw/gocov/gocov - go get github.com/axw/gocov/gocov
- go get github.com/mattn/goveralls - go get github.com/mattn/goveralls

4
CHANGELOG.md

@ -1,3 +1,7 @@
## 2015-09-03
* Add `Close` method to melody instance.
### 2015-06-10 ### 2015-06-10
* Support for binary messages. * Support for binary messages.

15
hub.go

@ -5,6 +5,8 @@ type hub struct {
broadcast chan *envelope broadcast chan *envelope
register chan *Session register chan *Session
unregister chan *Session unregister chan *Session
exit chan bool
open bool
} }
func newHub() *hub { func newHub() *hub {
@ -13,10 +15,13 @@ func newHub() *hub {
broadcast: make(chan *envelope), broadcast: make(chan *envelope),
register: make(chan *Session), register: make(chan *Session),
unregister: make(chan *Session), unregister: make(chan *Session),
exit: make(chan bool),
open: true,
} }
} }
func (h *hub) run() { func (h *hub) run() {
loop:
for { for {
select { select {
case s := <-h.register: case s := <-h.register:
@ -24,8 +29,8 @@ func (h *hub) run() {
case s := <-h.unregister: case s := <-h.unregister:
if _, ok := h.sessions[s]; ok { if _, ok := h.sessions[s]; ok {
delete(h.sessions, s) delete(h.sessions, s)
close(s.output)
s.conn.Close() s.conn.Close()
close(s.output)
} }
case m := <-h.broadcast: case m := <-h.broadcast:
for s := range h.sessions { for s := range h.sessions {
@ -37,6 +42,14 @@ func (h *hub) run() {
go s.writeMessage(m) go s.writeMessage(m)
} }
} }
case <-h.exit:
for s := range h.sessions {
delete(h.sessions, s)
s.conn.Close()
close(s.output)
}
h.open = false
break loop
} }
} }
} }

7
melody.go

@ -88,7 +88,9 @@ func (m *Melody) HandleRequest(w http.ResponseWriter, r *http.Request) {
session.readPump(m.messageHandler, m.messageHandlerBinary, m.errorHandler) session.readPump(m.messageHandler, m.messageHandlerBinary, m.errorHandler)
if m.hub.open {
m.hub.unregister <- session m.hub.unregister <- session
}
go m.disconnectHandler(session) go m.disconnectHandler(session)
} }
@ -111,3 +113,8 @@ func (m *Melody) BroadcastOthers(msg []byte, s *Session) {
return s != q return s != q
}) })
} }
// Closes the melody instance and all connected sessions.
func (m *Melody) Close() {
m.hub.exit <- true
}

15
melody_test.go

@ -325,3 +325,18 @@ func TestBroadcastFilter(t *testing.T) {
t.Errorf("should not be false") t.Errorf("should not be false")
} }
} }
func TestStop(t *testing.T) {
noecho := NewTestServer()
server := httptest.NewServer(noecho)
defer server.Close()
conn, err := NewDialer(server.URL)
defer conn.Close()
if err != nil {
t.Error(err)
}
noecho.m.Close()
}

5
session.go

@ -60,16 +60,17 @@ func (s *Session) writePump(errorHandler handleErrorFunc) {
ticker := time.NewTicker(s.config.PingPeriod) ticker := time.NewTicker(s.config.PingPeriod)
defer ticker.Stop() defer ticker.Stop()
loop:
for { for {
select { select {
case msg, ok := <-s.output: case msg, ok := <-s.output:
if !ok { if !ok {
s.close() s.close()
return
break loop
} }
if err := s.writeRaw(msg); err != nil { if err := s.writeRaw(msg); err != nil {
go errorHandler(s, err) go errorHandler(s, err)
return
break loop
} }
case <-ticker.C: case <-ticker.C:
s.ping() s.ping()

Loading…
Cancel
Save