nio client和netty server实例

花了一周时间,研究了java里面的nio和netty,其实nio很好理解,用过c语言的,都应该知道select和epoll,nio和select和epoll非常类似,使用方法和解决的问题也都是一样的。

至于netty,不得不钦佩java语言的框架技术,虽说这个框架研究起来非常费劲,但是对于上层使用者,使用这个netty框架,会帮我们解决很多性能、稳定性问题。同时,使用框架,也会大大提高开发效率。

这里,不想讲太多关于nio和netty的东西,所有最基本的知识点,都在如下学习资料中。目前我对这个netty框架研究的还不深入,想了半天其实真写不出啥有水平的文章,待今后深入研究后,将学习成果再和大家汇报。

学习资料:

NIO:http://www.iteye.com/magazines/132-Java-NIO#590

Netty:http://docs.jboss.org/netty/3.1/guide/html_single/

多线程:http://www.cnblogs.com/dolphin0520/p/3932921.html

这里给出一个用nio实现的tcp client、用netty实现的一个tcp server的例子。

处理过程为:client传递a、b两个整型数,server计算和,将结果返回给客户端。在服务端加入线程池,用来处理两个数的和。当然了从性能角度,目前这么简单的操作完全没有必要这样做。主要考虑到如果有更复杂的操作,一般的服务端的模型都是将任务传入一个消息队列,后端再用线程池从消息队列中取出任务进行处理,再返回处理结果。所以这个地方的线程池,可以认为是以后的扩展,也可以认为其就是个摆设。

client 代码

NioClient.java

package nio.client.test;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class NioClient {
	private final static int MAX_BUF_SIZE 					= 1024;
	private InetSocketAddress serverAddr;
	private int clientCount;
	
	public NioClient(String ip, int port, int clientCount) {
		this.clientCount 	= clientCount;
		this.serverAddr 	= new InetSocketAddress(ip, port);
	}
	
	private void sendMessageToSrv(SocketChannel sockChnl, int clientNo, int index) throws IOException {
		// send data to server...
/*		ByteBuffer sendBuf = ByteBuffer.allocate(MAX_BUF_SIZE);
		String sendText = "Client " + clientNo + " say " + index + "\r\n";
		sendBuf.put(sendText.getBytes());
		sendBuf.flip();
		sockChnl.write(sendBuf);
		System.out.println(sendText);*/
		
		ByteBuffer sendBuf = ByteBuffer.allocate(4*4);
		sendBuf.putInt(clientNo);
		sendBuf.putInt(index);
		sendBuf.putInt(clientNo);
		sendBuf.putInt(index);
		sendBuf.flip();
		
		sockChnl.write(sendBuf);
		String out = String.format("client: %d send message, index: %d, a: %d, b: %d", clientNo, index, clientNo, index);
		System.out.println(out);
	}
	
	private void recvMessage(SocketChannel sockChnl, int clientNo) throws IOException {
		/*ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE);
		int bytesRead = sockChnl.read(recvBuf);
		while (bytesRead > 0) {
			recvBuf.flip(); // write mode to read mode, position to 0, // limit to position
			String recvText = new String(recvBuf.array(), 0, bytesRead);
			recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity
			System.out.println("Client " + clientNo + " receive: " + recvText);
			bytesRead = sockChnl.read(recvBuf);
		}*/
		
		ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE);
		int bytesRead = sockChnl.read(recvBuf);
		while (bytesRead > 0) {
			recvBuf.flip(); // write mode to read mode, position to 0, // limit to position
			int result = recvBuf.getInt();
			recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity
			String out = String.format("client: %d recv message, result: %d", clientNo, result);
			System.out.println(out);
			bytesRead = sockChnl.read(recvBuf);
		}
	}

	public void startNioClient() throws IOException, InterruptedException {
		Selector selector = Selector.open();

		for (int i = 0; i < clientCount; i++) {
			SocketChannel socketChannel = SocketChannel.open();
			socketChannel.configureBlocking(false);
			Map<String, Integer> clientInfo = new HashMap<String, Integer>();
			clientInfo.put("no", i);
			clientInfo.put("index", 0);
			socketChannel.register(selector, SelectionKey.OP_CONNECT, clientInfo);
			socketChannel.connect(this.serverAddr);
		}

		while (true) {
			int readyChannels = selector.select();
			if (0 == readyChannels) {
				continue;
			}

			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			for (SelectionKey sk : selectionKeys) {
				Map clientInfo = (Map) sk.attachment(); 
				int clientNo = (Integer) clientInfo.get("no");
				SocketChannel socketchnl = (SocketChannel) sk.channel();
				
				if (sk.isConnectable()) {
					while(!socketchnl.finishConnect()) {
						Thread.sleep(5);
					}
					if (socketchnl.isConnected()) {
                        System.out.println("connect is finish...");
                        // send data to server...
    					sendMessageToSrv(socketchnl, clientNo, -1);
    					sk.interestOps(SelectionKey.OP_READ);
					}
				} else if (sk.isReadable()) {
					// read data from server...
					recvMessage(socketchnl, clientNo);
					
					// send data to server...
					int index = (Integer) clientInfo.get("index");
					index += 1;
					sendMessageToSrv(socketchnl, clientNo, index);		
					clientInfo.put("index", index);
				}
			}
			selectionKeys.clear();
		}
	}

	public int getClientCount() {
		return clientCount;
	}

	public void setClientCount(int clientCount) {
		this.clientCount = clientCount;
	}
}

Main.java

package nio.client.test;

import java.io.IOException;

public class Main {
	public static void main(String[] args) {
		System.out.println("clients start..............");
		NioClient client = new NioClient("127.0.0.1", 8080, 5000);
		try {
			client.startNioClient();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

server端代码:

NettyServer.java

package com.bj58.nettyTest;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class NettyServer {

	static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");
	
	public static void main(String[] args) throws Exception {
	    ExecutorService threadPool = Executors.newFixedThreadPool(5);
	    
		ChannelFactory factory = new NioServerSocketChannelFactory(
				Executors.newCachedThreadPool(),
				Executors.newCachedThreadPool());

		ServerBootstrap bootstrap = new ServerBootstrap(factory);

		bootstrap.setPipelineFactory(new ServerPipelineFactory(threadPool));
		
		bootstrap.setOption("child.tcpNoDelay", true);
		bootstrap.setOption("child.keepAlive", true);

		System.out.println("Netty Server start...");
		Channel channel = bootstrap.bind(new InetSocketAddress(8080));
		
/*		allChannels.add(channel);
		System.out.println("1111111111111111111");
		Thread.sleep(2*60*1000);
		System.out.println("2222222222222222222");
        ChannelGroupFuture future = allChannels.close();
        future.awaitUninterruptibly();
        factory.releaseExternalResources();
        System.out.println("3333333333333333333");*/
		
	}
}

ServerDecoder.java

package com.bj58.nettyTest;

import java.util.ArrayList;
import java.util.List;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class ServerDecoder extends FrameDecoder{
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
        if (buffer.readableBytes() < 8) {
            return null;
        }
        
        int clientNo = buffer.readInt();
        int index = buffer.readInt();
        int a = buffer.readInt();
        int b = buffer.readInt();

        List<Integer> data = new ArrayList<Integer>();
        data.add(clientNo);
        data.add(index);
        data.add(a);
        data.add(b);
        
        return data;
    }
}

ServerHandler.java

package com.bj58.nettyTest;

import java.util.List;
import java.util.concurrent.ExecutorService;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;



public class ServerHandler extends SimpleChannelHandler {

    private ExecutorService threadPool;

    public ServerHandler(ExecutorService threadPool) {
        this.threadPool = threadPool;
    }
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws InterruptedException {
        /*
         * ChannelBuffer buf = (ChannelBuffer)e.getMessage(); byte[] des =
         * buf.array(); String recvText = new String(des, 0, des.length);
         * System.out.println(recvText); Channel ch = e.getChannel();
         * ch.write(e.getMessage());
         */
        
        List<Integer> data = (List<Integer>) e.getMessage();
        HandleTask ht = new HandleTask(e);
        threadPool.submit(ht);
        
        int clientNo = data.get(0);
        int index = data.get(1);
        int a = data.get(2);
        int b = data.get(3);

        String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b);
        System.out.println(content);
        
        Channel ch = e.getChannel();
        ChannelBuffer buf = ChannelBuffers.buffer(4);
        buf.writeInt(a+b);
        ch.write(buf);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();

        Channel ch = e.getChannel();
        ch.close();
    }
}

class HandleTask implements Runnable {
    MessageEvent e;
    
    public HandleTask(MessageEvent e) {
        this.e = e;
    }
    
    public void run() {
        List<Integer> data = (List<Integer>) e.getMessage();
        
        int clientNo = data.get(0);
        int index = data.get(1);
        int a = data.get(2);
        int b = data.get(3);

        String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b);
        System.out.println(content);
        
        Channel ch = e.getChannel();
        ChannelBuffer buf = ChannelBuffers.buffer(4);
        buf.writeInt(a+b);
        ch.write(buf);
    }
}

ServerPipelineFactory.java

package com.bj58.nettyTest;

import static org.jboss.netty.channel.Channels.pipeline;

import java.util.concurrent.ExecutorService;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

public class ServerPipelineFactory implements ChannelPipelineFactory{   
    
    private ExecutorService threadPool;

    public ServerPipelineFactory(ExecutorService threadPool) {
        this.threadPool = threadPool;
    }
    
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();

        pipeline.addLast("framer", new ServerDecoder());
        pipeline.addLast("handler", new ServerHandler(threadPool));
        return pipeline;
    }
}


代码理解:

客户端可以同时启动N个tcp client,同时连接一个tcp server,传递a/b两个数,获取a/b之和。

当代码写完之后,发现通过java的序列化技术,可以直接传递一个java 对象,这样一来,发送和接收端,处理起来会更简单一些,而且实际项目中,传递的数据要比这个复杂的多。

接下来研究一下java的序列化技术、netty如何传递对象、以及Google protobuf,给出一个完整的rpc的代码例子。




郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。