Netty4.x中文教程系列(四) 对象传输
Netty4.x中文教程系列(四) 对象传输
我们在使用netty的过程中肯定会遇到传输对象的情况,Netty4通过ObjectEncoder和ObjectDecoder来支持。
首先我们定义一个User对象,一定要实现Serializable接口:
package mjorcen.netty.object; import java.io.Serializable; /** * User: hupeng Date: 14-6-3 Time: 上午1:31 */ public class User implements Serializable { private int id; private String name; private String cardNo; private String description; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getCardNo() { return cardNo; } public void setCardNo(String cardNo) { this.cardNo = cardNo; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } @Override public String toString() { return "User{" + "id=" + id + ", name=‘" + name + ‘\‘‘ + ", cardNo=‘" + cardNo + ‘\‘‘ + ", description=‘" + description + ‘\‘‘ + ‘}‘; } }
服务端和客户端里,我们自定义的Handler实现如下:
server
package mjorcen.netty.object; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class ObjectTranferServer { private final int port; public ObjectTranferServer(int port) { this.port = port; } public void run() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ObjectEncoder(), new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)), new ObjectTransferServerHandler()); } }); // Bind and start to accept incoming connections. b.bind(port).sync().channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 11000; } new ObjectTranferServer(port).run(); } }
serverHandler
package mjorcen.netty.object; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.logging.Level; import java.util.logging.Logger; public class ObjectTransferServerHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(ObjectTransferServerHandler.class.getName()); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); ctx.writeAndFlush(msg); } // @Override // public void channelReadComplete(ChannelHandlerContext ctx) throws // Exception { // ctx.flush(); // ctx.close(); // } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); ctx.close(); } }
client
package mjorcen.netty.object; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class ObjectTransferClient { private String host; private int port; private int messageSize; public ObjectTransferClient(String host, int port, int messageSize) { this.host = host; this.port = port; this.messageSize = messageSize; } public void run() throws InterruptedException { Bootstrap bootstrap = new Bootstrap(); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new ObjectEncoder(), new ObjectDecoder(Integer.MAX_VALUE ,ClassResolvers.cacheDisabled(null)), new ObjectTransferClientHandler(messageSize)); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { final String host = "localhost"; final int port = 11000; final int messageSize = 20; new ObjectTransferClient(host, port, messageSize).run(); } }
clientHandler
package mjorcen.netty.object; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.ArrayList; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; public class ObjectTransferClientHandler extends ChannelInboundHandlerAdapter { private static final Logger logger = Logger .getLogger(ObjectTransferClientHandler.class.getName()); private final List<User> message; /** * Creates a client-side handler. */ public ObjectTransferClientHandler(int messageSize) { if (messageSize <= 0) { throw new IllegalArgumentException("firstMessageSize: " + messageSize); } message = new ArrayList<User>(messageSize); for (int i = 0; i < messageSize; i++) { User user = new User(); user.setId(i); user.setCardNo("420000" + i); user.setName("hu" + i); user.setDescription("你觉得这样好吗??真的好吗" + i); message.add(user); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Send the message to Server super.channelActive(ctx); ctx.writeAndFlush(message); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // you can use the Object from Server here System.out.println(msg); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.log(Level.WARNING, "Unexpected exception from downstream.", cause); ctx.close(); } }
简单梳理一下思路:
- 通过Netty传递数据都是基于字节流的,在Netty4中以ByteBuf(Netty3中称为ChannelBuffer)的形式传递。所以需要完成 Object -> ByteBuf 的转换。
- Netty提供了转换工具,需要我们配置到Handler。
- 样例中客户端先把对象发给服务端,服务端打印后再原样写回客户端,属于双向收发,所以客户端和服务端都同时配置了Encoder和Decoder。如果只是客户端 -> 服务端单向发消息,则只需在客户端配置编码、服务端配置解码。
这里需要注意,注册到Server的Handler是有顺序的,如果你颠倒一下注册顺序,例如把ObjectTransferServerHandler添加到ObjectDecoder之前:
结果就是,会先进入我们自己的业务,再进行解码。这自然是不行的,会强转失败。至此,你应该会用Netty传递对象了吧。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。