黄梓健 5 years ago
parent
commit
4b80120c74
  1. 65
      connect/logics/login.go
  2. 12
      connect/logics/msg.go
  3. 8
      main.go
  4. 11
      ws/client.go
  5. 2
      ws/define.go
  6. 2
      ws/http.go
  7. 65
      ws/jwt_go.go
  8. 71
      ws/login.go
  9. 2
      ws/message.go
  10. 10
      ws/msg.go
  11. 26
      ws/node.go
  12. 2
      ws/room.go
  13. 9
      ws/timer.go
  14. 18
      ws/util.go
  15. 8
      ws/ws.go

65
connect/logics/login.go

@ -1,65 +0,0 @@
package logics
import (
"fmt"
"hudongzhuanjia/hdws/connect"
"hudongzhuanjia/libs/jwt"
)
func init() {
connect.N.RegisterLogic(connect.LogicLogin, loginFunc)
}
func loginFunc(c *connect.Client, msg *connect.Message) {
if _, ok := msg.Data["token"]; !ok {
c.Write([]byte("token不能为空"))
return
}
claims, err := jwt.ParseAccessToken(msg.Data["token"].(string))
if err != nil {
c.Write([]byte(err.Error()))
return
}
c.Online = true
c.Id = fmt.Sprintf("%s:%d", claims.AccountType, claims.AccountId)
c.AccountId = claims.AccountId
c.AccountType = claims.AccountType
c.Pid = claims.CustomerPid
if err := c.Register(); err != nil {
c.Write([]byte(err.Error()))
return
}
if claims.AccountType == connect.IDCustomer {
if c.Pid == 0 { // 主账号发给子账号
msg := &connect.Message{
Type: connect.TypeNotice,
Dest: 0,
Tag: connect.IDCustomer,
RoomId: c.RoomId,
From: c.Id,
Data: map[string]interface{}{
"content": "主账号上线",
"nickname": "",
"username": claims.Username,
},
}
c.Send(msg)
} else { // 子账号发给主账号
msg := &connect.Message{
Type: connect.TypeNotice,
Dest: claims.CustomerPid,
Tag: connect.IDCustomer,
RoomId: c.RoomId,
From: c.Id,
Data: map[string]interface{}{
"content": "子账号上线",
"nickname": "",
"username": claims.Username,
},
}
c.Send(msg)
}
}
c.Write([]byte("登录成功"))
}

12
connect/logics/msg.go

@ -1,12 +0,0 @@
package logics
import "hudongzhuanjia/hdws/connect"
func init() {
connect.N.RegisterLogic("msg", msgFunc)
}
// 简单明了
func msgFunc(c *connect.Client, msg *connect.Message) {
c.Send(msg)
}

8
main.go

@ -3,13 +3,13 @@ package main
import ( import (
"github.com/ouxuanserver/osmanthuswine" "github.com/ouxuanserver/osmanthuswine"
"github.com/ouxuanserver/osmanthuswine/src/core" "github.com/ouxuanserver/osmanthuswine/src/core"
"hdws/connect"
_ "hdws/connect/logics"
"hdws/ws"
_ "hdws/ws/logics"
) )
func main() { func main() {
// hdws // hdws
core.GetInstanceRouterManage().Registered(new(connect.WsCtl))
core.GetInstanceRouterManage().Registered(new(connect.MessageCtl))
core.GetInstanceRouterManage().Registered(new(ws.WsCtl))
core.GetInstanceRouterManage().Registered(new(ws.MessageCtl))
osmanthuswine.Run() osmanthuswine.Run()
} }

11
connect/client.go → ws/client.go

@ -1,6 +1,7 @@
package connect
package ws
import ( import (
"encoding/json"
"gopkg.in/olahol/melody.v1" "gopkg.in/olahol/melody.v1"
) )
@ -35,3 +36,11 @@ func (c *Client) Register() error {
func (c *Client) Send(msg *Message) { func (c *Client) Send(msg *Message) {
c.node.Send(msg) c.node.Send(msg)
} }
func (c *Client) WriteJson(body interface{}) error {
bs, err := json.Marshal(&body)
if err != nil {
return err
}
return c.Write(bs)
}

2
connect/define.go → ws/define.go

@ -1,4 +1,4 @@
package connect
package ws
type logic func(client *Client, msg *Message) type logic func(client *Client, msg *Message)

2
connect/http.go → ws/http.go

@ -1,4 +1,4 @@
package connect
package ws
import ( import (
"fmt" "fmt"

65
ws/jwt_go.go

@ -0,0 +1,65 @@
package ws
import (
"errors"
"fmt"
"time"
"github.com/dgrijalva/jwt-go"
"github.com/ouxuanserver/osmanthuswine/src/helper"
)
type Claims struct {
AccountType string
AccountId int64
CustomerId int64
CustomerPid int64
ActivityId int64
AreaId int64
jwt.StandardClaims
}
func GenJwtToken(accountType string, accountId, customerId, customerPid, areaId, activityId int64) (string, error) {
claims := Claims{
accountType,
accountId,
customerId,
customerPid,
activityId,
areaId,
jwt.StandardClaims{
ExpiresAt: time.Now().Add(time.Duration(24000) * time.Hour).Unix(),
Id: helper.CreateUUID(),
Issuer: Issuer,
Subject: Subject,
Audience: Audience,
},
}
t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return t.SignedString([]byte(Secret))
}
const Secret = "osmanthuswine-very-secret"
const Issuer = "osmanthuswine-issuer-ox"
const Subject = "osmanthuswine-subject-ox"
const Audience = "osmanthuswine-audience-ox"
func ParseAccessToken(accessToken string) (*Claims, error) {
var claims = &Claims{}
token, err := jwt.ParseWithClaims(accessToken, claims, func(token *jwt.Token) (i interface{}, e error) {
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return []byte(Secret), nil
})
if token == nil {
return claims, errors.New("token invalid")
}
if !claims.VerifyExpiresAt(time.Now().Unix(), true) {
return nil, errors.New("token expired")
}
return claims, err
}

71
ws/login.go

@ -0,0 +1,71 @@
package ws
import (
"fmt"
)
func init() {
N.RegisterLogic(LogicLogin, loginFunc)
}
func loginFunc(c *Client, msg *Message) {
if _, ok := msg.Data["token"]; !ok {
_ = c.WriteJson(map[string]interface{}{
"msg": "token不能为空",
"code": 506,
})
return
}
claims, err := ParseAccessToken(msg.Data["token"].(string))
if err != nil {
_ = c.WriteJson(map[string]interface{}{
"msg": "token解析出错, " + err.Error(),
"code": 507,
})
return
}
c.Online = true
c.Id = fmt.Sprintf("%s:%d", claims.AccountType, claims.AccountId)
c.AccountId = claims.AccountId
c.AccountType = claims.AccountType
c.Pid = claims.CustomerPid
if err = c.Register(); err != nil {
_ = c.WriteJson(map[string]interface{}{
"msg": err.Error(),
"code": 404,
})
return
}
if claims.AccountType == IDCustomer {
if c.Pid == 0 { // 主账号发给子账号
m := &Message{
Type: TypeNotice,
Dest: 0,
Tag: IDCustomer,
RoomId: c.RoomId,
From: c.Id,
Data: map[string]interface{}{
"content": "主账号上线",
},
}
c.Send(m)
} else { // 子账号发给主账号
m := &Message{
Type: TypeNotice,
Dest: claims.CustomerPid,
Tag: IDCustomer,
RoomId: c.RoomId,
From: c.Id,
Data: map[string]interface{}{
"content": "子账号上线",
},
}
c.Send(m)
}
}
_ = c.WriteJson(map[string]interface{}{
"msg": "登录成功",
"code": 200,
})
}

2
connect/message.go → ws/message.go

@ -1,4 +1,4 @@
package connect
package ws
import "encoding/json" import "encoding/json"

10
ws/msg.go

@ -0,0 +1,10 @@
package ws
func init() {
N.RegisterLogic("msg", msgFunc)
}
// 简单明了
func msgFunc(c *Client, msg *Message) {
c.Send(msg)
}

26
connect/node.go → ws/node.go

@ -1,7 +1,6 @@
package connect
package ws
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
@ -48,13 +47,17 @@ func NewNode() *Node {
} }
func (t *Node) handleMelodyConnect(client *Client) { func (t *Node) handleMelodyConnect(client *Client) {
log.Printf("[websocket] [connect] [room:%s] [client:%s] %s\n", client.RoomId, client.Id, client.Request.URL)
log.Printf("[websocket] [ws] [room:%s] [client:%s] %s\n", client.RoomId, client.Id, client.Request.URL)
} }
// 处理msg // 处理msg
func (t *Node) handleMelodyMessage(client *Client, msg *Message) { func (t *Node) handleMelodyMessage(client *Client, msg *Message) {
if !client.Online && msg.Type != TypeLogin { if !client.Online && msg.Type != TypeLogin {
client.Write([]byte("尚未登录"))
_ = client.WriteJson(map[string]interface{}{
"msg": "尚未登录",
"code": 504,
})
//client.Write([]byte("尚未登录"))
return return
} }
@ -122,16 +125,12 @@ func (t *Node) RegisterClient(c *Client) error {
// 所有人同步 // 所有人同步
func (t *Node) BroadcastAll(msg *Message, c *Client) { func (t *Node) BroadcastAll(msg *Message, c *Client) {
// 发送给连上ws的所有人 // 发送给连上ws的所有人
m, err := json.Marshal(msg)
if err != nil {
c.Write([]byte(err.Error()))
return
}
m := msg.Encode()
for _, client := range t.conns { for _, client := range t.conns {
if client == c { if client == c {
continue continue
} }
client.Write(m)
_ = client.Write(m)
} }
} }
@ -154,7 +153,7 @@ func (t *Node) Broadcast(msg *Message) {
if room, ok := t.rooms[msg.RoomId]; ok && room != nil { if room, ok := t.rooms[msg.RoomId]; ok && room != nil {
for _, client := range room.clients { for _, client := range room.clients {
if client.Id != msg.From { if client.Id != msg.From {
client.Write(m)
_ = client.Write(m)
} }
} }
} }
@ -165,17 +164,18 @@ func (t *Node) BroadcastTag(msg *Message) {
if room, ok := t.rooms[msg.RoomId]; ok { if room, ok := t.rooms[msg.RoomId]; ok {
for _, client := range room.clients { for _, client := range room.clients {
if client.AccountType == msg.Tag && client.Id != msg.From { if client.AccountType == msg.Tag && client.Id != msg.From {
client.Write(m)
_ = client.Write(m)
} }
} }
} }
} }
func (t *Node) BroadcastDest(msg *Message) { func (t *Node) BroadcastDest(msg *Message) {
m := msg.Encode()
if room, ok := t.rooms[msg.RoomId]; ok { if room, ok := t.rooms[msg.RoomId]; ok {
client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)] client := room.clients[fmt.Sprintf("%s:%d", msg.Tag, msg.Dest)]
if client.Id != msg.From { if client.Id != msg.From {
client.Write(msg.Encode())
_ = client.Write(m)
} }
} }
} }

2
connect/room.go → ws/room.go

@ -1,4 +1,4 @@
package connect
package ws
import ( import (
"sync" "sync"

9
connect/logics/sync.go → ws/timer.go

@ -1,18 +1,17 @@
package logics
package ws
import ( import (
"encoding/json" "encoding/json"
"hudongzhuanjia/hdws/connect"
"hudongzhuanjia/models" "hudongzhuanjia/models"
"time" "time"
) )
func init() { func init() {
connect.N.RegisterLogic(connect.LogicSync, syncFunc)
N.RegisterLogic(LogicSync, syncFunc)
} }
func syncFunc(c *connect.Client, msg *connect.Message) {
if c.AccountType != connect.IDCustomer && c.Pid != 0 {
func syncFunc(c *Client, msg *Message) {
if c.AccountType != IDCustomer && c.Pid != 0 {
c.Write([]byte("此账号无此权限")) c.Write([]byte("此账号无此权限"))
return return
} }

18
ws/util.go

@ -0,0 +1,18 @@
package ws
import (
"encoding/json"
"errors"
"gopkg.in/olahol/melody.v1"
)
func WriteJsonWithSession(s *melody.Session, body interface{}) error {
if s == nil {
return errors.New("session is nil")
}
bs, err := json.Marshal(body)
if err != nil {
return err
}
return s.Write(bs)
}

8
connect/ws.go → ws/ws.go

@ -1,4 +1,4 @@
package connect
package ws
import ( import (
"encoding/json" "encoding/json"
@ -39,7 +39,11 @@ func (ws *WsCtl) HandleMessage(s *melody.Session, buf []byte) {
client, ok := ws.node.conns[s] // 鉴定某个链接是否登录 client, ok := ws.node.conns[s] // 鉴定某个链接是否登录
if !ok { if !ok {
s.Write([]byte("链接无效"))
WriteJsonWithSession(s, map[string]interface{}{
"msg": "链接无效",
"code": 505,
})
//s.Write([]byte("链接无效"))
return return
} }
Loading…
Cancel
Save