Netty In Action中文版 - 第六章:ChannelHandler
本章介绍
- ChannelPipeline
- ChannelHandlerContext
- ChannelHandler
- Inbound vs outbound(入站和出站)
6.1 ChannelPipeline
- addFirst(...),加入ChannelHandler在ChannelPipeline的第一个位置
- addBefore(...),在ChannelPipeline中指定的ChannelHandler名称之前加入ChannelHandler
- addAfter(...),在ChannelPipeline中指定的ChannelHandler名称之后加入ChannelHandler
- addLast(ChannelHandler...),在ChannelPipeline的末尾加入ChannelHandler
- remove(...),删除ChannelPipeline中指定的ChannelHandler
- replace(...),替换ChannelPipeline中指定的ChannelHandler
ChannelPipeline pipeline = ch.pipeline(); FirstHandler firstHandler = new FirstHandler(); pipeline.addLast("handler1", firstHandler); pipeline.addFirst("handler2", new SecondHandler()); pipeline.addLast("handler3", new ThirdHandler()); pipeline.remove("?handler3?"); pipeline.remove(firstHandler); pipeline.replace("handler2", "handler4", new FourthHandler());被加入到ChannelPipeline的ChannelHandler将通过IO-Thread处理事件,这意味了必须不能有其它的IO-Thread堵塞来影响IO的总体处理;有时候可能须要堵塞,比如JDBC。因此,Netty同意通过一个EventExecutorGroup到每个ChannelPipeline.add*方法,自己定义的事件会被包括在EventExecutorGroup中的EventExecutor来处理,默认的实现是DefaultEventExecutorGroup。
6.2 ChannelHandlerContext
6.2.1 通知下一个ChannelHandler
- 调用Channel的方法
- 调用ChannelPipeline的方法
@Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //Event via Channel Channel channel = ctx.channel(); channel.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8)); //Event via ChannelPipeline ChannelPipeline pipeline = ctx.pipeline(); pipeline.write(Unpooled.copiedBuffer("netty in action", CharsetUtil.UTF_8)); } }); }下图表示通过Channel或ChannelPipeline的通知:
可能你想从ChannelPipeline的指定位置開始,不想流经整个ChannelPipeline,例如以下情况:
- 为了节省开销,不感兴趣的ChannelHandler不让通过
- 排除一些ChannelHandler
// Get reference of ChannelHandlerContext ChannelHandlerContext ctx = ..; // Write buffer via ChannelHandlerContext ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));该消息流经ChannelPipeline到下一个ChannelHandler,在这样的情况下使用ChannelHandlerContext開始下一个ChannelHandler。下图显示了事件流:
如上图显示的,从指定的ChannelHandlerContext開始,跳过前面全部的ChannelHandler,使用ChannelHandlerContext操作是常见的模式,最经常使用的是从ChannelHanlder调用操作,也能够在外部使用ChannelHandlerContext,由于这是线程安全的。
6.2.2 改动ChannelPipeline
public class WriteHandler extends ChannelHandlerAdapter { private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; } public void send(String msg){ ctx.write(msg); } }请注意,ChannelHandler实例假设带有@Sharable注解则能够被加入到多个ChannelPipeline。也就是说单个ChannelHandler实例能够有多个ChannelHandlerContext,因此能够调用不同ChannelHandlerContext获取同一个ChannelHandler。假设加入不带@Sharable注解的ChannelHandler实例到多个ChannelPipeline则会抛出异常;使用@Sharable注解后的ChannelHandler必须在不同的线程和不同的通道上安全使用。怎么是不安全的使用?看以下代码:
@Sharable public class NotSharableHandler extends ChannelInboundHandlerAdapter { private int count; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { count++; System.out.println("channelRead(...) called the " + count + " time?"); ctx.fireChannelRead(msg); } }上面是一个带@Sharable注解的Handler,它被多个线程使用时,里面count是不安全的,会导致count值错误。
为什么要共享ChannelHandler?使用@Sharable注解共享一个ChannelHandler在一些需求中还是有非常好的作用的,如使用一个ChannelHandler来统计连接数或来处理一些全局数据等等。
6.3 状态模型
- channelUnregistered
- channelRegistered
- channelActive
- channelInactive
6.4 ChannelHandler和其子类
6.4.1 ChannelHandler中的方法
- handlerAdded,ChannelHandler加入到实际上下文中准备处理事件
- handlerRemoved,将ChannelHandler从实际上下文中删除,不再处理事件
- exceptionCaught,处理抛出的异常
6.4.2 ChannelInboundHandler
- channelRegistered,ChannelHandlerContext的Channel被注冊到EventLoop;
- channelUnregistered,ChannelHandlerContext的Channel从EventLoop中注销
- channelActive,ChannelHandlerContext的Channel已激活
- channelInactive,ChannelHanderContxt的Channel结束生命周期
- channelRead,从当前Channel的对端读取消息
- channelReadComplete,消息读取完毕后运行
- userEventTriggered,一个用户事件被处罚
- channelWritabilityChanged,改变通道的可写状态,能够使用Channel.isWritable()检查
- exceptionCaught,重写父类ChannelHandler的方法,处理异常
/** * 实现ChannelInboundHandlerAdapter的Handler,不会自己主动释放接收的消息对象 * @author c.k * */ public class DiscardHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //手动释放消息 ReferenceCountUtil.release(msg); } }
/** * 继承SimpleChannelInboundHandler,会自己主动释放消息对象 * @author c.k * */ public class SimpleDiscardHandler extends SimpleChannelInboundHandler<Object> { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { //不须要手动释放 } }假设须要其它状态改变的通知,能够重写Handler的其它方法。通常自己定义消息类型来解码字节,能够实现ChannelInboundHandler或ChannelInboundHandlerAdapter。有一个更好的解决方法,使用编解码器的框架能够非常容的实现。使用ChannelInboundHandler、ChannelInboundHandlerAdapter、SimpleChannelInboundhandler这三个中的一个来处理接收消息,使用哪一个取决于需求;大多数时候使用SimpleChannelInboundHandler处理消息,使用ChannelInboundHandlerAdapter处理其它的“入站”事件或状态改变。
ChannelInitializer用来初始化ChannelHandler,将自己定义的各种ChannelHandler加入到ChannelPipeline中。
6.4.3 ChannelOutboundHandler
- bind,Channel绑定本地地址
- connect,Channel连接操作
- disconnect,Channel断开连接
- close,关闭Channel
- deregister,注销Channel
- read,读取消息,实际是截获ChannelHandlerContext.read()
- write,写操作,实际是通过ChannelPipeline写消息,Channel.flush()属性到实际通道
- flush,刷新消息到通道
public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ReferenceCountUtil.release(msg); promise.setSuccess(); } }重要的是要记得释放致远并直通ChannelPromise,若ChannelPromise没有被通知可能会导致当中一个ChannelFutureListener不被通知去处理一个消息。
假设消息被消费而且没有被传递到ChannelPipeline中的下一个ChannelOutboundHandler,那么就须要调用ReferenceCountUtil.release(message)来释放消息资源。一旦消息被传递到实际的通道,它会自己主动写入消息或在通道关闭是释放。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。