Netty3 源码分析 - ChannelPipeline


ChannelPipeline的作用就是组织一系列的ChannelHandlers 为某一个Channel服务,处理各种事件。实现了拦截过滤器模式的高级形式(an advanced form of the Intercepting Filter pattern),进而有效控制如何处理一个事件以及ChannelHandlers之间如何交互。类型结构图为:


流水线的创建:对于每个新的通道,必须为其创建和添置一个Pipeline,一旦设置,他们之间的耦合就是永久的,这个通道不同添置另一个流水线,不能解耦当前的Pipeline。创建Pipeline的推荐方式是利用Channels中的辅助方法,而不是调用某个实现的构造器。
 import static org.jboss.netty.channel.Channels.*;
 ChannelPipeline pipeline = pipeline(); // same with Channels.pipeline()

Pipeline中事件流动:下图展示了事件被各个ChannelHandlers 处理的典型方式. 一个ChannelEvent可以被ChannelUpstreamHandle或ChannelDownstreamHandler 处理而后通过调用 ChannelHandlerContext.sendUpstream(ChannelEvent) 或者 ChannelHandlerContext.sendDownstream(ChannelEvent) 把事件转发到最近的一个Handler。根据事件的方向事件有不同的解释 (参见ChannelEvent )。



上行和下行事件分为被相应方向的Handler所处理。一个upstream handler通常处理由图中底部所示IO线程产生的带内数据( inbound data)。带内数据通常是真正的输入流从远端主机读到的数据,即 InputStream.read(byte[]).。如果一个上行事件到达了upstream handler的最顶端,那么就会默默的丢弃。
与之类似,一个downstream handler 产生和传输带外数据outbound traffic)比如说写请求,如果一个下行事件到达了了最低端,就会被与这个Channel相关的IO线程所处理(就是之前说道的worker thread)。这里的IO线程通常执行真正的输出流操作,即OutputStream.write(byte[])。

看下面这个pipeline:
 ChannelPipeline p = Channels.pipeline();
 p.addLast("1", new UpstreamHandlerA());
 p.addLast("2", new UpstreamHandlerB());
 p.addLast("3", new DownstreamHandlerA());
 p.addLast("4", new DownstreamHandlerB());
 p.addLast("5", new UpstreamHandlerX());
要注意的是,站在不同的方向Handler顺序的解读是不一样的,而且要看这个Handler的职责范围。比如对于upstream event经过的Handler依次是1,2,5,对于downstream event经过的Handler顺序是4,3.

构建pipeline:我们可能会在程序中需要多个Handlers来处理IO请求如read,write,close等。比如说典型的服务器需要每个通道都有以下handlers,而且会取决于协议和业务罗杰的复杂度。
Protocol Decoder - translates binary data (e.g. ChannelBuffer) into a Java object. 
Protocol Encoder - translates a Java object into binary data. 
ExecutionHandler - applies a thread model. 
Business Logic Handler - performs the actual business logic (e.g. database access). 

代码形式像这样: 
 ChannelPipeline pipeline = Channels.pipeline();
 pipeline.addLast("decoder", new MyProtocolDecoder());
 pipeline.addLast("encoder", new MyProtocolEncoder());
 pipeline.addLast("executor", new ExecutionHandler(
         new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)));
 pipeline.addLast("handler", new MyBusinessLogicHandler());

线程安全因为ChannelPipeline 是线程安全的,所以可以任何时刻可以将一个ChannelHandler加入或移除。比如,当出现敏感信息的时候加入SslHandler,交易完成后移除它。

缺陷(Pitfall):由于现在默认ChannelPipeline的内部实行细节,下面的代码不会如期工作,如果这里的FirstHandler 也是该pipeline中最后一个Handler的话。(不知道现在的netty5怎么做的呢?
 public class FirstHandler extends SimpleChannelUpstreamHandler {

     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
         // Remove this handler from the pipeline,
         ctx.getPipeline().remove(this);
         // And let SecondHandler handle the current event.
         ctx.getPipeline().addLast("2nd", new SecondHandler());
         ctx.sendUpstream(e);
     }
 }
  正确的做法是在移除前确保至少还有一个Handler(因为Handler context中的pre,next缘故?),或者先增加再移除。

详细的源码注释看我的 github



郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。