Netty Source Code: FrameDecoder

Put simply, FrameDecoder is a decoder: it takes the incoming stream of network bytes and decodes it one frame at a time.

The main method to look at is messageReceived:

    public void messageReceived(
            ChannelHandlerContext ctx, MessageEvent e) throws Exception {

        Object m = e.getMessage();
        if (!(m instanceof ChannelBuffer)) {
            ctx.sendUpstream(e);
            return;
        }

        ChannelBuffer input = (ChannelBuffer) m;
        if (!input.readable()) {
            return;
        }

        if (cumulation == null) {
            try {
                // the cumulation buffer is not created yet so just pass the input to callDecode(...) method
                callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
            } finally {
                updateCumulation(ctx, input);
            }

        } else {
            input = appendToCumulation(input);
            try {
                callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
            } finally {
                updateCumulation(ctx, input);
            }
        }
    }

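The key here is cumulation. The name means "accumulated": bytes that have arrived but have not yet been decoded are accumulated in it. The method first checks whether cumulation is null. If it is, callDecode is invoked directly on the input; otherwise the input is first appended to cumulation and the merged buffer is passed to callDecode. In both branches, updateCumulation runs in the finally block.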
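The flow described above can be sketched without Netty. This is a simplified stand-in, not the library's code: the class name CumulationFlowSketch, the use of plain byte arrays instead of ChannelBuffer, and the newline-delimited frame format are all assumptions made for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the messageReceived() flow; not Netty code.
class CumulationFlowSketch {
    // Bytes left over from earlier reads; null when nothing is pending.
    private byte[] cumulation;

    // Feeds one network read and returns the complete lines decoded from it.
    List<String> receive(byte[] input) {
        List<String> frames = new ArrayList<>();
        if (input.length == 0) {
            return frames; // mirrors the early return for an unreadable buffer
        }
        // Nothing pending: decode the input directly. Otherwise merge first.
        byte[] data = (cumulation == null) ? input : concat(cumulation, input);

        // Assumed frame format: each frame ends with '\n'.
        int start = 0;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == '\n') {
                frames.add(new String(data, start, i - start, StandardCharsets.US_ASCII));
                start = i + 1;
            }
        }
        // Counterpart of updateCumulation(): keep only the undecoded tail.
        cumulation = (start == data.length) ? null : Arrays.copyOfRange(data, start, data.length);
        return frames;
    }

    boolean hasPending() {
        return cumulation != null;
    }

    private static byte[] concat(byte[] a, byte[] b) {
        byte[] out = new byte[a.length + b.length];
        System.arraycopy(a, 0, out, 0, a.length);
        System.arraycopy(b, 0, out, a.length, b.length);
        return out;
    }
}
```

Feeding "he", then "llo\nwor", then "ld\n" yields no frame for the first read, "hello" for the second, and "world" for the third, after which nothing is pending.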

callDecode()

    private void callDecode(
            ChannelHandlerContext context, Channel channel,
            ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {

        while (cumulation.readable()) {
            int oldReaderIndex = cumulation.readerIndex();
            Object frame = decode(context, channel, cumulation);
            if (frame == null) {
                if (oldReaderIndex == cumulation.readerIndex()) {
                    // Seems like more data is required.
                    // Let us wait for the next notification.
                    break;
                } else {
                    // Previous data has been discarded.
                    // Probably it is reading on.
                    continue;
                }
            } else if (oldReaderIndex == cumulation.readerIndex()) {
                throw new IllegalStateException(
                        "decode() method must read at least one byte " +
                        "if it returned a frame (caused by: " + getClass() + ")");
            }

            unfoldAndFireMessageReceived(context, remoteAddress, frame);
        }
    }
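The loop above relies on a contract with decode(): return null without moving the reader index when more data is needed, and consume at least one byte when a frame is returned. The following is a minimal sketch of that contract, assuming a hypothetical frame format of a 1-byte length followed by the payload. java.nio.ByteBuffer stands in for ChannelBuffer, with position() playing the role of readerIndex(); the class and method names are illustrative only.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the callDecode() loop; not Netty code.
class DecodeLoopSketch {
    // Assumed frame format: 1 length byte, then that many payload bytes.
    static byte[] decode(ByteBuffer buf) {
        if (buf.remaining() < 1) {
            return null;
        }
        int length = buf.get(buf.position()) & 0xFF; // peek without consuming
        if (buf.remaining() < 1 + length) {
            return null; // incomplete frame: leave the position untouched
        }
        buf.get(); // consume the length byte
        byte[] payload = new byte[length];
        buf.get(payload); // consume the payload
        return payload;
    }

    static List<byte[]> callDecode(ByteBuffer buf) {
        List<byte[]> frames = new ArrayList<>();
        while (buf.hasRemaining()) {
            int oldPosition = buf.position();
            byte[] frame = decode(buf);
            if (frame == null) {
                if (oldPosition == buf.position()) {
                    break; // nothing consumed: wait for more data
                }
                continue; // some bytes were skipped: try again
            } else if (oldPosition == buf.position()) {
                throw new IllegalStateException(
                        "decode() must read at least one byte if it returned a frame");
            }
            frames.add(frame);
        }
        return frames;
    }
}
```

With the input bytes {2, 'h', 'i', 3, 'a', 'b'}, the loop produces one frame ("hi") and stops with three bytes still unread, because the second frame announces three payload bytes but only two have arrived.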

Inside unfoldAndFireMessageReceived(), Channels.fireMessageReceived() is called, which in the end calls ctx.sendUpstream, so the event is handed to the next handler for processing:

    protected final void unfoldAndFireMessageReceived(
            ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
        if (unfold) {
            if (result instanceof Object[]) {
                for (Object r: (Object[]) result) {
                    Channels.fireMessageReceived(context, r, remoteAddress);
                }
            } else if (result instanceof Iterable<?>) {
                for (Object r: (Iterable<?>) result) {
                    Channels.fireMessageReceived(context, r, remoteAddress);
                }
            } else {
                Channels.fireMessageReceived(context, result, remoteAddress);
            }
        } else {
            Channels.fireMessageReceived(context, result, remoteAddress);
        }
    }
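
The effect of the unfold flag can be illustrated in isolation. In this sketch, which is not Netty code, the hypothetical method unfold() returns the list of messages that would each be passed to Channels.fireMessageReceived(); collecting them in a list instead of firing events is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the unfold logic; not Netty code.
class UnfoldSketch {
    // Returns the messages that would each be fired upstream as a separate event.
    static List<Object> unfold(boolean unfold, Object result) {
        List<Object> fired = new ArrayList<>();
        if (unfold && result instanceof Object[]) {
            // An array result is split into one message per element.
            fired.addAll(Arrays.asList((Object[]) result));
        } else if (unfold && result instanceof Iterable<?>) {
            // So is any Iterable result.
            for (Object r : (Iterable<?>) result) {
                fired.add(r);
            }
        } else {
            // Unfolding disabled, or a plain object: fire it as a single message.
            fired.add(result);
        }
        return fired;
    }
}
```

With unfold enabled, a decoder that returns an array of two frames causes two upstream events; with unfold disabled, the array itself travels upstream as one message.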
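
appendToCumulation(): this shows that cumulation can be a CompositeChannelBuffer. When bytes from an earlier buffer have not been fully processed, they are merged with the newly arrived buffer and the two are decoded together.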

    protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
        ChannelBuffer cumulation = this.cumulation;
        assert cumulation.readable();
        if (cumulation instanceof CompositeChannelBuffer) {
            // Make sure the resulting cumulation buffer has no more than the configured components.
            CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
            if (composite.numComponents() >= maxCumulationBufferComponents) {
                cumulation = composite.copy();
            }
        }

        this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
        return input;
    }
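
The idea behind the component limit can be sketched as follows. This is an illustration under stated assumptions, not Netty's implementation: the class CompositeSketch is hypothetical, a list of byte arrays stands in for CompositeChannelBuffer, and maxComponents stands in for maxCumulationBufferComponents. Appending wraps the new array without copying, and once the limit is reached the existing components are first consolidated into a single array.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a composite cumulation buffer; not Netty code.
class CompositeSketch {
    private final int maxComponents;
    private List<byte[]> components = new ArrayList<>();

    CompositeSketch(int maxComponents) {
        this.maxComponents = maxComponents;
    }

    void append(byte[] input) {
        if (components.size() >= maxComponents) {
            // Too many components: copy them into one contiguous array first.
            byte[] merged = toArray();
            components = new ArrayList<>();
            components.add(merged);
        }
        components.add(input); // wrap the new input without copying it
    }

    int numComponents() {
        return components.size();
    }

    // Flattens all components into one array, in arrival order.
    byte[] toArray() {
        int total = 0;
        for (byte[] c : components) {
            total += c.length;
        }
        byte[] out = new byte[total];
        int offset = 0;
        for (byte[] c : components) {
            System.arraycopy(c, 0, out, offset, c.length);
            offset += c.length;
        }
        return out;
    }
}
```

The trade-off is between avoiding copies on every append and keeping reads cheap: a composite with many small components makes each access walk more pieces, so an occasional consolidating copy bounds that cost.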

The other method is updateCumulation(), which is called in the finally block:

    protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
        ChannelBuffer newCumulation;
        int readableBytes = input.readableBytes();
        if (readableBytes > 0) {
            int inputCapacity = input.capacity();

            // If input.readableBytes() == input.capacity() (i.e. input is full),
            // there's nothing to save from creating a new cumulation buffer
            // even if input.capacity() exceeds the threshold, because the new cumulation
            // buffer will have the same capacity and content with input.
            if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {
                // At least one byte was consumed by callDecode() and input.capacity()
                // exceeded the threshold.
                cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
                cumulation.writeBytes(input);
            } else {
                // Nothing was consumed by callDecode() or input.capacity() did not
                // exceed the threshold.
                if (input.readerIndex() != 0) {
                    cumulation = newCumulation = input.slice();
                } else {
                    cumulation = newCumulation = input;
                }
            }
        } else {
            cumulation = newCumulation = null;
        }
        return newCumulation;
    }

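This method updates cumulation. Because it runs in the finally block, its job is to hold on to whatever data is left over after decoding. A received buffer almost never contains exactly one frame: if it holds less than a frame, nothing can be decoded yet, and if it holds more than a frame, the part beyond the last complete frame has to be kept until more data arrives.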
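
The branches of updateCumulation() reduce to a four-way decision, which the sketch below restates as a pure function. The class RetentionSketch, the Action enum, and the method choose() are hypothetical names introduced for illustration; the conditions themselves are taken from the code above.

```java
// Hypothetical restatement of the updateCumulation() branches; not Netty code.
class RetentionSketch {
    enum Action { DROP, COPY, SLICE, KEEP }

    static Action choose(int readableBytes, int capacity, int readerIndex, int copyThreshold) {
        if (readableBytes <= 0) {
            return Action.DROP; // everything was decoded: cumulation becomes null
        }
        if (readableBytes < capacity && capacity > copyThreshold) {
            // Part of the buffer is no longer readable and the buffer is large:
            // copy the remainder into a new, smaller buffer.
            return Action.COPY;
        }
        // The buffer is small or completely full: keep it, slicing off
        // already-read bytes if the reader index has moved.
        return readerIndex != 0 ? Action.SLICE : Action.KEEP;
    }
}
```

For example, 10 unread bytes in a buffer of capacity 2048 with a threshold of 1024 gives COPY, while the same 10 bytes in a 64-byte buffer gives SLICE. A plausible reading of the design is that copying a small remainder out of a large buffer lets the large buffer be released, whereas copying is not worth the cost for small buffers.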

