nio client和netty server实例
花了一周时间,研究了java里面的nio和netty,其实nio很好理解,用过c语言的,都应该知道select和epoll,nio和select和epoll非常类似,使用方法和解决的问题也都是一样的。
至于netty,不得不钦佩java语言的框架技术,虽说这个框架研究起来非常费劲,但是对于上层使用者,使用这个netty框架,会帮我们解决很多性能、稳定性问题。同时,使用框架,也会大大提高开发效率。
这里,不想讲太多关于nio和netty的东西,所有最基本的知识点,都在如下学习资料中。目前我对这个netty框架研究的还不深入,想了半天其实真写不出啥有水平的文章,待今后深入研究后,将学习成果再和大家汇报。
学习资料:
NIO:http://www.iteye.com/magazines/132-Java-NIO#590
Netty:http://docs.jboss.org/netty/3.1/guide/html_single/
多线程:http://www.cnblogs.com/dolphin0520/p/3932921.html
这里给出一个用nio实现的tcp client、用netty实现的一个tcp server的例子。
处理过程为:client传递a、b两个整型数,server计算和,将结果返回给客户端。在服务端加入线程池,用来处理两个数的和。当然了从性能角度,目前这么简单的操作完全没有必要这样做。主要考虑到如果有更复杂的操作,一般的服务端的模型都是将任务传入一个消息队列,后端再用线程池从消息队列中取出任务进行处理,再返回处理结果。所以这个地方的线程池,可以认为是以后的扩展,也可以认为其就是个摆设。
client 代码
NioClient.java
package nio.client.test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.IntBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; import java.util.Set; public class NioClient { private final static int MAX_BUF_SIZE = 1024; private InetSocketAddress serverAddr; private int clientCount; public NioClient(String ip, int port, int clientCount) { this.clientCount = clientCount; this.serverAddr = new InetSocketAddress(ip, port); } private void sendMessageToSrv(SocketChannel sockChnl, int clientNo, int index) throws IOException { // send data to server... /* ByteBuffer sendBuf = ByteBuffer.allocate(MAX_BUF_SIZE); String sendText = "Client " + clientNo + " say " + index + "\r\n"; sendBuf.put(sendText.getBytes()); sendBuf.flip(); sockChnl.write(sendBuf); System.out.println(sendText);*/ ByteBuffer sendBuf = ByteBuffer.allocate(4*4); sendBuf.putInt(clientNo); sendBuf.putInt(index); sendBuf.putInt(clientNo); sendBuf.putInt(index); sendBuf.flip(); sockChnl.write(sendBuf); String out = String.format("client: %d send message, index: %d, a: %d, b: %d", clientNo, index, clientNo, index); System.out.println(out); } private void recvMessage(SocketChannel sockChnl, int clientNo) throws IOException { /*ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE); int bytesRead = sockChnl.read(recvBuf); while (bytesRead > 0) { recvBuf.flip(); // write mode to read mode, position to 0, // limit to position String recvText = new String(recvBuf.array(), 0, bytesRead); recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity System.out.println("Client " + clientNo + " receive: " + recvText); bytesRead = sockChnl.read(recvBuf); }*/ ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE); int bytesRead = sockChnl.read(recvBuf); while (bytesRead > 0) { recvBuf.flip(); // write mode to read mode, position to 0, // limit to position int result = recvBuf.getInt(); recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity String out = String.format("client: %d recv message, result: %d", clientNo, result); System.out.println(out); bytesRead = sockChnl.read(recvBuf); } } public void startNioClient() throws IOException, InterruptedException { Selector selector = Selector.open(); for (int i = 0; i < clientCount; i++) { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Map<String, Integer> clientInfo = new HashMap<String, Integer>(); clientInfo.put("no", i); clientInfo.put("index", 0); socketChannel.register(selector, SelectionKey.OP_CONNECT, clientInfo); socketChannel.connect(this.serverAddr); } while (true) { int readyChannels = selector.select(); if (0 == readyChannels) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey sk : selectionKeys) { Map clientInfo = (Map) sk.attachment(); int clientNo = (Integer) clientInfo.get("no"); SocketChannel socketchnl = (SocketChannel) sk.channel(); if (sk.isConnectable()) { while(!socketchnl.finishConnect()) { Thread.sleep(5); } if (socketchnl.isConnected()) { System.out.println("connect is finish..."); // send data to server... sendMessageToSrv(socketchnl, clientNo, -1); sk.interestOps(SelectionKey.OP_READ); } } else if (sk.isReadable()) { // read data from server... recvMessage(socketchnl, clientNo); // send data to server... int index = (Integer) clientInfo.get("index"); index += 1; sendMessageToSrv(socketchnl, clientNo, index); clientInfo.put("index", index); } } selectionKeys.clear(); } } public int getClientCount() { return clientCount; } public void setClientCount(int clientCount) { this.clientCount = clientCount; } }
Main.java
package nio.client.test; import java.io.IOException; public class Main { public static void main(String[] args) { System.out.println("clients start.............."); NioClient client = new NioClient("127.0.0.1", 8080, 5000); try { client.startNioClient(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
server端代码:
NettyServer.java
package com.bj58.nettyTest; import java.net.InetSocketAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFactory; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class NettyServer { static final ChannelGroup allChannels = new DefaultChannelGroup("time-server"); public static void main(String[] args) throws Exception { ExecutorService threadPool = Executors.newFixedThreadPool(5); ChannelFactory factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); ServerBootstrap bootstrap = new ServerBootstrap(factory); bootstrap.setPipelineFactory(new ServerPipelineFactory(threadPool)); bootstrap.setOption("child.tcpNoDelay", true); bootstrap.setOption("child.keepAlive", true); System.out.println("Netty Server start..."); Channel channel = bootstrap.bind(new InetSocketAddress(8080)); /* allChannels.add(channel); System.out.println("1111111111111111111"); Thread.sleep(2*60*1000); System.out.println("2222222222222222222"); ChannelGroupFuture future = allChannels.close(); future.awaitUninterruptibly(); factory.releaseExternalResources(); System.out.println("3333333333333333333");*/ } }
ServerDecoder.java
package com.bj58.nettyTest; import java.util.ArrayList; import java.util.List; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class ServerDecoder extends FrameDecoder{ @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes() < 8) { return null; } int clientNo = buffer.readInt(); int index = buffer.readInt(); int a = buffer.readInt(); int b = buffer.readInt(); List<Integer> data = new ArrayList<Integer>(); data.add(clientNo); data.add(index); data.add(a); data.add(b); return data; } }
ServerHandler.java
package com.bj58.nettyTest; import java.util.List; import java.util.concurrent.ExecutorService; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelHandler; public class ServerHandler extends SimpleChannelHandler { private ExecutorService threadPool; public ServerHandler(ExecutorService threadPool) { this.threadPool = threadPool; } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws InterruptedException { /* * ChannelBuffer buf = (ChannelBuffer)e.getMessage(); byte[] des = * buf.array(); String recvText = new String(des, 0, des.length); * System.out.println(recvText); Channel ch = e.getChannel(); * ch.write(e.getMessage()); */ List<Integer> data = (List<Integer>) e.getMessage(); HandleTask ht = new HandleTask(e); threadPool.submit(ht); int clientNo = data.get(0); int index = data.get(1); int a = data.get(2); int b = data.get(3); String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b); System.out.println(content); Channel ch = e.getChannel(); ChannelBuffer buf = ChannelBuffers.buffer(4); buf.writeInt(a+b); ch.write(buf); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } class HandleTask implements Runnable { MessageEvent e; public HandleTask(MessageEvent e) { this.e = e; } public void run() { List<Integer> data = (List<Integer>) e.getMessage(); int clientNo = data.get(0); int index = data.get(1); int a = data.get(2); int b = data.get(3); String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b); System.out.println(content); Channel ch = e.getChannel(); ChannelBuffer buf = ChannelBuffers.buffer(4); buf.writeInt(a+b); ch.write(buf); } }
ServerPipelineFactory.java
package com.bj58.nettyTest; import static org.jboss.netty.channel.Channels.pipeline; import java.util.concurrent.ExecutorService; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; public class ServerPipelineFactory implements ChannelPipelineFactory{ private ExecutorService threadPool; public ServerPipelineFactory(ExecutorService threadPool) { this.threadPool = threadPool; } public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("framer", new ServerDecoder()); pipeline.addLast("handler", new ServerHandler(threadPool)); return pipeline; } }
代码理解:
客户端可以同时启动N个tcp client,同时连接一个tcp server,传递a/b两个数,获取a/b之和。
当代码写完之后,发现通过java的序列化技术,可以直接传递一个java 对象,这样一来,发送和接收端,处理起来会更简单一些,而且实际项目中,传递的数据要比这个复杂的多。
接下来研究一下java的序列化技术、netty如何传递对象、以及Google protobuf,给出一个完整的rpc的代码例子。
郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。