Netty4.x中文教程系列(四) 对象传输

 

 Netty4.x中文教程系列(四)  对象传输 

  我们在使用netty的过程中肯定会遇到传输对象的情况,Netty4通过ObjectEncoder和ObjectDecoder来支持。

  首先我们定义一个User对象,一定要实现Serializable接口:

package mjorcen.netty.object;

import java.io.Serializable;

/**
 * Simple serializable value object that the example client sends to the server.
 *
 * <p>Original header: User: hupeng, Date: 14-6-3, Time: 1:31 AM.
 */
public class User implements Serializable {

    /** Explicit version id so the serialized form does not rely on the compiler-generated default. */
    private static final long serialVersionUID = 1L;

    private int id;

    private String name;

    private String cardNo;

    private String description;

    /** @return the numeric identifier of this user */
    public int getId() {
        return id;
    }

    /** @param id the numeric identifier to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the user name, or {@code null} if it was never set */
    public String getName() {
        return name;
    }

    /** @param name the user name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the card number, or {@code null} if it was never set */
    public String getCardNo() {
        return cardNo;
    }

    /** @param cardNo the card number to store */
    public void setCardNo(String cardNo) {
        this.cardNo = cardNo;
    }

    /** @return the free-text description, or {@code null} if it was never set */
    public String getDescription() {
        return description;
    }

    /** @param description the free-text description to store */
    public void setDescription(String description) {
        this.description = description;
    }

    /**
     * Renders all fields as {@code User{id=.., name='..', cardNo='..', description='..'}}.
     *
     * @return a human-readable representation of this user
     */
    @Override
    public String toString() {
        // The published listing had typographic quotes here, which are not valid
        // Java char literals; plain ASCII apostrophes are required.
        return "User{" + "id=" + id + ", name='" + name + '\'' + ", cardNo='"
                + cardNo + '\'' + ", description='" + description + '\'' + '}';
    }
}

服务端和客户端里,我们自定义的Handler实现如下:

server

package mjorcen.netty.object;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Server side of the object-transfer example: accepts connections and installs an
 * object encoder, an object decoder and {@link ObjectTransferServerHandler} on each
 * accepted channel.
 */
public class ObjectTranferServer {

    /**
     * Upper bound, in bytes, for a single serialized object accepted by the decoder.
     * A finite limit keeps a peer from announcing an arbitrarily large object and
     * forcing the server to buffer it; {@code Integer.MAX_VALUE} disabled that guard.
     */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    /** Port used when none is given on the command line. */
    private static final int DEFAULT_PORT = 11000;

    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public ObjectTranferServer(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and blocks until the server channel is closed.
     * Both event loop groups are shut down when this method exits.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            // NOTE(review): ObjectDecoder uses Java serialization; do not
                            // expose this endpoint to untrusted peers without class filtering.
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(MAX_OBJECT_SIZE, ClassResolvers.cacheDisabled(null)),
                                    new ObjectTransferServerHandler());
                        }
                    });

            // Bind and start to accept incoming connections.
            b.bind(port).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional; {@code args[0]} is the port, otherwise 11000 is used
     * @throws Exception if the server fails to start or is interrupted
     */
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = DEFAULT_PORT;
        }
        new ObjectTranferServer(port).run();
    }
}

serverHandler

package mjorcen.netty.object;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Inbound handler for the example server: prints every message it receives and
 * writes the same message back on the channel.
 */
public class ObjectTransferServerHandler extends ChannelInboundHandlerAdapter {

    private static final Logger LOG =
            Logger.getLogger(ObjectTransferServerHandler.class.getName());

    /**
     * Prints the inbound message to standard output, then writes and flushes it
     * back through the same context.
     *
     * @param ctx the handler context of the channel that produced the message
     * @param msg the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final Object received = msg;
        System.out.println(received);
        ctx.writeAndFlush(received);
    }

    /**
     * Logs the failure at WARNING level and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        final String warning = "Unexpected exception from downstream.";
        LOG.log(Level.WARNING, warning, cause);
        ctx.close();
    }
}

 

client

package mjorcen.netty.object;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

/**
 * Client side of the object-transfer example: connects to a server and installs an
 * object encoder, an object decoder and {@link ObjectTransferClientHandler} on the
 * channel.
 */
public class ObjectTransferClient {

    /**
     * Upper bound, in bytes, for a single serialized object accepted by the decoder.
     * A finite limit keeps the remote side from announcing an arbitrarily large
     * object; {@code Integer.MAX_VALUE} disabled that guard.
     */
    private static final int MAX_OBJECT_SIZE = 1024 * 1024;

    // Set once in the constructor and never reassigned.
    private final String host;

    private final int port;

    private final int messageSize;

    /**
     * @param host        server host name or address
     * @param port        server TCP port
     * @param messageSize number of {@code User} objects the client handler builds and sends
     */
    public ObjectTransferClient(String host, int port, int messageSize) {
        this.host = host;
        this.port = port;
        this.messageSize = messageSize;
    }

    /**
     * Connects to the server and blocks until the channel is closed. The event loop
     * group is shut down when this method exits.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void run() throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // NOTE(review): ObjectDecoder uses Java serialization; only
                            // connect to servers you trust.
                            ch.pipeline().addLast(
                                    new ObjectEncoder(),
                                    new ObjectDecoder(MAX_OBJECT_SIZE, ClassResolvers.cacheDisabled(null)),
                                    new ObjectTransferClientHandler(messageSize));
                        }
                    });

            ChannelFuture future = bootstrap.connect(host, port).sync();

            // Wait here until the handler (or the server) closes the connection.
            future.channel().closeFuture().sync();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }


    /**
     * Runs the client against {@code localhost:11000} with 20 users.
     *
     * @param args ignored
     * @throws Exception if the connection fails or the thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        final String host = "localhost";
        final int port = 11000;
        final int messageSize = 20;

        new ObjectTransferClient(host, port, messageSize).run();
    }
}

clientHandler

package mjorcen.netty.object;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Inbound handler for the example client: sends a list of {@code User} objects once
 * the channel becomes active, prints the first message it receives and then closes
 * the channel.
 */
public class ObjectTransferClientHandler extends ChannelInboundHandlerAdapter {

    private static final Logger logger = Logger
            .getLogger(ObjectTransferClientHandler.class.getName());

    /** Users built in the constructor and sent as a single message on connect. */
    private final List<User> message;

    /**
     * Creates a client-side handler and pre-builds the users to send.
     *
     * @param messageSize number of users to create; must be positive
     * @throws IllegalArgumentException if {@code messageSize} is zero or negative
     */
    public ObjectTransferClientHandler(int messageSize) {
        if (messageSize <= 0) {
            // The message previously named a non-existent "firstMessageSize" parameter.
            throw new IllegalArgumentException("messageSize: "
                    + messageSize);
        }
        message = new ArrayList<User>(messageSize);
        for (int i = 0; i < messageSize; i++) {
            User user = new User();
            user.setId(i);
            user.setCardNo("420000" + i);
            user.setName("hu" + i);
            user.setDescription("你觉得这样好吗??真的好吗" + i);
            message.add(user);
        }
    }

    /**
     * Forwards the active event, then writes and flushes the whole user list as one
     * message.
     *
     * @param ctx the handler context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // Send the message to Server
        super.channelActive(ctx);
        ctx.writeAndFlush(message);
    }

    /**
     * Prints the inbound message to standard output and closes the channel.
     *
     * @param ctx the handler context of the channel that produced the message
     * @param msg the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        // you can use the Object from Server here
        System.out.println(msg);
        ctx.close();
    }

    /**
     * Logs the failure at WARNING level and closes the channel.
     *
     * @param ctx   the handler context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.log(Level.WARNING, "Unexpected exception from downstream.",
                cause);
        ctx.close();
    }
}

 

简单梳理一下思路:

  1. 通过Netty传递,都需要基于流,以ByteBuf(Netty 3.x中叫ChannelBuffer)的形式传递。所以,Object -> ByteBuf.
  2. Netty提供了转换工具,需要我们配置到Handler。
  3. 样例中客户端把对象发给服务端,服务端再原样写回客户端,属于双向收发,所以客户端和服务端都同时配置了ObjectEncoder和ObjectDecoder。如果只是客户端 -> 服务端单向发消息,则只需要在客户端配置编码,服务端配置解码。

  这里需要注意,注册到Server的Handler是有顺序的,如果你颠倒一下注册顺序(例如把ObjectTransferServerHandler放到ObjectDecoder之前):

  结果就是,会先进入我们自己的业务,再进行解码。这自然是不行的:业务Handler拿到的还是未解码的ByteBuf,强转成业务对象会失败。至此,你应该会用Netty传递对象了吧。

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。