golang:实现thrift的client端协程安全
前言
Golang作为我们服务端开发的主要语言,实现了很多基础服务,比如oauth2,账户系统,支付,客服等。而在早期开发阶段,也尝试使用golang做页面展示,但每种语言都有自己最擅长的领域,让golang来搞前端实在是有点疼,最终我们选择php+golang的方式来作为整体的服务架构。
那么问题来了,php和golang这对好基友如何愉快的玩耍呢?结论是thrift是块好肥皂!
抛砖
市面上肥皂一大堆,最著名的是舒肤佳,那么我们为毛不用舒肤佳,而选择thrift呢。。。因为够酸爽!
这种酸爽,只有牙口好,才能吃嘛嘛香。众所周知,thrift有多种型号(传输协议),比如家用型的TDebugProtocol,持久型TBinaryProtocol还有爆炸型TCompactProtocol。
而我们使用初始,想当然的选择了爆炸型TCompactProtocol这种更能让酸爽感提升百分之10的型号。但是php牙口不太好,遇到golang搓出的64位int时,1234567890硬是给爆成了1234567891(此处只是举个例子,php在处理golang返回的int64会出现错误的结果)。所以,php和golang这对好基友,thrift爆炸酸爽好,不如thrift持久好。(据说thrift下一个版本会修复这个bug,敬请关注吧)
引玉
乱扯一通,引据经典,发现Thrift生成的server端是thread safe的,但client端不是。所以需要多个thread和server端通信,则每个thread需要init一个自己的client实例。
那么问题来了,golang是如何实现thrift的client端协程安全呢?
实践
首先,thrift实现golang的server端,依托golang牛叉的goroutine,只实现了一种类似TThreadedServer的服务模型,所以毛老师再也不用担心我滴使用了。
func (p *TSimpleServer) AcceptLoop() error { for { select { case <-p.quit: return nil default: } client, err := p.serverTransport.Accept() if err != nil { log.Println("Accept err: ", err) } if client != nil { go func() {// 起新routine处理 if err := p.processRequests(client); err != nil { log.Println("error processing request:", err) } }() } } }
其次,thrift的client端都是线程不安全的,那么问题来了,重新实现Transport好搞,还是在现有Transport的基础上使用pool好?
还在我思考如何修改Transport的实现时,毛老师已经搞定了pool,那么结论来了,在Transport基础上使用pool好。。。即便重新实现也无非是加pool,这样一来还得改thrift的client实现,真是费时费力又不讨好。thrift默认实现的Transport有基础的读写功能,丢到pool里照样游来游去。
以下是毛老师实现的pool,有基本的超时检查,最大激活和空闲数等功能。
type Pool struct { // Dial is an application supplied function for creating new connections. Dial func() (interface{}, error) // Close is an application supplied functoin for closeing connections. Close func(c interface{}) error // TestOnBorrow is an optional application supplied function for checking // the health of an idle connection before the connection is used again by // the application. Argument t is the time that the connection was returned // to the pool. If the function returns an error, then the connection is // closed. TestOnBorrow func(c interface{}, t time.Time) error // Maximum number of idle connections in the pool. MaxIdle int // Maximum number of connections allocated by the pool at a given time. // When zero, there is no limit on the number of connections in the pool. MaxActive int // Close connections after remaining idle for this duration. If the value // is zero, then idle connections are not closed. Applications should set // the timeout to a value less than the server's timeout. IdleTimeout time.Duration // mu protects fields defined below. mu sync.Mutex closed bool active int // Stack of idleConn with most recently used at the front. idle list.List } type idleConn struct { c interface{} t time.Time } // New creates a new pool. This function is deprecated. Applications should // initialize the Pool fields directly as shown in example. func New(dialFn func() (interface{}, error), closeFn func(c interface{}) error, maxIdle int) *Pool { return &Pool{Dial: dialFn, Close: closeFn, MaxIdle: maxIdle} } // Get gets a connection. The application must close the returned connection. // This method always returns a valid connection so that applications can defer // error handling to the first use of the connection. func (p *Pool) Get() (interface{}, error) { p.mu.Lock() // if closed if p.closed { p.mu.Unlock() return nil, ErrPoolClosed } // Prune stale connections. if timeout := p.IdleTimeout; timeout > 0 { for i, n := 0, p.idle.Len(); i < n; i++ { e := p.idle.Back() if e == nil { break } ic := e.Value.(idleConn) if ic.t.Add(timeout).After(nowFunc()) { break } p.idle.Remove(e) p.active -= 1 p.mu.Unlock() // ic.c.Close() p.Close(ic.c) p.mu.Lock() } } // Get idle connection. for i, n := 0, p.idle.Len(); i 0 && p.active >= p.MaxActive { p.mu.Unlock() return nil, ErrPoolExhausted } // No idle connection, create new. dial := p.Dial p.active += 1 p.mu.Unlock() c, err := dial() if err != nil { p.mu.Lock() p.active -= 1 p.mu.Unlock() c = nil } return c, err } // Put adds conn back to the pool, use forceClose to close the connection forcely func (p *Pool) Put(c interface{}, forceClose bool) error { if !forceClose { p.mu.Lock() if !p.closed { p.idle.PushFront(idleConn{t: nowFunc(), c: c}) if p.idle.Len() > p.MaxIdle { // remove exceed conn c = p.idle.Remove(p.idle.Back()).(idleConn).c } else { c = nil } } p.mu.Unlock() } // close exceed conn if c != nil { p.mu.Lock() p.active -= 1 p.mu.Unlock() return p.Close(c) } return nil } // ActiveCount returns the number of active connections in the pool. func (p *Pool) ActiveCount() int { p.mu.Lock() active := p.active p.mu.Unlock() return active } // Relaase releases the resources used by the pool. func (p *Pool) Release() error { p.mu.Lock() idle := p.idle p.idle.Init() p.closed = true p.active -= idle.Len() p.mu.Unlock() for e := idle.Front(); e != nil; e = e.Next() { p.Close(e.Value.(idleConn).c) } return nil }
最后,在实际使用thrift相关的设置貌似只有超时时间,那么问题来了,pool下,thrift的超时时间如何是好?
由于在使用pool之前,使用每个routine创建一个client的方式,超时时间设置的都很短,server端和client都是15秒。换了pool使用方式之后,时间没变,也就是说我们把超时交给thrift自己管理,但发现经常性的出现EOF的I/O错误。经过跟踪发现,在请求量小的情况下,15秒就显得太短了,pool里会easy的出现空闲时间超过15秒的连接,而当我们get出来使用时,因为超时,导致了EOF。
经过实践,server端的时间一定要足够长,我们设置了8h,client端的超时则交给pool管理,不然pool里还有可能出现超时的连接。
// server transportFactory := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) protocolFactory := thrift.NewTBinaryProtocolFactoryDefault() serverTransport, err := thrift.NewTServerSocketTimeout(bind, thriftCallTimeOut) if err != nil { log.Exitf("start thrift rpc error(%v)", err) } // thrift rpc service handler := NewThriftRPC() processor := thriftRpc.NewRpcServiceProcessor(handler) server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory) thriftServer = append(thriftServer, server) log.Info("start thrift rpc listen addr: %s", bind) go server.Serve() // client thriftPool = &pool.Pool{ Dial: func() (interface{}, error) { addr := conf.MyConf.ThriftOAuth2Addr[rand.Intn(len(conf.MyConf.ThriftOAuth2Addr))] sock, err := thrift.NewTSocket(addr) // client端不设置超时 if err != nil { log.Error("thrift.NewTSocketTimeout(%s) error(%v)", addr, err) return nil, err } tF := thrift.NewTFramedTransportFactory(thrift.NewTTransportFactory()) pF := thrift.NewTBinaryProtocolFactoryDefault() client := rpc.NewRpcServiceClientFactory(tF.GetTransport(sock), pF) if err = client.Transport.Open(); err != nil { log.Error("client.Transport.Open() error(%v)", err) return nil, err } return client, nil }, Close: func(v interface{}) error { v.(*rpc.RpcServiceClient).Transport.Close() return nil }, MaxActive: conf.MyConf.ThriftMaxActive, MaxIdle: conf.MyConf.ThriftMaxIdle, IdleTimeout: conf.MyConf.ThriftIdleTimeout, } pool.idleTimeout 7h // pool最大空闲时间,设置比server端小,都设置8h,也有可能出现超时连接
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。