Netty ByteToMessageDecoder 分析
ByteToMessageDecoder 1.socket 移除时触发,最后次读数据处理 @Override public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = internalBuffer(); if (buf.isReadable()) { ByteBuf bytes = buf.readBytes(buf.readableBytes()); buf.release(); ctx.fireChannelRead(bytes); } cumulation = null; ctx.fireChannelReadComplete(); handlerRemoved0(ctx); } 2.读取数据 ByteBuf release,discardSomeReadBytes 方法后面研究 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RecyclableArrayList out = RecyclableArrayList.newInstance();//创建包装过的数组 try { if (msg instanceof ByteBuf) { ByteBuf data = (ByteBuf) msg; if (cumulation == null) { //是否继续读取数据 cumulation = data; try { callDecode(ctx, cumulation, out); } finally { if (cumulation != null && !cumulation.isReadable()) { cumulation.release(); cumulation = null; } } } else {//继续读取处理 try { //如果剩余空间不够开创建新空间 if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) { ByteBuf oldCumulation = cumulation; cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + data.readableBytes()); cumulation.writeBytes(oldCumulation); oldCumulation.release(); } cumulation.writeBytes(data); callDecode(ctx, cumulation, out); } finally { if (cumulation != null) { if (!cumulation.isReadable()) { cumulation.release(); cumulation = null; } else { cumulation.discardSomeReadBytes(); } } data.release(); } } } else { out.add(msg); } } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { int size = out.size(); if (size == 0) { decodeWasNull = true; } else { for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.get(i)); } } out.recycle(); } } //其实只要管核心代码 decode 调用业务处理 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //循环读取数据 while (in.isReadable()) { int outSize = out.size(); int oldInputLength = in.readableBytes(); decode(ctx, in, out);//业务扩展处理 // Check if this handler was removed before continuing the loop. 
// If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 //如果 handler 删除之前,那么不读取数据了 if (ctx.isRemoved()) { break; } //下面写得很不清晰。。。。。 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。