[Go语言]一种用于网游服务器的支持多路复用的网络协议处理框架
- 玩家连接登录服务器
- 登录服务器向数据库请求玩家数据
- 登录服务器获取到玩家数据,把玩家数据转发给游戏服务器进行加载包括创建玩家对象等
- 登录服务器获取到加载成功回应后,通知玩家客户端可以进入游戏世界
- 发送一个数据包并等待回应。比如登录服务器等待游戏服务器加载玩家数据的结果通知。
- 发送一个数据包,不需要回应。比如游戏服务器加载玩家数据后,给登录服务器发送结果通知。
- 如果一个协议包需要等待回应,就在调用函数上阻塞等待。这个调用的签名为:
func (p *Connection) Query(data []byte) ([]byte, error)
注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。 - 如果发送一个协议包是对于接收到的某个协议包的回应,则调用:
func (p *Connection) Reply(query, answer []byte) error
注意:answer的控制权会转交给框架,因此函数调用后不能修改answer的内容。 - 如果一个协议包不需要回应,就直接调用发送函数:
func (p *Connection) Write(data []byte) error
注意:data的控制权会转交给框架,因此函数调用后不能修改data的内容。 - 调用者需要实现的接口:
- Socket。用于协议包的收发。基本上是net.TCPConn的简单封装,在头部加上一个协议包的长度。
- DataHandler。用于协议处理,即没有通过Query返回的协议包会分发给此接口处理。
- ErrorHandler。用于错误处理。当断线时,会调用此接口。
- IdentityHandler。用于读取和设置会话ID。
ErrorHandler和DataHandler的函数实现中不能直接调用(*Connection).Close,否则会导致死锁。
type Connection
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler) *Connection
func (p *Connection) Start()
func (p *Connection) Close()
func (p *Connection) Query(data []byte) (res []byte, err error)
func (p *Connection) Reply(query, answer []byte) error
func (p *Connection) Write(data []byte) error
type Socket interface {
Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {
Process([]byte)
}
type ErrorHandler interface {
OnError(error)
}
type IdentityHandler interface {
GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
完整的代码实现:
package multiplexer
import (
"errors"
"sync"
"sync/atomic"
)
var (
ERR_EXIT = errors.New("exit")
)
type Socket interface {
Read() ([]byte, error)
Write([]byte) error
Close()
}
type DataHandler interface {
Process([]byte)
}
type ErrorHandler interface {
OnError(error)
}
type IdentityHandler interface {
GetIdentity([]byte) uint32
SetIdentity([]byte, uint32)
}
type Connection struct {
conn Socket
wg sync.WaitGroup
mutex sync.Mutex
applicants map[uint32]chan []byte
chexit chan bool
chsend chan []byte
chch chan chan []byte
dh DataHandler
ih IdentityHandler
eh ErrorHandler
identity uint32
}
func NewConnection(conn Socket, maxcount int, dh DataHandler, ih IdentityHandler, eh ErrorHandler) *Connection {
count := maxcount
if count < 1024 {
count = 1024
}
chch := make(chan chan []byte, count)
for i := 0; i < count; i++ {
chch <- make(chan []byte, 1)
}
return &Connection{
conn: conn,
applicants: make(map[uint32]chan []byte, count),
chsend: make(chan []byte, count),
chexit: make(chan bool),
chch: chch,
dh: dh,
ih: ih,
eh: eh,
}
}
func (p *Connection) Start() {
p.wg.Add(2)
go func() {
defer p.wg.Done()
p.recv()
}()
go func() {
defer p.wg.Done()
p.send()
}()
}
func (p *Connection) Close() {
close(p.chexit)
p.conn.Close()
p.wg.Wait()
}
func (p *Connection) Query(data []byte) (res []byte, err error) {
var ch chan []byte
select {
case <-p.chexit:
return nil, ERR_EXIT
case ch = <-p.chch:
defer func() {
p.chch <- ch
}()
}
id := p.newIdentity()
p.ih.SetIdentity(data, id)
p.addApplicant(id, ch)
defer func() {
if err != nil {
p.popApplicant(id)
}
}()
if err := p.Write(data); err != nil {
return nil, err
}
select {
case <-p.chexit:
return nil, ERR_EXIT
case res = <-ch:
break
}
return res, nil
}
func (p *Connection) Reply(query, answer []byte) error {
// put back the identity attached to the query
id := p.ih.GetIdentity(query)
p.ih.SetIdentity(answer, id)
return p.Write(answer)
}
func (p *Connection) Write(data []byte) error {
select {
case <-p.chexit:
return ERR_EXIT
case p.chsend <- data:
break
}
return nil
}
func (p *Connection) send() {
for {
select {
case <-p.chexit:
return
case data := <-p.chsend:
if p.conn.Write(data) != nil {
return
}
}
}
}
func (p *Connection) recv() (err error) {
defer func() {
if err != nil {
select {
case <-p.chexit:
err = nil
default:
p.eh.OnError(err)
}
}
}()
for {
select {
case <-p.chexit:
return nil
default:
break
}
data, err := p.conn.Read()
if err != nil {
return err
}
if id := p.ih.GetIdentity(data); id > 0 {
ch, ok := p.popApplicant(id)
if ok {
ch <- data
continue
}
}
p.dh.Process(data)
}
return nil
}
func (p *Connection) newIdentity() uint32 {
return atomic.AddUint32(&p.identity, 1)
}
func (p *Connection) addApplicant(identity uint32, ch chan []byte) {
p.mutex.Lock()
defer p.mutex.Unlock()
p.applicants[identity] = ch
}
func (p *Connection) popApplicant(identity uint32) (chan []byte, bool) {
p.mutex.Lock()
defer p.mutex.Unlock()
ch, ok := p.applicants[identity]
if !ok {
return nil, false
}
delete(p.applicants, identity)
return ch, true
}
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。