netty ByteToMessageDecoder 分析

    ByteToMessageDecoder
        1.socket 移除时触发,最后次读数据处理
        
            @Override
        public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            ByteBuf buf = internalBuffer();
             if (buf.isReadable()) {
                ByteBuf bytes = buf.readBytes(buf.readableBytes());
                buf.release();
                ctx.fireChannelRead(bytes);
            }
            cumulation = null;
            ctx.fireChannelReadComplete();
            handlerRemoved0(ctx);
        }
        
        2.读取数据 ByteBuf release,discardSomeReadBytes 方法后面研究
        
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            RecyclableArrayList out = RecyclableArrayList.newInstance();//创建包装过的数组
            try {
                if (msg instanceof ByteBuf) {
                    ByteBuf data = (ByteBuf) msg;
                    if (cumulation == null) { //是否继续读取数据
                        cumulation = data;
                        try {
                            callDecode(ctx, cumulation, out);
                        } finally {
                            if (cumulation != null && !cumulation.isReadable()) {
                                cumulation.release();
                                cumulation = null;
                            }
                        }
                    } else {//继续读取处理
                        try {
                            //如果剩余空间不够开创建新空间
                            if (cumulation.writerIndex() > cumulation.maxCapacity() - data.readableBytes()) {
                                ByteBuf oldCumulation = cumulation;
                                cumulation = ctx.alloc().buffer(oldCumulation.readableBytes() + data.readableBytes());
                                cumulation.writeBytes(oldCumulation);
                                oldCumulation.release();
                            }
                            cumulation.writeBytes(data);
                            callDecode(ctx, cumulation, out);
                        } finally {
                            if (cumulation != null) {
                                if (!cumulation.isReadable()) {
                                    cumulation.release();
                                    cumulation = null;
                                } else {
                                    cumulation.discardSomeReadBytes();
                                }
                            }
                            data.release();
                        }
                    }
                } else {
                    out.add(msg);
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                int size = out.size();
                if (size == 0) {
                    decodeWasNull = true;
                } else {
                    for (int i = 0; i < size; i ++) {
                        ctx.fireChannelRead(out.get(i));
                    }
                }
                out.recycle();
            }
        }
        
        //其实只要管核心代码 decode 调用业务处理
         protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                //循环读取数据
                while (in.isReadable()) {
                    int outSize = out.size();
                    int oldInputLength = in.readableBytes();
                    decode(ctx, in, out);//业务扩展处理

                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    //如果 handler 删除之前,那么不读取数据了
                    if (ctx.isRemoved()) {
                        break;
                    }
                    //下面写得很不清晰。。。。。
                    
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes()) {
                            break;
                        } else {
                            continue;
                        }
                    }

                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
                    }

                    if (isSingleDecode()) {
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable cause) {
                throw new DecoderException(cause);
            }
        }

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。