一个http请求在play框架中的前世今生(下)

上一篇提到了play底层的网络通信基于netty实现,于是粗略地研究了一下netty,总结如下。(netty版本是3.2.5,不同版本的实现可能差异较大)

一、netty的组件

channelBuffer:

传输Buffer和抽象后的逻辑Buffer的结合,将NIO底层的多个buffer合并成了一个可以代表完整消息内容的buffer,可以理解为一个message


channel:

对于Java的old IO和NIO的输入|输出通道的一个封装


channelPipeline:

Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。ChannelPipeline接口包含了两个重要的方法:sendUpstream(ChannelEvent e)和sendDownstream(ChannelEvent e),就分别对应了Upstream和Downstream。


handler:

 ChannelPipeline里包含的ChannelHandler也包含两类:ChannelUpstreamHandler和ChannelDownstreamHandler。每条线路的Handler是互相独立的。它们都很简单的只包含一个方法:ChannelUpstreamHandler.handleUpstream和ChannelDownstreamHandler.handleDownstream。

handler要实现messageReceive方法,在这里面做特定协议的实现

 @Override

    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent messageEvent) throws Exception {

          //do something

}


二、编程模型

1.实现自己的handler

2.实现自己的pipeline factory,将handler注册到pipeline中

3.在netty启动类中指定自己的pipeline

三、线程模型(Reactor模式)

netty的线程模型采用了reactor模式,如下图,boss线程(reactor)使用selector接收请求,然后委派给acceptor,worker线程(图中的线程池表示)从acceptor领取任务并执行。


----------------------------------------------初始化过程-----------------------------------------------------------------

通过阅读Netty中的NioServerSocketChannelFactory类的代码,可以详细了解reactor模式对应的组件的初始化过程。

NioServerSocketChannelFactory负责构造一个NioServerSocket,包含一个ChannelSink的引用,ChannelSink中管理着worker线程池:

NioServerSocketPipelineSink(Executor workerExecutor, int workerCount) {

        workers = new NioWorker[workerCount];

        for (int i = 0; i < workers.length; i ++) {

            workers[i] = new NioWorker(id , i + 1, workerExecutor);

        }

    }

NioServerSocketPipelineSink实现ChannelSink接口,包含eventSunk方法,负责severSocker绑定到tcp端口时的初始化工作和建立新的客户端socket连接的处理。如下述代码所示:

public void eventSunk(

            ChannelPipeline pipeline, ChannelEvent e) throws Exception {

        Channel channel = e.getChannel();

        if (channel instanceof NioServerSocketChannel) {//serverSocket绑定端口和创建channel

            handleServerSocket(e);

        } else if (channel instanceof NioSocketChannel) {//client socket的连接建立

            handleAcceptedSocket(e);

        }

handlerServerSocket会调用如下的bind方法,初始化扮演mainReactor角色的Boss线程。

private void bind(

            NioServerSocketChannel channel, ChannelFuture future,

            SocketAddress localAddress) {


        boolean bound = false ;

        boolean bossStarted = false;

        try {

            channel. socket.socket().bind(localAddress, channel.getConfig().getBacklog());

            bound = true;


            future.setSuccess();

            fireChannelBound(channel, channel.getLocalAddress());


            Executor bossExecutor =

                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor ;

            DeadLockProofWorker. start(

                    bossExecutor,

                    new ThreadRenamingRunnable(

                            new Boss(channel),

                            "New I/O server boss #" + id + " (" + channel + ‘)‘));

            bossStarted = true;

        } catch (Throwable t) {

            future.setFailure(t);

            fireExceptionCaught(channel, t);

        } finally {

            if (!bossStarted && bound) {

                close(channel, future);

            }

        }

    }

----------------------------------------------初始化过程-----------------------------------------------------------------

到此为止Boss线程使用NIO的selector监听我们制定的socket端口是否有客户端连接的IO事件发生,它的主循环loop如下:

 public void run() {

            final Thread currentThread = Thread.currentThread();


            channel. shutdownLock.lock();

                for (;;) {

                        if (selector .select(1000) > 0) {

                            selector.selectedKeys().clear();

                        }


                        SocketChannel acceptedSocket = channel. socket.accept();

                        if (acceptedSocket != null) {

                            registerAcceptedChannel(acceptedSocket, currentThread);

                        }

               }

        }

只干了一件事情,就是讲接收到的客户端sokcet连接注册到Acceptor中。为了提高boss线程的处理效率?注册工作会被抽象成一个runnable对象,放入注册任务队列中,交给worker线程池来执行。worker的主循环如下:

    thread = Thread.currentThread();


        boolean shutdown = false;

        Selector selector = this.selector ;

        for (;;) {

            wakenUp.set( false);


            try {

                SelectorUtil. select(selector);


                if (wakenUp .get()) {

                    selector.wakeup();

                }


                cancelledKeys = 0;

                processRegisterTaskQueue();

                processWriteTaskQueue();

                processSelectedKeys(selector.selectedKeys());

            } catch (Throwable t) {

                logger.warn(

                        "Unexpected exception in the selector loop." , t);

            }

        }

    }

worker中采用了流水线模式,在每一次循环中,执行以下三个任务:

1.从acceptor中拿到mainReactor中接收的客户端连接,注册到自己的selector中(每个worker一个selector)

2.如果写任务队里中有数据需要执行,取出一个写任务并执行

3.对于已经注册到selector的channel,处理它的read或write IO事件。

四、play线程与netty线程的协作

play线程和netty线程的对于请求的协作处理过程如下:

请求->netty master->netty worker->自己启动一个新的线程->netty response->netty worker。

下面是协作过程的详细分析:

1.netty worker转交请求处理工作给play

PlayHandler里的messageRecived:


public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)

            throws Exception{

....

Invoker.invoke(new NettyInvokation(request, response, ctx, nettyRequest, e));

}

通过在messageReceived中向线程池提交任务的方式来完成request的业务逻辑处理部分,messageReceived结束后,worker线程脱离这个request,这个request不再会占用worker了。这样不但可以保持长连接不关闭,而且不会占用netty的worker线程。

2.play将写回数据响应交回给netty worker

playHandler用来返回响应数据的copyResponse方法,底层会委托给下面这个方法:

public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {

        ChannelFuture future = future(channel);

        channel.getPipeline().sendDownstream(

                new DownstreamMessageEvent(channel, future, message, remoteAddress));

        return future;

    }

可见返回的数据以message为载体又回到了channel中注册的pipeline中,netty的每一个pipeline最后都有一个channelSink负责收尾工作,对于Play应用,这个sink是上面提到的NioServerSocketPipelineSink

private void handleAcceptedSocket (ChannelEvent e) {

        if (e instanceof ChannelStateEvent) {

            //此处省略

        } else if (e instanceof MessageEvent) {

            MessageEvent event = (MessageEvent) e;

            NioSocketChannel channel = (NioSocketChannel) event.getChannel();

            boolean offered = channel.writeBuffer .offer(event);

            assert offered;

            channel. worker.writeFromUserCode(channel);

        }

    }

worker的writeFromUserCode方法会将写任务放入worker的写任务队列中,然后等待worker的主循环来处理

void writeFromUserCode(final NioSocketChannel channel) {

        if (!channel.isConnected()) {

            cleanUpWriteBuffer(channel);

            return;

        }


        if (scheduleWriteIfNecessary(channel)) {//放入worker的写任务队列

            return;

        }


        // From here, we are sure Thread.currentThread() == workerThread.


        if (channel.writeSuspended ) {

            return;

        }


        if (channel.inWriteNowLoop ) {

            return;

        }


        write0(channel);

    }


如果在一次循环中write0方法没有写完数据,write0方法还会主动设置channel的OP_WRITE状态,让主循环的processSelectedKeys方法来处理,一直到数据写完,才清除这个OP_WRITE状态。


郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。