Tornado 的 IOStream 简介与应用

Tornado的核心源码是由ioloop.py和iostream.py这2个文件组成的。前者提供了一个循环,用于处理I/O事件;后者则封装了一个非阻塞的socket。
有了这2者后,就能搭建起TCP server和HTTP server,实现异步HTTP客户端,这便是Tornado的主要内容了。
之前在研究socket时已差不多弄懂了ioloop的逻辑,于是本文就接着研究iostream了。

这里我并不想逐行分析iostream的源码,因为并不是什么很难懂的代码,于是只说说它做了些什么事。
先看IOStream的__init__()方法,重要的参数只有socket和io_loop这2个。所以很容易猜想到,它只是封装了这个socket,然后把I/O事件注册到io_loop上。
读了源码就能很快证实我们的猜测,值得注意的是,封装socket时还将其设为非阻塞的了。
此后在连接和读写时都调用了_add_io_state()方法,这个方法就是调用io_loop的add_handler()和update_handler()方法来注册事件的。
此外,似乎没有开放读完所有数据的接口,要自行实现的话可以使用_read_to_buffer()方法。

看完源码可能还是一头雾水,它究竟能用来干啥呢?
最为重要的用处当然是实现TCPServer了。有了ioloop,一个服务器就可以处理I/O事件了;可是I/O并不能凭空产生啊,它还需要通过socket连接和传输。
而正如前面所说,IOStream封装了socket,把它变成了非阻塞的,每次连接和读写这些I/O事件都注册到io_loop上,这样就实现了一个完整的TCPServer了。
可惜Tornado源码里的TCPServer并没有实际功能,HTTPServer又太复杂了,所以我找了一段echo server的代码,一看就知道用了。测试也很简单,用telnet连上去后,输入什么就会回显什么。

另一个功能还是和非阻塞有关。
Tornado能够以单线程处理高并发,靠的就是非阻塞。而假如其中的任何一次I/O是非阻塞的,需要消耗数毫秒甚至数秒,那么每秒能处理的请求数就不可能超过1000了。
所以这些耗时很长I/O访问必须封装成非阻塞的,而这早就被IOStream做好了。

此外,为了提高服务的响应速度,也需要用到异步处理。
以前几天我写的聊天室为例,当用户发送了一条信息后,我需要把它广播给所有的用户,然后才能结束这次响应。
在这个例子中,用户可能不需要等待多久。可是假如我还需要做一些复杂的处理,比如关键字过滤,分析@用户名、URL,保存到数据库,或者发送email等,我并不想让用户的请求一直被阻塞着。
那么我可以搭建另一个TCPServer,将接收到的信息通过IOStream发送给它,然后立即结束响应。在那个TCPServer上,我想做任何耗时的事,都不会拖慢主HTTPServer的响应速度;而一旦完成处理,就可以将结果通过IOStream返回给HTTPServer,让其完成扫尾工作。

这里仍然以聊天室为例,当接收到信息后,我将其发送给EchoServer,并结束响应。
EchoServer返回输入的信息,此时HTTPServer监听到这个事件,就读取并广播信息。
虽然对用户来说,效果是一样的。不过整个系统的扩展性就增强了,你可以把这个EchoServer替换成消息队列、memcache等各种数据源。虽然实现上仍有很大差异,但最重要的思路是一致的。

代码和上次的聊天室差不多,handler部分都一样,我就只贴出更改的部分吧:

import logging
import os.path
import socket
import uuid
import tornado.httpserver
import tornado.ioloop
import tornado.iostream
import tornado.options
import tornado.web
import tornado.websocket


def broadcast_message(message):
for handler in ChatSocketHandler.socket_handlers:
  try:
   handler.write_message(message)
  except:
   logging.error('Error sending message', exc_info=True)

for callback in ChatHandler.callbacks:
  try:
   callback(message)
  except:
   logging.error('Error in callback', exc_info=True)
ChatHandler.callbacks = set()

def send_message(message):
stream.write((message + '\n').encode('utf-8'))

def read_message_from_echo_server():
def broadcast(message):
  broadcast_message(message[:-1])
  read_message_from_echo_server()
stream.read_until('\n', broadcast)

# ...

if __name__ == '__main__':
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
stream = tornado.iostream.IOStream(s)
stream.connect(('127.0.0.1', 8888))
read_message_from_echo_server()

main()

 

要注意的是,连接EchoServer需要在调用ioloop的start()方法之前,因为这个方法是个死循环,后面的代码没机会执行。

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。