commit 11225169f3b7a9d9d02953249eaa8756c2aa3529 Author: tommy <3405129587@qq.com> Date: Mon Mar 9 09:56:31 2020 +0800 互动同步 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6ec77ab --- /dev/null +++ b/.gitignore @@ -0,0 +1,22 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +.idea +hudongzhuanjia-go-build +hudongzhuanjia +go_build_main_go +.vscode +.log + + +services/pay/bindata.go \ No newline at end of file diff --git a/build.bat b/build.bat new file mode 100644 index 0000000..95ade2e --- /dev/null +++ b/build.bat @@ -0,0 +1,5 @@ +SET CGO_ENABLED=0 +SET GOOS=linux +SET GOARCH=amd64 + +go build main.go diff --git a/config.json b/config.json new file mode 100644 index 0000000..7de2ec3 --- /dev/null +++ b/config.json @@ -0,0 +1,8 @@ +{ + "port": "20182", + "host": "127.0.0.1", + "cross_domain": "*", + "post_max_memory": 1024000, + "api_router": "/PcClient/*", + "update_path": "example_update" +} \ No newline at end of file diff --git a/connect/client.go b/connect/client.go new file mode 100644 index 0000000..4fcbe64 --- /dev/null +++ b/connect/client.go @@ -0,0 +1,37 @@ +package connect + +import ( + "gopkg.in/olahol/melody.v1" +) + +type Client struct { + Id string // account_typ:account_id + AccountId int64 + AccountType string + Pid int64 + Online bool + RoomId string + node *Node + *melody.Session +} + +func NewClient(session *melody.Session, node *Node, roomId string) *Client { + client := new(Client) + client.Session = session + client.node = node + client.Online = false + client.RoomId = roomId + return client +} + +func (c *Client) BroadcastAll(msg *Message) { + c.node.BroadcastAll(msg, c) +} + +func (c *Client) Register() error { + return c.node.RegisterClient(c) +} + +func (c *Client) Send(msg *Message) { + c.node.Send(msg) +} diff --git a/connect/define.go b/connect/define.go new file mode 100644 index 0000000..28747d3 --- /dev/null +++ b/connect/define.go @@ -0,0 +1,14 @@ +package connect + +type logic func(client *Client, msg *Message) + +const IDCustomer = "customer" +const IDUser = "user" +const IDEntry = "entry" +const TypeNotice = "notice" +const TypeLogin = "login" + +const LogicLogin = "login" +const LogicSync = "sync" + +const PoolSize = 10000 diff --git a/connect/http.go b/connect/http.go new file mode 100644 index 0000000..e8c9570 --- /dev/null +++ b/connect/http.go @@ -0,0 +1,34 @@ +package connect + +import ( + "fmt" + "github.com/ouxuanserver/osmanthuswine/src/core" + "time" +) + +type MessageCtl struct { + core.Controller +} + +var now = time.Now() + +func (t *MessageCtl) Checkin() { + t.DisplayByData(map[string]interface{}{ + "version": now, + "status": "success", + }) +} + +func (t *MessageCtl) Receive() { + msg := new(Message) + err := t.RequestToStruct(msg) + if err != nil { + t.DisplayByError(fmt.Sprintf("信息发送失败, error: %v", err), 0) + } + go N.Send(msg) + // 通知特定的大屏幕和用户 + t.DisplayByData(map[string]interface{}{ + "date": time.Now(), + "msg": "success", + }) +} diff --git a/connect/logics/login.go b/connect/logics/login.go new file mode 100644 index 0000000..6295934 --- /dev/null +++ b/connect/logics/login.go @@ -0,0 +1,65 @@ +package logics + +import ( + "fmt" + "hudongzhuanjia/hdws/connect" + "hudongzhuanjia/libs/jwt" +) + +func init() { + connect.N.RegisterLogic(connect.LogicLogin, loginFunc) +} + +func loginFunc(c *connect.Client, msg *connect.Message) { + if _, ok := msg.Data["token"]; !ok { + c.Write([]byte("token不能为空")) + return + } + claims, err := jwt.ParseAccessToken(msg.Data["token"].(string)) + if err != nil { + c.Write([]byte(err.Error())) + return + } + c.Online = true + c.Id = fmt.Sprintf("%s:%d", claims.AccountType, claims.AccountId) + c.AccountId = claims.AccountId + c.AccountType = claims.AccountType + c.Pid = claims.CustomerPid + if err := c.Register(); err != nil { + c.Write([]byte(err.Error())) + return + } + + if claims.AccountType == connect.IDCustomer { + if c.Pid == 0 { // 主账号发给子账号 + msg := &connect.Message{ + Type: connect.TypeNotice, + Dest: 0, + Tag: connect.IDCustomer, + RoomId: c.RoomId, + From: c.Id, + Data: map[string]interface{}{ + "content": "主账号上线", + "nickname": "", + "username": claims.Username, + }, + } + c.Send(msg) + } else { // 子账号发给主账号 + msg := &connect.Message{ + Type: connect.TypeNotice, + Dest: claims.CustomerPid, + Tag: connect.IDCustomer, + RoomId: c.RoomId, + From: c.Id, + Data: map[string]interface{}{ + "content": "子账号上线", + "nickname": "", + "username": claims.Username, + }, + } + c.Send(msg) + } + } + c.Write([]byte("登录成功")) +} diff --git a/connect/logics/msg.go b/connect/logics/msg.go new file mode 100644 index 0000000..be7eb82 --- /dev/null +++ b/connect/logics/msg.go @@ -0,0 +1,12 @@ +package logics + +import "hudongzhuanjia/hdws/connect" + +func init() { + connect.N.RegisterLogic("msg", msgFunc) +} + +// 简单明了 +func msgFunc(c *connect.Client, msg *connect.Message) { + c.Send(msg) +} diff --git a/connect/logics/sync.go b/connect/logics/sync.go new file mode 100644 index 0000000..682029a --- /dev/null +++ b/connect/logics/sync.go @@ -0,0 +1,72 @@ +package logics + +import ( + "encoding/json" + "hudongzhuanjia/hdws/connect" + "hudongzhuanjia/models" + "time" +) + +func init() { + connect.N.RegisterLogic(connect.LogicSync, syncFunc) +} + +func syncFunc(c *connect.Client, msg *connect.Message) { + if c.AccountType != connect.IDCustomer && c.Pid != 0 { + c.Write([]byte("此账号无此权限")) + return + } + + operation, ok := msg.Data["operation"] + if !ok { + operation = "" + } + areaId, ok := msg.Data["area_id"] + if !ok { + areaId = 0.0 + } + moduleActivityId, ok := msg.Data["module_activity_id"] + if !ok { + moduleActivityId = 0.0 + } + moduleActivityType, ok := msg.Data["module_activity_type"] + if !ok { + moduleActivityType = "" + } + activityId, ok := msg.Data["activity_id"] + if !ok { + activityId = 0.0 + } + extraData := []byte("") + if otherData, ok := msg.Data["other_data"]; ok { + extraData, _ = json.Marshal(otherData) + } + + // 发送给所有的账户 + msg.Tag = "" + msg.Dest = 0 + msg.RoomId = c.RoomId + go c.Send(msg) + + op := &models.CustomerOperation{ + CustomerId: c.AccountId, + Operation: operation.(string), + AreaId: int64(areaId.(float64)), + ModuleActivityId: int64(moduleActivityId.(float64)), + ModuleActivityType: moduleActivityType.(string), + ActivityId: int64(activityId.(float64)), + ExtraData: string(extraData), + UpdatedAt: time.Now(), + CreatedAt: time.Now(), + } + err := models.Save(map[string]interface{}{ + "activity_id=": activityId, + "is_delete=": 0, + }, op) + if err != nil { + c.Write([]byte("同步失败")) + return + } + + c.Write([]byte("同步成功")) +} diff --git a/connect/message.go b/connect/message.go new file mode 100644 index 0000000..48d8330 --- /dev/null +++ b/connect/message.go @@ -0,0 +1,21 @@ +package connect + +import "encoding/json" + +type Message struct { + Type string `json:"type"` // 对应处理函数 / sync 同步函数,存入数据库 / msg 发送消息,不存入数据库 / login 登录校验某个客户是否存在 + From string `json:"-"` // 发送方 ==> 自行处理, 无需发送 + Dest int64 `json:"dest"` // 目标id / 0 代表所有 非0代表某一个 + Tag string `json:"tag"` // 目标类型 customer代表客户 user代表用户 entry代表录入人员 + RoomId string `json:"room_id"` // 房间id // activity_id + Data map[string]interface{} `json:"data"` +} + +func (m *Message) Encode() []byte { + buf, _ := json.Marshal(m) + return buf +} + +func (m *Message) Decode(buf []byte) error { + return json.Unmarshal(buf, m) +} diff --git a/connect/node.go b/connect/node.go new file mode 100644 index 0000000..ccde4d0 --- /dev/null +++ b/connect/node.go @@ -0,0 +1,187 @@ +package connect + +import ( + "encoding/json" + "errors" + "fmt" + "gopkg.in/olahol/melody.v1" + "hudongzhuanjia/logger" + "strings" + "sync" +) + +// 根据主活动划分房间 ==> 适配现有的可以 +// 在主活动开始的时候 +// HandleConnect 处理房间号 +// HandleMessage 处理逻辑函数 包括登录, 同步 .... 发送到特定的用户 +// HandleDisconnect 处理房间删除, 链接删除 + +// 登录, 用户注册 +// tag 或者 ?activity_id=? 作为一个标志 + +// 同步消息 => 发送给所有的用户 使用协程池 + +// NewWebSocket +// Connect +// Disconnect +// Message +// Run + +var N = NewNode() + +type Node struct { + conns map[*melody.Session]*Client + main *Client // 通过client 发送信息 + rooms map[string]*Room // 房间 + handles map[string]logic // 逻辑函数 + rwmux *sync.RWMutex +} + +func NewNode() *Node { + node := new(Node) + node.conns = make(map[*melody.Session]*Client, 0) + node.main = new(Client) + node.rooms = make(map[string]*Room, 0) + node.handles = make(map[string]logic, 0) + node.rwmux = new(sync.RWMutex) + return node +} + +func (t *Node) handleMelodyConnect(client *Client) { + logger.Sugar.Infof("[websocket] [connect] [room:%s] [client:%s] %s", client.RoomId, client.Id, client.Request.URL) +} + +// 处理msg +func (t *Node) handleMelodyMessage(client *Client, msg *Message) { + if !client.Online && msg.Type != TypeLogin { + client.Write([]byte("尚未登录")) + return + } + + if fn, ok := t.handles[msg.Type]; ok { + fn(client, msg) + } + logger.Sugar.Infof("[websocket] [message] [room:%s] [client:%s] %v", client.RoomId, client.Id, msg) +} + +// 删掉注册的 +func (t *Node) handleMelodyDisconnect(client *Client) { + if client.Online && strings.HasPrefix(client.Id, IDCustomer) { + if client.Pid == 0 { // 主账号下线 通知子账号 + msg := &Message{ + Type: TypeNotice, + Dest: 0, + Tag: IDCustomer, + RoomId: client.RoomId, + From: client.Id, + Data: map[string]interface{}{ + "id": client.AccountId, + "content": "主账号下线", + }, + } + t.Send(msg) + } else { // 子账号下线 通知主账号 + msg := &Message{ + Type: TypeNotice, + Dest: client.Pid, + Tag: IDCustomer, + RoomId: client.RoomId, + From: client.Id, + Data: map[string]interface{}{ + "id": client.AccountId, + "content": "子账号下线", + }, + } + t.Send(msg) + } + } + // 释放内存 + if room, ok := t.rooms[client.RoomId]; ok { + room.DeleteClient(client.Id) + if room.Len() <= 0 { + t.DeleteRoom(room.Id) + } + } + logger.Sugar.Infof("[websocket] [disconnect] [room:%s] [client:%s] online=>%v", client.RoomId, client.Id, client.Online) +} + +func (t *Node) RegisterLogic(name string, fn logic) { + t.handles[name] = fn +} + +func (t *Node) RegisterClient(c *Client) error { + if room, ok := t.rooms[c.RoomId]; ok { + room.rwmux.Lock() + defer room.rwmux.Unlock() + room.clients[c.Id] = c + return nil + } + return errors.New("加入房间失败") +} + +// 所有人同步 +func (t *Node) BroadcastAll(msg *Message, c *Client) { + // 发送给连上ws的所有人 + m, err := json.Marshal(msg) + if err != nil { + c.Write([]byte(err.Error())) + return + } + for _, client := range t.conns { + if client == c { + continue + } + client.Write(m) + } +} + +func (t *Node) Send(msg *Message) { + if msg.Dest == 0 && msg.Tag == "" { // 代表发送给所有人 + t.Broadcast(msg) + } else if msg.Dest == 0 && msg.Tag != "" { // 代表发给某个团体 + t.BroadcastTag(msg) + } else if msg.Dest != 0 && msg.Tag != "" { + t.BroadcastDest(msg) + } else { + logger.Sugar.Infof("[websocket] [send] msg=>%v", msg) + } +} + +// 发送给所有的 +// delete 删除 room client +func (t *Node) Broadcast(msg *Message) { + m := msg.Encode() + if room, ok := t.rooms[msg.RoomId]; ok && room != nil { + for _, client := range room.clients { + if client.Id != msg.From { + client.Write(m) + } + } + } +} + +func (t *Node) BroadcastTag(msg *Message) { + m := msg.Encode() + if room, ok := t.rooms[msg.RoomId]; ok { + for _, client := range room.clients { + if client.AccountType == msg.Tag && client.Id != msg.From { + client.Write(m) + } + } + } +} + +func (t *Node) BroadcastDest(msg *Message) { + if room, ok := t.rooms[msg.RoomId]; ok { + client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)] + if client.Id != msg.From { + client.Write(msg.Encode()) + } + } +} + +func (t *Node) DeleteRoom(roomId string) { + t.rwmux.Lock() + defer t.rwmux.Unlock() + delete(t.rooms, roomId) +} diff --git a/connect/room.go b/connect/room.go new file mode 100644 index 0000000..c34b3a1 --- /dev/null +++ b/connect/room.go @@ -0,0 +1,31 @@ +package connect + +import ( + "sync" +) + +type Room struct { + Id string + clients map[string]*Client + rwmux *sync.RWMutex +} + +func NewRoom(id string) *Room { + room := new(Room) + room.Id = id + room.clients = make(map[string]*Client, 0) + room.rwmux = new(sync.RWMutex) + return room +} + +func (t *Room) Len() int { + t.rwmux.RLock() + defer t.rwmux.RUnlock() + return len(t.clients) +} + +func (t *Room) DeleteClient(clientId string) { + t.rwmux.Lock() + defer t.rwmux.Unlock() + delete(t.clients, clientId) +} diff --git a/connect/ws.go b/connect/ws.go new file mode 100644 index 0000000..56ef710 --- /dev/null +++ b/connect/ws.go @@ -0,0 +1,76 @@ +package connect + +import ( + "encoding/json" + "github.com/ouxuanserver/osmanthuswine/src/core" + "gopkg.in/olahol/melody.v1" + "math" +) + +type WsCtl struct { + core.WebSocket + node *Node +} + +func (ws *WsCtl) HandleConnect(s *melody.Session) { + if ws.node == nil { + ws.node = N + ws.GetMelody().Config.MaxMessageSize = math.MaxInt16 + ws.GetMelody().Config.MessageBufferSize = math.MaxInt16 + } + ws.node.rwmux.Lock() + defer ws.node.rwmux.Unlock() + roomId := s.Request.URL.Query().Get("activity_id") + if _, ok := ws.node.rooms[roomId]; !ok { // 创建房间 + ws.node.rooms[roomId] = NewRoom(roomId) + } + client := NewClient(s, ws.node, roomId) + ws.node.conns[s] = client + ws.node.handleMelodyConnect(client) +} + +func (ws *WsCtl) HandleMessage(s *melody.Session, buf []byte) { + msg := new(Message) + err := json.Unmarshal(buf, msg) + if err != nil { + s.Write([]byte(err.Error())) + return + } + + client, ok := ws.node.conns[s] // 鉴定某个链接是否登录 + if !ok { + s.Write([]byte("链接无效")) + return + } + + ws.node.handleMelodyMessage(client, msg) +} + +func (ws *WsCtl) HandleDisconnect(s *melody.Session) { + if client, ok := ws.node.conns[s]; ok { + ws.node.handleMelodyDisconnect(client) + } + ws.node.rwmux.Lock() + defer ws.node.rwmux.Unlock() + delete(ws.node.conns, s) +} + +func (ws *WsCtl) HandleError(s *melody.Session, err error) { + +} + +func (ws *WsCtl) HandleMessageBinary(s *melody.Session, msg []byte) { + +} + +func (ws *WsCtl) HandlePong(s *melody.Session) { + +} + +func (ws *WsCtl) HandleSentMessage(s *melody.Session, msg []byte) { + +} + +func (ws *WsCtl) HandleSentMessageBinary(*melody.Session, []byte) { + +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..9038dd3 --- /dev/null +++ b/main.go @@ -0,0 +1,15 @@ +package main + +import ( + "github.com/ouxuanserver/osmanthuswine" + "github.com/ouxuanserver/osmanthuswine/src/core" + "hudongzhuanjia/hdws/connect" + _ "hudongzhuanjia/hdws/connect/logics" +) + +func main() { + // hdws + core.GetInstanceRouterManage().Registered(new(connect.WsCtl)) + core.GetInstanceRouterManage().Registered(new(connect.MessageCtl)) + osmanthuswine.Run() +} diff --git a/private.json b/private.json new file mode 100644 index 0000000..af8778c --- /dev/null +++ b/private.json @@ -0,0 +1,11 @@ +{ + "db": { + "host": "gz-cdb-onvbcfqn.sql.tencentcdb.com", + "port": "61232", + "user": "root", + "password": "Pox2210XXa@", + "name": "hudongzhuanjia", + "prefix": "ox_", + "max_open_conn": 500 + } +} \ No newline at end of file