You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

187 lines
4.3 KiB

5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
5 years ago
  1. package connect
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/rs/zerolog/log"
  7. "gopkg.in/olahol/melody.v1"
  8. "strings"
  9. "sync"
  10. )
  11. // 根据主活动划分房间 ==> 适配现有的可以
  12. // 在主活动开始的时候
  13. // HandleConnect 处理房间号
  14. // HandleMessage 处理逻辑函数 包括登录, 同步 .... 发送到特定的用户
  15. // HandleDisconnect 处理房间删除, 链接删除
  16. // 登录, 用户注册
  17. // tag 或者 ?activity_id=? 作为一个标志
  18. // 同步消息 => 发送给所有的用户 使用协程池
  19. // NewWebSocket
  20. // Connect
  21. // Disconnect
  22. // Message
  23. // Run
  24. var N = NewNode()
  25. type Node struct {
  26. conns map[*melody.Session]*Client
  27. main *Client // 通过client 发送信息
  28. rooms map[string]*Room // 房间
  29. handles map[string]logic // 逻辑函数
  30. rwmux *sync.RWMutex
  31. }
  32. func NewNode() *Node {
  33. node := new(Node)
  34. node.conns = make(map[*melody.Session]*Client, 0)
  35. node.main = new(Client)
  36. node.rooms = make(map[string]*Room, 0)
  37. node.handles = make(map[string]logic, 0)
  38. node.rwmux = new(sync.RWMutex)
  39. return node
  40. }
  41. func (t *Node) handleMelodyConnect(client *Client) {
  42. log.Printf("[websocket] [connect] [room:%s] [client:%s] %s\n", client.RoomId, client.Id, client.Request.URL)
  43. }
  44. // 处理msg
  45. func (t *Node) handleMelodyMessage(client *Client, msg *Message) {
  46. if !client.Online && msg.Type != TypeLogin {
  47. client.Write([]byte("尚未登录"))
  48. return
  49. }
  50. if fn, ok := t.handles[msg.Type]; ok {
  51. fn(client, msg)
  52. }
  53. log.Printf("[websocket] [message] [room:%s] [client:%s] %v\n", client.RoomId, client.Id, msg)
  54. }
  55. // 删掉注册的
  56. func (t *Node) handleMelodyDisconnect(client *Client) {
  57. if client.Online && strings.HasPrefix(client.Id, IDCustomer) {
  58. if client.Pid == 0 { // 主账号下线 通知子账号
  59. msg := &Message{
  60. Type: TypeNotice,
  61. Dest: 0,
  62. Tag: IDCustomer,
  63. RoomId: client.RoomId,
  64. From: client.Id,
  65. Data: map[string]interface{}{
  66. "id": client.AccountId,
  67. "content": "主账号下线",
  68. },
  69. }
  70. t.Send(msg)
  71. } else { // 子账号下线 通知主账号
  72. msg := &Message{
  73. Type: TypeNotice,
  74. Dest: client.Pid,
  75. Tag: IDCustomer,
  76. RoomId: client.RoomId,
  77. From: client.Id,
  78. Data: map[string]interface{}{
  79. "id": client.AccountId,
  80. "content": "子账号下线",
  81. },
  82. }
  83. t.Send(msg)
  84. }
  85. }
  86. // 释放内存
  87. if room, ok := t.rooms[client.RoomId]; ok {
  88. room.DeleteClient(client.Id)
  89. if room.Len() <= 0 {
  90. t.DeleteRoom(room.Id)
  91. }
  92. }
  93. log.Printf("[websocket] [disconnect] [room:%s] [client:%s] online=>%v\n", client.RoomId, client.Id, client.Online)
  94. }
  95. func (t *Node) RegisterLogic(name string, fn logic) {
  96. t.handles[name] = fn
  97. }
  98. func (t *Node) RegisterClient(c *Client) error {
  99. if room, ok := t.rooms[c.RoomId]; ok {
  100. room.rwmux.Lock()
  101. defer room.rwmux.Unlock()
  102. room.clients[c.Id] = c
  103. return nil
  104. }
  105. return errors.New("加入房间失败")
  106. }
  107. // 所有人同步
  108. func (t *Node) BroadcastAll(msg *Message, c *Client) {
  109. // 发送给连上ws的所有人
  110. m, err := json.Marshal(msg)
  111. if err != nil {
  112. c.Write([]byte(err.Error()))
  113. return
  114. }
  115. for _, client := range t.conns {
  116. if client == c {
  117. continue
  118. }
  119. client.Write(m)
  120. }
  121. }
  122. func (t *Node) Send(msg *Message) {
  123. if msg.Dest == 0 && msg.Tag == "" { // 代表发送给所有人
  124. t.Broadcast(msg)
  125. } else if msg.Dest == 0 && msg.Tag != "" { // 代表发给某个团体
  126. t.BroadcastTag(msg)
  127. } else if msg.Dest != 0 && msg.Tag != "" {
  128. t.BroadcastDest(msg)
  129. } else {
  130. log.Printf("[websocket] [send] msg=>%v", msg)
  131. }
  132. }
  133. // 发送给所有的
  134. // delete 删除 room client
  135. func (t *Node) Broadcast(msg *Message) {
  136. m := msg.Encode()
  137. if room, ok := t.rooms[msg.RoomId]; ok && room != nil {
  138. for _, client := range room.clients {
  139. if client.Id != msg.From {
  140. client.Write(m)
  141. }
  142. }
  143. }
  144. }
  145. func (t *Node) BroadcastTag(msg *Message) {
  146. m := msg.Encode()
  147. if room, ok := t.rooms[msg.RoomId]; ok {
  148. for _, client := range room.clients {
  149. if client.AccountType == msg.Tag && client.Id != msg.From {
  150. client.Write(m)
  151. }
  152. }
  153. }
  154. }
  155. func (t *Node) BroadcastDest(msg *Message) {
  156. if room, ok := t.rooms[msg.RoomId]; ok {
  157. client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)]
  158. if client.Id != msg.From {
  159. client.Write(msg.Encode())
  160. }
  161. }
  162. }
  163. func (t *Node) DeleteRoom(roomId string) {
  164. t.rwmux.Lock()
  165. defer t.rwmux.Unlock()
  166. delete(t.rooms, roomId)
  167. }