commit
11225169f3
15 changed files with 610 additions and 0 deletions
-
22.gitignore
-
5build.bat
-
8config.json
-
37connect/client.go
-
14connect/define.go
-
34connect/http.go
-
65connect/logics/login.go
-
12connect/logics/msg.go
-
72connect/logics/sync.go
-
21connect/message.go
-
187connect/node.go
-
31connect/room.go
-
76connect/ws.go
-
15main.go
-
11private.json
@ -0,0 +1,22 @@ |
|||
# Binaries for programs and plugins |
|||
*.exe |
|||
*.exe~ |
|||
*.dll |
|||
*.so |
|||
*.dylib |
|||
|
|||
# Test binary, built with `go test -c` |
|||
*.test |
|||
|
|||
# Output of the go coverage tool, specifically when used with LiteIDE |
|||
*.out |
|||
|
|||
.idea |
|||
hudongzhuanjia-go-build |
|||
hudongzhuanjia |
|||
go_build_main_go |
|||
.vscode |
|||
.log |
|||
|
|||
|
|||
services/pay/bindata.go |
@ -0,0 +1,5 @@ |
|||
SET CGO_ENABLED=0 |
|||
SET GOOS=linux |
|||
SET GOARCH=amd64 |
|||
|
|||
go build main.go |
@ -0,0 +1,8 @@ |
|||
{ |
|||
"port": "20182", |
|||
"host": "127.0.0.1", |
|||
"cross_domain": "*", |
|||
"post_max_memory": 1024000, |
|||
"api_router": "/PcClient/*", |
|||
"update_path": "example_update" |
|||
} |
@ -0,0 +1,37 @@ |
|||
package connect |
|||
|
|||
import ( |
|||
"gopkg.in/olahol/melody.v1" |
|||
) |
|||
|
|||
type Client struct { |
|||
Id string // account_typ:account_id
|
|||
AccountId int64 |
|||
AccountType string |
|||
Pid int64 |
|||
Online bool |
|||
RoomId string |
|||
node *Node |
|||
*melody.Session |
|||
} |
|||
|
|||
func NewClient(session *melody.Session, node *Node, roomId string) *Client { |
|||
client := new(Client) |
|||
client.Session = session |
|||
client.node = node |
|||
client.Online = false |
|||
client.RoomId = roomId |
|||
return client |
|||
} |
|||
|
|||
func (c *Client) BroadcastAll(msg *Message) { |
|||
c.node.BroadcastAll(msg, c) |
|||
} |
|||
|
|||
func (c *Client) Register() error { |
|||
return c.node.RegisterClient(c) |
|||
} |
|||
|
|||
func (c *Client) Send(msg *Message) { |
|||
c.node.Send(msg) |
|||
} |
@ -0,0 +1,14 @@ |
|||
package connect |
|||
|
|||
type logic func(client *Client, msg *Message) |
|||
|
|||
const IDCustomer = "customer" |
|||
const IDUser = "user" |
|||
const IDEntry = "entry" |
|||
const TypeNotice = "notice" |
|||
const TypeLogin = "login" |
|||
|
|||
const LogicLogin = "login" |
|||
const LogicSync = "sync" |
|||
|
|||
const PoolSize = 10000 |
@ -0,0 +1,34 @@ |
|||
package connect |
|||
|
|||
import ( |
|||
"fmt" |
|||
"github.com/ouxuanserver/osmanthuswine/src/core" |
|||
"time" |
|||
) |
|||
|
|||
type MessageCtl struct { |
|||
core.Controller |
|||
} |
|||
|
|||
var now = time.Now() |
|||
|
|||
func (t *MessageCtl) Checkin() { |
|||
t.DisplayByData(map[string]interface{}{ |
|||
"version": now, |
|||
"status": "success", |
|||
}) |
|||
} |
|||
|
|||
func (t *MessageCtl) Receive() { |
|||
msg := new(Message) |
|||
err := t.RequestToStruct(msg) |
|||
if err != nil { |
|||
t.DisplayByError(fmt.Sprintf("信息发送失败, error: %v", err), 0) |
|||
} |
|||
go N.Send(msg) |
|||
// 通知特定的大屏幕和用户
|
|||
t.DisplayByData(map[string]interface{}{ |
|||
"date": time.Now(), |
|||
"msg": "success", |
|||
}) |
|||
} |
@ -0,0 +1,65 @@ |
|||
package logics |
|||
|
|||
import ( |
|||
"fmt" |
|||
"hudongzhuanjia/hdws/connect" |
|||
"hudongzhuanjia/libs/jwt" |
|||
) |
|||
|
|||
func init() { |
|||
connect.N.RegisterLogic(connect.LogicLogin, loginFunc) |
|||
} |
|||
|
|||
func loginFunc(c *connect.Client, msg *connect.Message) { |
|||
if _, ok := msg.Data["token"]; !ok { |
|||
c.Write([]byte("token不能为空")) |
|||
return |
|||
} |
|||
claims, err := jwt.ParseAccessToken(msg.Data["token"].(string)) |
|||
if err != nil { |
|||
c.Write([]byte(err.Error())) |
|||
return |
|||
} |
|||
c.Online = true |
|||
c.Id = fmt.Sprintf("%s:%d", claims.AccountType, claims.AccountId) |
|||
c.AccountId = claims.AccountId |
|||
c.AccountType = claims.AccountType |
|||
c.Pid = claims.CustomerPid |
|||
if err := c.Register(); err != nil { |
|||
c.Write([]byte(err.Error())) |
|||
return |
|||
} |
|||
|
|||
if claims.AccountType == connect.IDCustomer { |
|||
if c.Pid == 0 { // 主账号发给子账号
|
|||
msg := &connect.Message{ |
|||
Type: connect.TypeNotice, |
|||
Dest: 0, |
|||
Tag: connect.IDCustomer, |
|||
RoomId: c.RoomId, |
|||
From: c.Id, |
|||
Data: map[string]interface{}{ |
|||
"content": "主账号上线", |
|||
"nickname": "", |
|||
"username": claims.Username, |
|||
}, |
|||
} |
|||
c.Send(msg) |
|||
} else { // 子账号发给主账号
|
|||
msg := &connect.Message{ |
|||
Type: connect.TypeNotice, |
|||
Dest: claims.CustomerPid, |
|||
Tag: connect.IDCustomer, |
|||
RoomId: c.RoomId, |
|||
From: c.Id, |
|||
Data: map[string]interface{}{ |
|||
"content": "子账号上线", |
|||
"nickname": "", |
|||
"username": claims.Username, |
|||
}, |
|||
} |
|||
c.Send(msg) |
|||
} |
|||
} |
|||
c.Write([]byte("登录成功")) |
|||
} |
@ -0,0 +1,12 @@ |
|||
package logics |
|||
|
|||
import "hudongzhuanjia/hdws/connect" |
|||
|
|||
func init() { |
|||
connect.N.RegisterLogic("msg", msgFunc) |
|||
} |
|||
|
|||
// 简单明了
|
|||
func msgFunc(c *connect.Client, msg *connect.Message) { |
|||
c.Send(msg) |
|||
} |
@ -0,0 +1,72 @@ |
|||
package logics |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"hudongzhuanjia/hdws/connect" |
|||
"hudongzhuanjia/models" |
|||
"time" |
|||
) |
|||
|
|||
func init() { |
|||
connect.N.RegisterLogic(connect.LogicSync, syncFunc) |
|||
} |
|||
|
|||
func syncFunc(c *connect.Client, msg *connect.Message) { |
|||
if c.AccountType != connect.IDCustomer && c.Pid != 0 { |
|||
c.Write([]byte("此账号无此权限")) |
|||
return |
|||
} |
|||
|
|||
operation, ok := msg.Data["operation"] |
|||
if !ok { |
|||
operation = "" |
|||
} |
|||
areaId, ok := msg.Data["area_id"] |
|||
if !ok { |
|||
areaId = 0.0 |
|||
} |
|||
moduleActivityId, ok := msg.Data["module_activity_id"] |
|||
if !ok { |
|||
moduleActivityId = 0.0 |
|||
} |
|||
moduleActivityType, ok := msg.Data["module_activity_type"] |
|||
if !ok { |
|||
moduleActivityType = "" |
|||
} |
|||
activityId, ok := msg.Data["activity_id"] |
|||
if !ok { |
|||
activityId = 0.0 |
|||
} |
|||
extraData := []byte("") |
|||
if otherData, ok := msg.Data["other_data"]; ok { |
|||
extraData, _ = json.Marshal(otherData) |
|||
} |
|||
|
|||
// 发送给所有的账户
|
|||
msg.Tag = "" |
|||
msg.Dest = 0 |
|||
msg.RoomId = c.RoomId |
|||
go c.Send(msg) |
|||
|
|||
op := &models.CustomerOperation{ |
|||
CustomerId: c.AccountId, |
|||
Operation: operation.(string), |
|||
AreaId: int64(areaId.(float64)), |
|||
ModuleActivityId: int64(moduleActivityId.(float64)), |
|||
ModuleActivityType: moduleActivityType.(string), |
|||
ActivityId: int64(activityId.(float64)), |
|||
ExtraData: string(extraData), |
|||
UpdatedAt: time.Now(), |
|||
CreatedAt: time.Now(), |
|||
} |
|||
err := models.Save(map[string]interface{}{ |
|||
"activity_id=": activityId, |
|||
"is_delete=": 0, |
|||
}, op) |
|||
if err != nil { |
|||
c.Write([]byte("同步失败")) |
|||
return |
|||
} |
|||
|
|||
c.Write([]byte("同步成功")) |
|||
} |
@ -0,0 +1,21 @@ |
|||
package connect |
|||
|
|||
import "encoding/json" |
|||
|
|||
type Message struct { |
|||
Type string `json:"type"` // 对应处理函数 / sync 同步函数,存入数据库 / msg 发送消息,不存入数据库 / login 登录校验某个客户是否存在
|
|||
From string `json:"-"` // 发送方 ==> 自行处理, 无需发送
|
|||
Dest int64 `json:"dest"` // 目标id / 0 代表所有 非0代表某一个
|
|||
Tag string `json:"tag"` // 目标类型 customer代表客户 user代表用户 entry代表录入人员
|
|||
RoomId string `json:"room_id"` // 房间id // activity_id
|
|||
Data map[string]interface{} `json:"data"` |
|||
} |
|||
|
|||
func (m *Message) Encode() []byte { |
|||
buf, _ := json.Marshal(m) |
|||
return buf |
|||
} |
|||
|
|||
func (m *Message) Decode(buf []byte) error { |
|||
return json.Unmarshal(buf, m) |
|||
} |
@ -0,0 +1,187 @@ |
|||
package connect |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"errors" |
|||
"fmt" |
|||
"gopkg.in/olahol/melody.v1" |
|||
"hudongzhuanjia/logger" |
|||
"strings" |
|||
"sync" |
|||
) |
|||
|
|||
// 根据主活动划分房间 ==> 适配现有的可以
|
|||
// 在主活动开始的时候
|
|||
// HandleConnect 处理房间号
|
|||
// HandleMessage 处理逻辑函数 包括登录, 同步 .... 发送到特定的用户
|
|||
// HandleDisconnect 处理房间删除, 链接删除
|
|||
|
|||
// 登录, 用户注册
|
|||
// tag 或者 ?activity_id=? 作为一个标志
|
|||
|
|||
// 同步消息 => 发送给所有的用户 使用协程池
|
|||
|
|||
// NewWebSocket
|
|||
// Connect
|
|||
// Disconnect
|
|||
// Message
|
|||
// Run
|
|||
|
|||
var N = NewNode() |
|||
|
|||
type Node struct { |
|||
conns map[*melody.Session]*Client |
|||
main *Client // 通过client 发送信息
|
|||
rooms map[string]*Room // 房间
|
|||
handles map[string]logic // 逻辑函数
|
|||
rwmux *sync.RWMutex |
|||
} |
|||
|
|||
func NewNode() *Node { |
|||
node := new(Node) |
|||
node.conns = make(map[*melody.Session]*Client, 0) |
|||
node.main = new(Client) |
|||
node.rooms = make(map[string]*Room, 0) |
|||
node.handles = make(map[string]logic, 0) |
|||
node.rwmux = new(sync.RWMutex) |
|||
return node |
|||
} |
|||
|
|||
func (t *Node) handleMelodyConnect(client *Client) { |
|||
logger.Sugar.Infof("[websocket] [connect] [room:%s] [client:%s] %s", client.RoomId, client.Id, client.Request.URL) |
|||
} |
|||
|
|||
// 处理msg
|
|||
func (t *Node) handleMelodyMessage(client *Client, msg *Message) { |
|||
if !client.Online && msg.Type != TypeLogin { |
|||
client.Write([]byte("尚未登录")) |
|||
return |
|||
} |
|||
|
|||
if fn, ok := t.handles[msg.Type]; ok { |
|||
fn(client, msg) |
|||
} |
|||
logger.Sugar.Infof("[websocket] [message] [room:%s] [client:%s] %v", client.RoomId, client.Id, msg) |
|||
} |
|||
|
|||
// 删掉注册的
|
|||
func (t *Node) handleMelodyDisconnect(client *Client) { |
|||
if client.Online && strings.HasPrefix(client.Id, IDCustomer) { |
|||
if client.Pid == 0 { // 主账号下线 通知子账号
|
|||
msg := &Message{ |
|||
Type: TypeNotice, |
|||
Dest: 0, |
|||
Tag: IDCustomer, |
|||
RoomId: client.RoomId, |
|||
From: client.Id, |
|||
Data: map[string]interface{}{ |
|||
"id": client.AccountId, |
|||
"content": "主账号下线", |
|||
}, |
|||
} |
|||
t.Send(msg) |
|||
} else { // 子账号下线 通知主账号
|
|||
msg := &Message{ |
|||
Type: TypeNotice, |
|||
Dest: client.Pid, |
|||
Tag: IDCustomer, |
|||
RoomId: client.RoomId, |
|||
From: client.Id, |
|||
Data: map[string]interface{}{ |
|||
"id": client.AccountId, |
|||
"content": "子账号下线", |
|||
}, |
|||
} |
|||
t.Send(msg) |
|||
} |
|||
} |
|||
// 释放内存
|
|||
if room, ok := t.rooms[client.RoomId]; ok { |
|||
room.DeleteClient(client.Id) |
|||
if room.Len() <= 0 { |
|||
t.DeleteRoom(room.Id) |
|||
} |
|||
} |
|||
logger.Sugar.Infof("[websocket] [disconnect] [room:%s] [client:%s] online=>%v", client.RoomId, client.Id, client.Online) |
|||
} |
|||
|
|||
func (t *Node) RegisterLogic(name string, fn logic) { |
|||
t.handles[name] = fn |
|||
} |
|||
|
|||
func (t *Node) RegisterClient(c *Client) error { |
|||
if room, ok := t.rooms[c.RoomId]; ok { |
|||
room.rwmux.Lock() |
|||
defer room.rwmux.Unlock() |
|||
room.clients[c.Id] = c |
|||
return nil |
|||
} |
|||
return errors.New("加入房间失败") |
|||
} |
|||
|
|||
// 所有人同步
|
|||
func (t *Node) BroadcastAll(msg *Message, c *Client) { |
|||
// 发送给连上ws的所有人
|
|||
m, err := json.Marshal(msg) |
|||
if err != nil { |
|||
c.Write([]byte(err.Error())) |
|||
return |
|||
} |
|||
for _, client := range t.conns { |
|||
if client == c { |
|||
continue |
|||
} |
|||
client.Write(m) |
|||
} |
|||
} |
|||
|
|||
func (t *Node) Send(msg *Message) { |
|||
if msg.Dest == 0 && msg.Tag == "" { // 代表发送给所有人
|
|||
t.Broadcast(msg) |
|||
} else if msg.Dest == 0 && msg.Tag != "" { // 代表发给某个团体
|
|||
t.BroadcastTag(msg) |
|||
} else if msg.Dest != 0 && msg.Tag != "" { |
|||
t.BroadcastDest(msg) |
|||
} else { |
|||
logger.Sugar.Infof("[websocket] [send] msg=>%v", msg) |
|||
} |
|||
} |
|||
|
|||
// 发送给所有的
|
|||
// delete 删除 room client
|
|||
func (t *Node) Broadcast(msg *Message) { |
|||
m := msg.Encode() |
|||
if room, ok := t.rooms[msg.RoomId]; ok && room != nil { |
|||
for _, client := range room.clients { |
|||
if client.Id != msg.From { |
|||
client.Write(m) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (t *Node) BroadcastTag(msg *Message) { |
|||
m := msg.Encode() |
|||
if room, ok := t.rooms[msg.RoomId]; ok { |
|||
for _, client := range room.clients { |
|||
if client.AccountType == msg.Tag && client.Id != msg.From { |
|||
client.Write(m) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (t *Node) BroadcastDest(msg *Message) { |
|||
if room, ok := t.rooms[msg.RoomId]; ok { |
|||
client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)] |
|||
if client.Id != msg.From { |
|||
client.Write(msg.Encode()) |
|||
} |
|||
} |
|||
} |
|||
|
|||
func (t *Node) DeleteRoom(roomId string) { |
|||
t.rwmux.Lock() |
|||
defer t.rwmux.Unlock() |
|||
delete(t.rooms, roomId) |
|||
} |
@ -0,0 +1,31 @@ |
|||
package connect |
|||
|
|||
import ( |
|||
"sync" |
|||
) |
|||
|
|||
type Room struct { |
|||
Id string |
|||
clients map[string]*Client |
|||
rwmux *sync.RWMutex |
|||
} |
|||
|
|||
func NewRoom(id string) *Room { |
|||
room := new(Room) |
|||
room.Id = id |
|||
room.clients = make(map[string]*Client, 0) |
|||
room.rwmux = new(sync.RWMutex) |
|||
return room |
|||
} |
|||
|
|||
func (t *Room) Len() int { |
|||
t.rwmux.RLock() |
|||
defer t.rwmux.RUnlock() |
|||
return len(t.clients) |
|||
} |
|||
|
|||
func (t *Room) DeleteClient(clientId string) { |
|||
t.rwmux.Lock() |
|||
defer t.rwmux.Unlock() |
|||
delete(t.clients, clientId) |
|||
} |
@ -0,0 +1,76 @@ |
|||
package connect |
|||
|
|||
import ( |
|||
"encoding/json" |
|||
"github.com/ouxuanserver/osmanthuswine/src/core" |
|||
"gopkg.in/olahol/melody.v1" |
|||
"math" |
|||
) |
|||
|
|||
type WsCtl struct { |
|||
core.WebSocket |
|||
node *Node |
|||
} |
|||
|
|||
func (ws *WsCtl) HandleConnect(s *melody.Session) { |
|||
if ws.node == nil { |
|||
ws.node = N |
|||
ws.GetMelody().Config.MaxMessageSize = math.MaxInt16 |
|||
ws.GetMelody().Config.MessageBufferSize = math.MaxInt16 |
|||
} |
|||
ws.node.rwmux.Lock() |
|||
defer ws.node.rwmux.Unlock() |
|||
roomId := s.Request.URL.Query().Get("activity_id") |
|||
if _, ok := ws.node.rooms[roomId]; !ok { // 创建房间
|
|||
ws.node.rooms[roomId] = NewRoom(roomId) |
|||
} |
|||
client := NewClient(s, ws.node, roomId) |
|||
ws.node.conns[s] = client |
|||
ws.node.handleMelodyConnect(client) |
|||
} |
|||
|
|||
func (ws *WsCtl) HandleMessage(s *melody.Session, buf []byte) { |
|||
msg := new(Message) |
|||
err := json.Unmarshal(buf, msg) |
|||
if err != nil { |
|||
s.Write([]byte(err.Error())) |
|||
return |
|||
} |
|||
|
|||
client, ok := ws.node.conns[s] // 鉴定某个链接是否登录
|
|||
if !ok { |
|||
s.Write([]byte("链接无效")) |
|||
return |
|||
} |
|||
|
|||
ws.node.handleMelodyMessage(client, msg) |
|||
} |
|||
|
|||
func (ws *WsCtl) HandleDisconnect(s *melody.Session) { |
|||
if client, ok := ws.node.conns[s]; ok { |
|||
ws.node.handleMelodyDisconnect(client) |
|||
} |
|||
ws.node.rwmux.Lock() |
|||
defer ws.node.rwmux.Unlock() |
|||
delete(ws.node.conns, s) |
|||
} |
|||
|
|||
func (ws *WsCtl) HandleError(s *melody.Session, err error) { |
|||
|
|||
} |
|||
|
|||
func (ws *WsCtl) HandleMessageBinary(s *melody.Session, msg []byte) { |
|||
|
|||
} |
|||
|
|||
func (ws *WsCtl) HandlePong(s *melody.Session) { |
|||
|
|||
} |
|||
|
|||
func (ws *WsCtl) HandleSentMessage(s *melody.Session, msg []byte) { |
|||
|
|||
} |
|||
|
|||
func (ws *WsCtl) HandleSentMessageBinary(*melody.Session, []byte) { |
|||
|
|||
} |
@ -0,0 +1,15 @@ |
|||
package main |
|||
|
|||
import ( |
|||
"github.com/ouxuanserver/osmanthuswine" |
|||
"github.com/ouxuanserver/osmanthuswine/src/core" |
|||
"hudongzhuanjia/hdws/connect" |
|||
_ "hudongzhuanjia/hdws/connect/logics" |
|||
) |
|||
|
|||
func main() { |
|||
// hdws
|
|||
core.GetInstanceRouterManage().Registered(new(connect.WsCtl)) |
|||
core.GetInstanceRouterManage().Registered(new(connect.MessageCtl)) |
|||
osmanthuswine.Run() |
|||
} |
@ -0,0 +1,11 @@ |
|||
{ |
|||
"db": { |
|||
"host": "gz-cdb-onvbcfqn.sql.tencentcdb.com", |
|||
"port": "61232", |
|||
"user": "root", |
|||
"password": "Pox2210XXa@", |
|||
"name": "hudongzhuanjia", |
|||
"prefix": "ox_", |
|||
"max_open_conn": 500 |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue