上行和下行事件分为被相应方向的Handler所处理。一个upstream handler通常处理由图中底部所示IO线程产生的带内数据( inbound
data)。带内数据通常是真正的输入流从远端主机读到的数据,即 InputStream.read(byte[]).。如果一个上行事件到达了upstream handler的最顶端,那么就会默默的丢弃。
与之类似,一个downstream handler 产生和传输带外数据(outbound
traffic)比如说写请求,如果一个下行事件到达了了最低端,就会被与这个Channel相关的IO线程所处理(就是之前说道的worker thread)。这里的IO线程通常执行真正的输出流操作,即OutputStream.write(byte[])。
看下面这个pipeline:
ChannelPipeline p = Channels.pipeline();
p.addLast("1", new UpstreamHandlerA());
p.addLast("2", new UpstreamHandlerB());
p.addLast("3", new DownstreamHandlerA());
p.addLast("4", new DownstreamHandlerB());
p.addLast("5", new UpstreamHandlerX());
要注意的是,站在不同的方向Handler顺序的解读是不一样的,而且要看这个Handler的职责范围。比如对于upstream event经过的Handler依次是1,2,5,对于downstream event经过的Handler顺序是4,3.
构建pipeline:我们可能会在程序中需要多个Handlers来处理IO请求如read,write,close等。比如说典型的服务器需要每个通道都有以下handlers,而且会取决于协议和业务罗杰的复杂度。
Protocol Decoder - translates binary data (e.g. ChannelBuffer) into a Java object.
Protocol Encoder - translates a Java object into binary data.
ExecutionHandler - applies a thread model.
Business Logic Handler - performs the actual business logic (e.g. database access).
代码形式像这样:
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new MyProtocolDecoder());
pipeline.addLast("encoder", new MyProtocolEncoder());
pipeline.addLast("executor", new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)));
pipeline.addLast("handler", new MyBusinessLogicHandler());
线程安全:因为ChannelPipeline 是线程安全的,所以可以任何时刻可以将一个ChannelHandler加入或移除。比如,当出现敏感信息的时候加入SslHandler,交易完成后移除它。
缺陷(Pitfall):由于现在默认ChannelPipeline的内部实行细节,下面的代码不会如期工作,如果这里的FirstHandler 也是该pipeline中最后一个Handler的话。(不知道现在的netty5怎么做的呢?)
public class FirstHandler extends SimpleChannelUpstreamHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
// Remove this handler from the pipeline,
ctx.getPipeline().remove(this);
// And let SecondHandler handle the current event.
ctx.getPipeline().addLast("2nd", new SecondHandler());
ctx.sendUpstream(e);
}
}
正确的做法是在移除前确保至少还有一个Handler(因为Handler context中的pre,next缘故?),或者先增加再移除。