Browse Source

fix:bug

master
黄梓健 5 years ago
parent
commit
7628da699c
  1. 12
      controllers/pc/activity.go
  2. 22
      hdws/.gitignore
  3. 5
      hdws/build.bat
  4. 8
      hdws/config.json
  5. 14
      hdws/main.go
  6. 11
      hdws/private.json
  7. 46
      hdws/ws/client.go
  8. 14
      hdws/ws/define.go
  9. 34
      hdws/ws/http.go
  10. 65
      hdws/ws/jwt_go.go
  11. 71
      hdws/ws/login.go
  12. 21
      hdws/ws/message.go
  13. 10
      hdws/ws/msg.go
  14. 186
      hdws/ws/node.go
  15. 31
      hdws/ws/room.go
  16. 71
      hdws/ws/timer.go
  17. 18
      hdws/ws/util.go
  18. 80
      hdws/ws/ws.go
  19. 1
      models/activity.go
  20. 27
      models/arch.go
  21. 69
      utils/define/config.go

12
controllers/pc/activity.go

@ -6,6 +6,7 @@ import (
"hudongzhuanjia/models"
"hudongzhuanjia/utils/code"
"hudongzhuanjia/utils/define"
"time"
)
//活动
@ -35,7 +36,7 @@ func (t *ActivityCtl) StartActivity() {
t.ERROR("该活动在彩排中", code.MSG_ERR)
}
if mode == 1 {
if mode == 1 { // 彩排
rehearsal := models.Rehearsal{
CustomerId: uid,
ActivityId: activityId,
@ -44,9 +45,16 @@ func (t *ActivityCtl) StartActivity() {
IsDelete: false,
}
_, err = models.Add(&rehearsal)
//core.GetXormAuto().InsertOne(rehearsal)
t.CheckErr(err)
activity.RehearsalId = rehearsal.Id
} else { //非彩排
arch := &models.Arch{}
count, err := arch.Count(activity.Id)
t.CheckErr(err)
arch.Date = time.Now().Format("2006-01-02")
arch.Name = fmt.Sprintf("第%d场", count)
_, err = models.Update(activity.ArchId, arch, "name", "date")
t.CheckErr(err)
}
activity.Status = define.StatusRunning

22
hdws/.gitignore

@ -0,0 +1,22 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
.idea
hudongzhuanjia-go-build
hudongzhuanjia
go_build_main_go
.vscode
.log
services/pay/bindata.go

5
hdws/build.bat

@ -0,0 +1,5 @@
SET CGO_ENABLED=0
SET GOOS=linux
SET GOARCH=amd64
go build main.go

8
hdws/config.json

@ -0,0 +1,8 @@
{
"port": "20182",
"host": "127.0.0.1",
"cross_domain": "*",
"post_max_memory": 1024000,
"api_router": "/PcClient/*",
"update_path": "example_update"
}

14
hdws/main.go

@ -0,0 +1,14 @@
package main
import (
"github.com/ouxuanserver/osmanthuswine"
"github.com/ouxuanserver/osmanthuswine/src/core"
"hudongzhuanjia/hdws/ws"
)
func main() {
// hdws
core.GetInstanceRouterManage().Registered(new(ws.WsCtl))
core.GetInstanceRouterManage().Registered(new(ws.MessageCtl))
osmanthuswine.Run()
}

11
hdws/private.json

@ -0,0 +1,11 @@
{
"db": {
"host": "ouxuanhudongapi.t.3pr.com.cn",
"port": "3306",
"user": "ouxuanhudongtest",
"password": "wXrzerxEBGTYs28M",
"name": "ouxuanhudongtest",
"prefix": "ox_",
"max_open_conn": 500
}
}

46
hdws/ws/client.go

@ -0,0 +1,46 @@
package ws
import (
"encoding/json"
"gopkg.in/olahol/melody.v1"
)
type Client struct {
Id string // account_typ:account_id
AccountId int64
AccountType string
Pid int64
Online bool
RoomId string
node *Node
*melody.Session
}
func NewClient(session *melody.Session, node *Node, roomId string) *Client {
client := new(Client)
client.Session = session
client.node = node
client.Online = false
client.RoomId = roomId
return client
}
func (c *Client) BroadcastAll(msg *Message) {
c.node.BroadcastAll(msg, c)
}
func (c *Client) Register() error {
return c.node.RegisterClient(c)
}
func (c *Client) Send(msg *Message) {
c.node.Send(msg)
}
func (c *Client) WriteJson(body interface{}) error {
bs, err := json.Marshal(&body)
if err != nil {
return err
}
return c.Write(bs)
}

14
hdws/ws/define.go

@ -0,0 +1,14 @@
package ws
type logic func(client *Client, msg *Message)
const IDCustomer = "customer"
const IDUser = "user"
const IDEntry = "entry"
const TypeNotice = "notice"
const TypeLogin = "login"
const LogicLogin = "login"
const LogicSync = "sync"
const PoolSize = 10000

34
hdws/ws/http.go

@ -0,0 +1,34 @@
package ws
import (
"fmt"
"github.com/ouxuanserver/osmanthuswine/src/core"
"time"
)
type MessageCtl struct {
core.Controller
}
var now = time.Now()
func (t *MessageCtl) Checkin() {
t.DisplayByData(map[string]interface{}{
"version": now,
"status": "success",
})
}
func (t *MessageCtl) Receive() {
msg := new(Message)
err := t.RequestToStruct(msg)
if err != nil {
t.DisplayByError(fmt.Sprintf("信息发送失败, error: %v", err), 0)
}
go N.Send(msg)
// 通知特定的大屏幕和用户
t.DisplayByData(map[string]interface{}{
"date": time.Now(),
"msg": "success",
})
}

65
hdws/ws/jwt_go.go

@ -0,0 +1,65 @@
package ws
import (
"errors"
"fmt"
"time"
"github.com/dgrijalva/jwt-go"
"github.com/ouxuanserver/osmanthuswine/src/helper"
)
type Claims struct {
AccountType string
AccountId int64
CustomerId int64
CustomerPid int64
ActivityId int64
AreaId int64
jwt.StandardClaims
}
func GenJwtToken(accountType string, accountId, customerId, customerPid, areaId, activityId int64) (string, error) {
claims := Claims{
accountType,
accountId,
customerId,
customerPid,
activityId,
areaId,
jwt.StandardClaims{
ExpiresAt: time.Now().Add(time.Duration(24000) * time.Hour).Unix(),
Id: helper.CreateUUID(),
Issuer: Issuer,
Subject: Subject,
Audience: Audience,
},
}
t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return t.SignedString([]byte(Secret))
}
const Secret = "osmanthuswine-very-secret"
const Issuer = "osmanthuswine-issuer-ox"
const Subject = "osmanthuswine-subject-ox"
const Audience = "osmanthuswine-audience-ox"
func ParseAccessToken(accessToken string) (*Claims, error) {
var claims = &Claims{}
token, err := jwt.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (i interface{}, e error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return []byte(Secret), nil
})
if token == nil {
return claims, errors.New("token invalid")
}
if !claims.VerifyExpiresAt(time.Now().Unix(), true) {
return nil, errors.New("token expired")
}
return claims, err
}

71
hdws/ws/login.go

@ -0,0 +1,71 @@
package ws
import (
"fmt"
)
func init() {
N.RegisterLogic(LogicLogin, loginFunc)
}
func loginFunc(c *Client, msg *Message) {
if _, ok := msg.Data["token"]; !ok {
_ = c.WriteJson(map[string]interface{}{
"msg": "token不能为空",
"code": 506,
})
return
}
claims, err := ParseAccessToken(msg.Data["token"].(string))
if err != nil {
_ = c.WriteJson(map[string]interface{}{
"msg": "token解析出错, " + err.Error(),
"code": 507,
})
return
}
c.Online = true
c.Id = fmt.Sprintf("%s:%d", claims.AccountType, claims.AccountId)
c.AccountId = claims.AccountId
c.AccountType = claims.AccountType
c.Pid = claims.CustomerPid
if err = c.Register(); err != nil {
_ = c.WriteJson(map[string]interface{}{
"msg": err.Error(),
"code": 404,
})
return
}
if claims.AccountType == IDCustomer {
if c.Pid == 0 { // 主账号发给子账号
m := &Message{
Type: TypeNotice,
Dest: 0,
Tag: IDCustomer,
RoomId: c.RoomId,
From: c.Id,
Data: map[string]interface{}{
"content": "主账号上线",
},
}
c.Send(m)
} else { // 子账号发给主账号
m := &Message{
Type: TypeNotice,
Dest: claims.CustomerPid,
Tag: IDCustomer,
RoomId: c.RoomId,
From: c.Id,
Data: map[string]interface{}{
"content": "子账号上线",
},
}
c.Send(m)
}
}
_ = c.WriteJson(map[string]interface{}{
"msg": "登录成功",
"code": 200,
})
}

21
hdws/ws/message.go

@ -0,0 +1,21 @@
package ws
import "encoding/json"
type Message struct {
Type string `json:"type"` // 对应处理函数 / sync 同步函数,存入数据库 / msg 发送消息,不存入数据库 / login 登录校验某个客户是否存在
From string `json:"-"` // 发送方 ==> 自行处理, 无需发送
Dest int64 `json:"dest"` // 目标id / 0 代表所有 非0代表某一个
Tag string `json:"tag"` // 目标类型 customer代表客户 user代表用户 entry代表录入人员
RoomId string `json:"room_id"` // 房间id // activity_id
Data map[string]interface{} `json:"data"`
}
func (m *Message) Encode() []byte {
buf, _ := json.Marshal(m)
return buf
}
func (m *Message) Decode(buf []byte) error {
return json.Unmarshal(buf, m)
}

10
hdws/ws/msg.go

@ -0,0 +1,10 @@
package ws
func init() {
N.RegisterLogic("msg", msgFunc)
}
// 简单明了
func msgFunc(c *Client, msg *Message) {
c.Send(msg)
}

186
hdws/ws/node.go

@ -0,0 +1,186 @@
package ws
import (
"errors"
"fmt"
"github.com/rs/zerolog/log"
"gopkg.in/olahol/melody.v1"
"strings"
"sync"
)
// 根据主活动划分房间 ==> 适配现有的可以
// 在主活动开始的时候
// HandleConnect 处理房间号
// HandleMessage 处理逻辑函数 包括登录, 同步 .... 发送到特定的用户
// HandleDisconnect 处理房间删除, 链接删除
// 登录, 用户注册
// tag 或者 ?activity_id=? 作为一个标志
// 同步消息 => 发送给所有的用户 使用协程池
// NewWebSocket
// Connect
// Disconnect
// Message
// Run
var N = NewNode()
type Node struct {
conns map[*melody.Session]*Client
main *Client // 通过client 发送信息
rooms map[string]*Room // 房间
handles map[string]logic // 逻辑函数
rwmux *sync.RWMutex
}
func NewNode() *Node {
node := new(Node)
node.conns = make(map[*melody.Session]*Client, 0)
node.main = new(Client)
node.rooms = make(map[string]*Room, 0)
node.handles = make(map[string]logic, 0)
node.rwmux = new(sync.RWMutex)
return node
}
func (t *Node) handleMelodyConnect(client *Client) {
log.Printf("[websocket] [ws] [room:%s] [client:%s] %s\n", client.RoomId, client.Id, client.Request.URL)
}
// 处理msg
func (t *Node) handleMelodyMessage(client *Client, msg *Message) {
if !client.Online && msg.Type != TypeLogin {
_ = client.WriteJson(map[string]interface{}{
"msg": "尚未登录",
"code": 504,
})
return
}
if fn, ok := t.handles[msg.Type]; ok {
fn(client, msg)
}
log.Printf("[websocket] [message] [room:%s] [client:%s] %v\n", client.RoomId, client.Id, msg)
}
// 删掉注册的
func (t *Node) handleMelodyDisconnect(client *Client) {
if client.Online && strings.HasPrefix(client.Id, IDCustomer) {
if client.Pid == 0 { // 主账号下线 通知子账号
msg := &Message{
Type: TypeNotice,
Dest: 0,
Tag: IDCustomer,
RoomId: client.RoomId,
From: client.Id,
Data: map[string]interface{}{
"id": client.AccountId,
"content": "主账号下线",
},
}
t.Send(msg)
} else { // 子账号下线 通知主账号
msg := &Message{
Type: TypeNotice,
Dest: client.Pid,
Tag: IDCustomer,
RoomId: client.RoomId,
From: client.Id,
Data: map[string]interface{}{
"id": client.AccountId,
"content": "子账号下线",
},
}
t.Send(msg)
}
}
// 释放内存
if room, ok := t.rooms[client.RoomId]; ok {
room.DeleteClient(client.Id)
if room.Len() <= 0 {
t.DeleteRoom(room.Id)
}
}
log.Printf("[websocket] [disconnect] [room:%s] [client:%s] online=>%v\n", client.RoomId, client.Id, client.Online)
}
func (t *Node) RegisterLogic(name string, fn logic) {
t.handles[name] = fn
}
func (t *Node) RegisterClient(c *Client) error {
if room, ok := t.rooms[c.RoomId]; ok {
room.rwmux.Lock()
defer room.rwmux.Unlock()
room.clients[c.Id] = c
return nil
}
return errors.New("加入房间失败")
}
// 所有人同步
func (t *Node) BroadcastAll(msg *Message, c *Client) {
// 发送给连上ws的所有人
m := msg.Encode()
for _, client := range t.conns {
if client == c {
continue
}
_ = client.Write(m)
}
}
func (t *Node) Send(msg *Message) {
if msg.Dest == 0 && msg.Tag == "" { // 代表发送给所有人
t.Broadcast(msg)
} else if msg.Dest == 0 && msg.Tag != "" { // 代表发给某个团体
t.BroadcastTag(msg)
} else if msg.Dest != 0 && msg.Tag != "" {
t.BroadcastDest(msg)
} else {
log.Printf("[websocket] [send] msg=>%v", msg)
}
}
// 发送给所有的
// delete 删除 room client
func (t *Node) Broadcast(msg *Message) {
m := msg.Encode()
if room, ok := t.rooms[msg.RoomId]; ok && room != nil {
for _, client := range room.clients {
if client.Id != msg.From {
_ = client.Write(m)
}
}
}
}
func (t *Node) BroadcastTag(msg *Message) {
m := msg.Encode()
if room, ok := t.rooms[msg.RoomId]; ok {
for _, client := range room.clients {
if client.AccountType == msg.Tag && client.Id != msg.From {
_ = client.Write(m)
}
}
}
}
func (t *Node) BroadcastDest(msg *Message) {
m := msg.Encode()
if room, ok := t.rooms[msg.RoomId]; ok {
client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)]
if client.Id != msg.From {
_ = client.Write(m)
}
}
}
func (t *Node) DeleteRoom(roomId string) {
t.rwmux.Lock()
defer t.rwmux.Unlock()
delete(t.rooms, roomId)
}

31
hdws/ws/room.go

@ -0,0 +1,31 @@
package ws
import (
"sync"
)
type Room struct {
Id string
clients map[string]*Client
rwmux *sync.RWMutex
}
func NewRoom(id string) *Room {
room := new(Room)
room.Id = id
room.clients = make(map[string]*Client, 0)
room.rwmux = new(sync.RWMutex)
return room
}
func (t *Room) Len() int {
t.rwmux.RLock()
defer t.rwmux.RUnlock()
return len(t.clients)
}
func (t *Room) DeleteClient(clientId string) {
t.rwmux.Lock()
defer t.rwmux.Unlock()
delete(t.clients, clientId)
}

71
hdws/ws/timer.go

@ -0,0 +1,71 @@
package ws
import (
"encoding/json"
"hudongzhuanjia/models"
"time"
)
func init() {
N.RegisterLogic(LogicSync, syncFunc)
}
func syncFunc(c *Client, msg *Message) {
if c.AccountType != IDCustomer && c.Pid != 0 {
c.Write([]byte("此账号无此权限"))
return
}
operation, ok := msg.Data["operation"]
if !ok {
operation = ""
}
areaId, ok := msg.Data["area_id"]
if !ok {
areaId = 0.0
}
moduleActivityId, ok := msg.Data["module_activity_id"]
if !ok {
moduleActivityId = 0.0
}
moduleActivityType, ok := msg.Data["module_activity_type"]
if !ok {
moduleActivityType = ""
}
activityId, ok := msg.Data["activity_id"]
if !ok {
activityId = 0.0
}
extraData := []byte("")
if otherData, ok := msg.Data["other_data"]; ok {
extraData, _ = json.Marshal(otherData)
}
// 发送给所有的账户
msg.Tag = ""
msg.Dest = 0
msg.RoomId = c.RoomId
go c.Send(msg)
op := &models.CustomerOperation{
CustomerId: c.AccountId,
Operation: operation.(string),
AreaId: int64(areaId.(float64)),
ModuleActivityId: int64(moduleActivityId.(float64)),
ModuleActivityType: moduleActivityType.(string),
ActivityId: int64(activityId.(float64)),
ExtraData: string(extraData),
UpdatedAt: time.Now(),
CreatedAt: time.Now(),
}
err := models.Save(map[string]interface{}{
"activity_id=": activityId,
"is_delete=": 0,
}, op)
if err != nil {
c.Write([]byte("同步失败"))
return
}
c.Write([]byte("同步成功"))
}

18
hdws/ws/util.go

@ -0,0 +1,18 @@
package ws
import (
"encoding/json"
"errors"
"gopkg.in/olahol/melody.v1"
)
func WriteJsonWithSession(s *melody.Session, body interface{}) error {
if s == nil {
return errors.New("session is nil")
}
bs, err := json.Marshal(body)
if err != nil {
return err
}
return s.Write(bs)
}

80
hdws/ws/ws.go

@ -0,0 +1,80 @@
package ws
import (
"encoding/json"
"github.com/ouxuanserver/osmanthuswine/src/core"
"gopkg.in/olahol/melody.v1"
"math"
)
type WsCtl struct {
core.WebSocket
node *Node
}
func (ws *WsCtl) HandleConnect(s *melody.Session) {
if ws.node == nil {
ws.node = N
ws.GetMelody().Config.MaxMessageSize = math.MaxInt16
ws.GetMelody().Config.MessageBufferSize = math.MaxInt16
}
ws.node.rwmux.Lock()
defer ws.node.rwmux.Unlock()
roomId := s.Request.URL.Query().Get("activity_id")
if _, ok := ws.node.rooms[roomId]; !ok { // 创建房间
ws.node.rooms[roomId] = NewRoom(roomId)
}
client := NewClient(s, ws.node, roomId)
ws.node.conns[s] = client
ws.node.handleMelodyConnect(client)
}
func (ws *WsCtl) HandleMessage(s *melody.Session, buf []byte) {
msg := new(Message)
err := json.Unmarshal(buf, msg)
if err != nil {
s.Write([]byte(err.Error()))
return
}
client, ok := ws.node.conns[s] // 鉴定某个链接是否登录
if !ok {
WriteJsonWithSession(s, map[string]interface{}{
"msg": "链接无效",
"code": 505,
})
//s.Write([]byte("链接无效"))
return
}
ws.node.handleMelodyMessage(client, msg)
}
func (ws *WsCtl) HandleDisconnect(s *melody.Session) {
if client, ok := ws.node.conns[s]; ok {
ws.node.handleMelodyDisconnect(client)
}
ws.node.rwmux.Lock()
defer ws.node.rwmux.Unlock()
delete(ws.node.conns, s)
}
func (ws *WsCtl) HandleError(s *melody.Session, err error) {
}
func (ws *WsCtl) HandleMessageBinary(s *melody.Session, msg []byte) {
}
func (ws *WsCtl) HandlePong(s *melody.Session) {
}
func (ws *WsCtl) HandleSentMessage(s *melody.Session, msg []byte) {
}
func (ws *WsCtl) HandleSentMessageBinary(*melody.Session, []byte) {
}

1
models/activity.go

@ -10,6 +10,7 @@ const ActivityTableName = TableNamePrefix + "activity"
type Activity struct {
Id int64 `json:"id" xorm:"pk autoincr INT(11)"`
CustomerId int64 `json:"customer_id" xorm:"not null default(0) comment('customer_id, 创建客户id') INT(11)"`
ArchId int64 `json:"arch_id" xorm:"not null default 0 comment('归档id') INT(11)"`
Services []*ActivityModuleService `json:"services,omitempty" xorm:"-" description:"主活动下的服务"`
AreaStores []*AreaStore `json:"area_stores,omitempty" xorm:"-" description:"地区"`
BarrageStatus string `json:"barrage_status,omitempty" xorm:"-" description:"弹幕服务状态"`

27
models/arch.go

@ -0,0 +1,27 @@
package models
import (
"github.com/ouxuanserver/osmanthuswine/src/core"
"time"
)
const ArchTableName = TableNamePrefix + "arch"
type Arch struct {
Id int `json:"id"`
CustomerId int `json:"customer_id"`
ActivityId int `json:"activity_id"`
Date string `json:"date"`
Name string `json:"name"`
IsDelete int `json:"is_delete"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
func (t *Arch) TableName() string {
return ArchTableName
}
func (t *Arch) Count(activityId interface{}) (int64, error) {
return core.GetXormAuto().Where("is_delete=0 and activity_id=?", activityId).Get(t)
}

69
utils/define/config.go

@ -0,0 +1,69 @@
package define
var IsDebugging = true
func SetDebug(modes ...bool) {
if len(modes) > 0 {
IsDebugging = modes[0]
} else {
IsDebugging = false
}
if IsDebugging {
//RoomPrefix = "test_"
HOST = "https://hdzj.utools.club"
H5Host = "http://ouxuanhudongtest.t.3pr.com.cn/web"
SendUrl = "https://hdzjws.utools.club/PcClient/Ws/MessageCtl/Receive"
}
}
var (
SendUrl = "https://api.ouxuanhudong.com:20182/PcClient/Connect/MessageCtl/Receive"
RoomPrefix = ""
HOST = "https://api.ouxuanhudong.com"
H5Host = "https://h5.ouxuanhudong.com/web"
)
const (
H5Index = "index.html"
H5SignIn = "SignIn.html"
H5ShakeRb = "shakeRb.html"
H5TugOfWar = "tugOfWar.html"
H5UpperWall = "UpperWall.html"
H5Barrage = "barrage.html"
H5Order = "order.html"
H5Reward = "reward.html"
H5BScreen = "bScreen.html"
H5Auction = "auction.html"
H5Vote = "vote.html"
H5Calorie = "calorie.html"
)
// 固定长度
var DefaultOrderNo = 10000000000
// 微信常量
const (
// 欧轩互动 -> 普通商户
ApiKey = `2c82c64ceec6ba89ffc9f593c671a12f`
WxAppId = `wx7b0bcf476552c5e9`
Secret = `f6aabdd40ea25272f4442603a7dc8028`
AppId = `wx7b0bcf476552c5e9`
MchId = `1394404502`
SubMchId = ``
// 欧轩 -> 服务商
//ApiKey = `6e281c8b5430c674034594cab789334F`
//AppId = `wx662a1633304bfd42`
//WxAppId = `wx662a1633304bfd42`
//Secret = `7e4ecfe06ad1e075c210059d5b0162a3`
//MchId = `1441266702`
//SubMchId = `1394404502`
//SubMchId = `1594049151`
// 认证服务号:欧轩(用于申请微信服务商)
// AppID:wx662a1633304bfd42
// 密钥:7e4ecfe06ad1e075c210059d5b0162a3
WxHost = `https://api.mch.weixin.qq.com`
WxBack = `https://api2.mch.weixin.qq.com`
ClientIp = `123.207.246.51`
)
Loading…
Cancel
Save