package ws import ( "errors" "fmt" "sync" "github.com/rs/zerolog/log" "gopkg.in/olahol/melody.v1" ) // 根据主活动划分房间 ==> 适配现有的可以 // 在主活动开始的时候 // HandleConnect 处理房间号 // HandleMessage 处理逻辑函数 包括登录, 同步 .... 发送到特定的用户 // HandleDisconnect 处理房间删除, 链接删除 // 登录, 用户注册 // tag 或者 ?activity_id=? 作为一个标志 // 同步消息 => 发送给所有的用户 使用协程池 // NewWebSocket // Connect // Disconnect // Message // Run var N = NewNode() type Node struct { conns map[*melody.Session]*Client main *Client // 通过client 发送信息 rooms map[string]*Room // 房间 handles map[string]logic // 逻辑函数 rwmux *sync.RWMutex } func NewNode() *Node { node := new(Node) node.conns = make(map[*melody.Session]*Client, 0) node.main = new(Client) node.rooms = make(map[string]*Room, 0) node.handles = make(map[string]logic, 0) node.rwmux = new(sync.RWMutex) return node } func (t *Node) handleMelodyConnect(client *Client) { log.Printf("[websocket] [ws] [room:%s] [client:%s] %s\n", client.RoomId, client.Id, client.Request.URL) } // 处理msg func (t *Node) handleMelodyMessage(client *Client, msg *Message) { if !client.Online && msg.Type != TypeLogin { _ = client.WriteJson(map[string]interface{}{ "msg": "尚未登录", "code": 504, }) return } if fn, ok := t.handles[msg.Type]; ok { fn(client, msg) } log.Printf("[websocket] [message] [room:%s] [client:%s] %v\n", client.RoomId, client.Id, msg) } // 删掉注册的 func (t *Node) handleMelodyDisconnect(client *Client) { //if client.Online && strings.HasPrefix(client.Id, IDCustomer) { // if client.Pid == 0 { // 主账号下线 通知子账号 // msg := &Message{ // Type: TypeNotice, // Dest: 0, // Tag: IDCustomer, // RoomId: client.RoomId, // From: client.Id, // Data: map[string]interface{}{ // "id": client.AccountId, // "content": "主账号下线", // }, // } // t.Send(msg) // } else { // 子账号下线 通知主账号 // msg := &Message{ // Type: TypeNotice, // Dest: client.Pid, // Tag: IDCustomer, // RoomId: client.RoomId, // From: client.Id, // Data: map[string]interface{}{ // "id": client.AccountId, // "content": "子账号下线", // }, // } // t.Send(msg) // } //} // 释放内存 if room, ok := t.rooms[client.RoomId]; ok { room.DeleteClient(client.Id) if room.Len() <= 0 { t.DeleteRoom(room.Id) } } log.Printf("[websocket] [disconnect] [room:%s] [client:%s] online=>%v\n", client.RoomId, client.Id, client.Online) } func (t *Node) SetMiddleware(name string, fn logic) { t.handles[name] = fn } func (t *Node) RegisterClient(c *Client) error { if room, ok := t.rooms[c.RoomId]; ok { room.rwmux.Lock() defer room.rwmux.Unlock() if oc, exist := room.clients[c.Id]; exist { oc.WriteJson(&Message{ Type: "close", Data: map[string]interface{}{ "error_code": 302, "error_msg": "链接已被替换", }, }) oc.Close() } room.clients[c.Id] = c return nil } return errors.New("加入房间失败") } // 所有人同步 func (t *Node) BroadcastAll(msg *Message, c *Client) { // 发送给连上ws的所有人 m := msg.Encode() for _, client := range t.conns { if client == c { continue } _ = client.Write(m) } } func (t *Node) Send(msg *Message) { if msg.Dest == 0 && msg.Tag == "" { // 代表发送给所有人 t.Broadcast(msg) } else if msg.Dest == 0 && msg.Tag != "" { // 代表发给某个团体 t.BroadcastTag(msg) } else if msg.Dest != 0 && msg.Tag != "" { t.BroadcastDest(msg) } log.Printf("[websocket] [send] msg=>%+v", msg) } // 发送给所有的 // delete 删除 room client func (t *Node) Broadcast(msg *Message) { m := msg.Encode() if room, ok := t.rooms[msg.RoomId]; ok && room != nil { for _, client := range room.clients { if client.Id != msg.From { _ = client.Write(m) } } } } func (t *Node) BroadcastTag(msg *Message) { m := msg.Encode() if room, ok := t.rooms[msg.RoomId]; ok { for _, client := range room.clients { if client != nil { if client.AccountType == msg.Tag && client.Id != msg.From { _ = client.Write(m) } } } } } func (t *Node) BroadcastDest(msg *Message) { m := msg.Encode() if room, ok := t.rooms[msg.RoomId]; ok { client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)] if client != nil { if client.Id != msg.From { _ = client.Write(m) } } } } func (t *Node) DeleteRoom(roomId string) { t.rwmux.Lock() defer t.rwmux.Unlock() delete(t.rooms, roomId) }