Java NIO 笔记

NIO(同步非阻塞IO)是jdk1.4之后推出的new io,它跟BIO(同步阻塞IO)有着非常明显的区别。

BIO在调用read/write的时候会阻塞线程,也就是就算某个时刻你的socket并没有数据需要传输,但是你的socket线程却仍然会被阻塞在read/write方法上,所以BIO是一个socket连接一个线程。

NIO与BIO不同,它主要依靠事件监听反应器进行工作,一个监听器可以监听好几个socket连接,只有在socket有事件发生(如读写数据,连接到达等)的时候才进行事件分发,开启线程去处理事件(一个请求一个线程),所以在高并发的时候NIO是优于BIO的。

NIO主要可以分为四个核心模块,分别是Buffer(数据缓冲区),Channel(数据通道),Selector(监听器),Unicode(字符集)。

Buffer(数据缓冲区)在NIO中扮演着数据临时存储的角色,对数据的写入写出都必须先经过Buffer缓冲区。Buffer的实现简单来说就是基于Java底层数组,并添加了特定的行为和属性的对象封装。Buffer使用数组进行数据存储,但它与一般的数组操作并不一样,下面我们来看看Buffer特有的一些属性和行为。这里借用一张图片说明一下缓冲区的作用:

技术分享

Buffer缓冲区主要属性有三个,分别是position(下一次可操作指针),limit(可操作指针上限),capacity(容量)。

主要方法有四个,分别是clear(清空缓冲区),flip(把缓冲区状态改为写状态),put(向缓冲区写入数据),get(从缓冲区读取数据)。

Buffer初始化的时候会根据指定的容量构造一个定长数组,position设为0,capacity和limit分别设为容量减一。而调用clear方法的操作是position=0,limit=capacity。调用flip方法的操作是limit=position,position=0。put、get方法的操作将在下面用实例来说明。

Buffer缓冲区区别于数组的另一个体现在于Buffer是有状态的,写入和读取。当要进行数据写入时,通常我们都会先调用clear()方法,保证缓冲区有充足的空间写入数据,然后调用put()方法,比如向一个容量为1024的缓冲区写入’abc’,position指针会前移,写入完成后position=3,而limit以及capacity不变。数据写入后肯定需要读取,这时要调用flip()方法使缓冲区从写入状态变成读取状态。然后我们读取数据,调用get()方法,比如读取’a’,首先判断position<limt,如果为true,获取数组中下标==position的元素,position前移加一;如果判断结果为false,则抛出异常。这里我们要注意的一点是,我们读取数据改变的是position这个指针,数组里面的元素是没有改变的。为什么要特别强调这一点,这是因为Buffer缓冲区有一个方法,array()方法,这个方法是直接获取缓冲区的底层数组,当调用这个方法时候其实我们的操作对象已经不是缓冲区,变成了直接对数组操作。这样一来,缓冲区的三个重要指针就不起作用了,比如我们刚才写入了abc,读取之后再写入数据def,然后进行读取,本来我们希望读取当然是刚才写入的def,但是这时候底层数组存储的其实是abcdef,我们使用array()方法读取的时候就出现垃圾数据(abc),所以慎用array方法,一般建议不要脱离缓冲区独立操作其底层数组。如果看不太明白的同学可以看看这片文章http://www.ibm.com/developerworks/cn/education/java/j-nio/j-nio.html,这里只是简单记录一下笔记。

Buffer缓冲区还有几个特性能大幅度提高IO性能。例如汇聚/发散特性,这个特性简单来说就是Channel在一次读写中对一组缓冲区进行操作,一个缓冲区满了之后自动读写下一个缓冲区,借用图片说明:

技术分享

 

当然Buffer缓冲区还有其他的特性,比如内存映射文件 I/O ,直接缓冲区/间接缓冲区,文件锁定IO等等,这里就不作详细介绍了,有兴趣的同学可以自己查找一下资料。

 

Channel(数据通道),这个相当于BIO里面的Stream(数据流),但Channel与Stream不同,Channel是双向的,可以向通道两边传输数据,而不用像BIO那样要专门建立一个输入流和一个输出流。Channel相对简单,主要使用的有三个类,FileChannel,ServerSocketChannel以及SocketChannel,主要使用的方法有两个,一个是read(buffer),向缓冲区buffer写入数据;另一个是write(buffer),从缓冲区buffer读取数据写入通道。这里贴一段FileChannel的代码,其他两个类的使用将会在下面记录。

package socket.FileChannel;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

public class FileChannelIO {
    
    private FileInputStream inputStream=null;
    private FileChannel channel=null;
    private FileOutputStream outputStream=null;
    private FileChannel channel2=null;
    private ByteBuffer buffer=ByteBuffer.allocate(1024);//容量为1024的缓冲区
    
    public void exedcute(){
        try {
            inputStream=new FileInputStream("D:/test.txt");
            channel=inputStream.getChannel();//FileChannel需要从文件输入输出流获取通道
            outputStream=new FileOutputStream("D:/test2.txt", false);//文件末尾追加
            channel2=outputStream.getChannel();
            buffer.clear();//清空缓冲区
            while(channel.read(buffer)>0){
                buffer.flip();//更改缓冲区为读取状态
                channel2.write(buffer);//读取缓冲区数据,写入通道,写入文件
                buffer.clear();
            }
            inputStream.close();
            channel.close();
            outputStream.close();
            channel2.close();
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            try {
                inputStream.close();
                channel.close();
                outputStream.close();
                channel2.close();
            } catch (Exception e2) {
                // TODO: handle exception
            }
        }
    }
    
    public static void main(String[] args) {
        new FileChannelIO().exedcute();
    }

}

 

NIO最核心无疑是Selector(监听器),Selector是处理一切事件,进行事件分发的核心。Selector类有几个重要的集合,分别是SelectionKey (通道注册键值)以及它的两个子集readyOps(就绪事件集)interestOps(通道感兴趣集合),还有已选择的键值集合,已取消的键值集合。还有几个重要的方法,如select()(轮询操作系统检查就绪事件),wakeup()(让阻塞在select方法的线程立刻返回)等等。

先介绍一下interestOps(通道感兴趣集合),这个集合一共有四个元素,分别是OP_READ = 1 << 0(读事件),OP_WRITE = 1 << 2(写事件),OP_CONNECT = 1 << 3(连接成功事件),OP_ACCEPT = 1 << 4(接到连接请求事件)。前三个元素无论是ServerSocketChannel或者SocketChannel都可以注册,最后一个元素则唯有ServerSocketChannel可以注册。

至于readyOps(就绪事件集),其实这个集合里面存放的就是interestOps(通道感兴趣集合)的元素,但有一点不同的是,就绪事件是可以累加的,比如说就绪事件集里面可以同时存放OP_READ(读事件)以及OP_WRITE(写事件),但在就绪事件集里面这两个元素并不是分开存放的,而是累加存放,读事件和写事件累加的值就是0011。那么NIO是怎么从这个0011中分离出读事件以及写事件的呢,我们可以看看源码:

public final boolean isReadable() {
    return (readyOps() & OP_READ) != 0;
    }

public final boolean isWritable() {
    return (readyOps() & OP_WRITE) != 0;
    }

 

明显,NIO是采用位与分离就绪事件集中的事件。

接下来我们记录一下怎样把通道注册到Selector监听器,产生SelectionKey (通道注册键值)。这里贴一段代码:

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );

ServerSocket ss = ssc.socket();
InetSocketAddress address = new InetSocketAddress( ports[i] );
ss.bind( address );

SelectionKey key = ssc.register( selector, SelectionKey.OP_ACCEPT );

 

在注册通道的同时要指定通道感兴趣的事件(如果有多个感兴趣事件可以用|分割),也可以指定通道关联的Buffer缓冲区。

通道注册成功后,我们就可以调用select方法轮询操作系统,查询在通道感兴趣的事件上有多少事件发生,也就是产生了多少就绪事件。select()方法可以说是NIO的重中之重,我们这里详细记录一下。

我们调用select()方法的时候,它的执行流程如下:

1、首先检查已取消的键值集合。如果非空,从其他两个集合中移除已经取消的键,注销相关通道,清空已经取消的键的集合。 

2、检查已注册键值的子集interestOps(通道感兴趣集合),如果自上一次调用select()方法到本次调用select()之间有通道的感兴趣事件发生变化,则更改该通道注册键值的子集interestOps(通道感兴趣集合)。

就绪条件确认后,Java告知底层操作系统就绪条件,底层操作系统开始查询各个通道是否产生就绪事件。这一步可能会阻塞很久,因为底层操作系统只有在查询到有就绪事件发生才会返回,而select()只有接到操作系统的返回值才会执行下一步操作。当然我们可以预先设定一个超时时间,或者调用wakeup()方法让阻塞在select()方法的线程立即返回。

底层操作系统返回就绪事件后,select()方法执行下面操作:

  a: 如果通道的键还没有在已经选择的键的集合(selectedKeys)中,那么键的ready集合将被清空。然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。 
  b: 否则,一旦通道的键被放入已经选择的键的集合中时,ready集合不会被清除,而是累积。这就是说,如果之前的状态是ready的操作,本次已经不是ready了,但是他的bit位依然表示是ready,不会被清除。 

这里要特别声明一点,返回值,select的返回值说明的是从上一次调用到本次调用,就绪选择的个数。如果上一次就已经是就绪的,那么本次不统计。这是是为何返回为0时,我们continue的原因。

 好了,现在就绪事件集合已经确定,我们就可以分配线程处理各个就绪事件。

while(true){//轮询
    int num=selector.select();
    if(num==0){//自上次select到本次select之间没有就绪通道
        continue;
    }
    Set selectedKeys = selector.selectedKeys();
    Iterator keyIterator = selectedKeys.iterator();
    while(keyIterator.hasNext()) {
        SelectionKey key = (SelectionKey) keyIterator.next();
        keyIterator.remove();
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
        } else if (key.isReadable()) {
            // a channel is ready for reading
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }
    }
}

 下面贴一个示例:

server端

package socket.socket3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * @author JIAN
 */
public class Server {
    
    private Selector selector=null;
    private ServerSocketChannel channel=null;
    private ServerSocket serverSocket=null;
    private final int port=8083;

    public void init(){
        try {
            selector=Selector.open();
            channel=ServerSocketChannel.open();
            channel.configureBlocking(false);
            serverSocket=channel.socket();
            serverSocket.bind(new InetSocketAddress(port));
            channel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void execute(){
        while(true){//轮询
            try {
                int num=selector.select();
                if(num==0){//自上次select到本次select之间没有就绪通道
                    continue;
                }
                Set<SelectionKey> keys=selector.selectedKeys();
                Iterator<SelectionKey> it=keys.iterator();
                while(it.hasNext()){
                    SelectionKey key=it.next();
                    it.remove();
                    if(key.isAcceptable()){
                        ServerSocketChannel channel=(ServerSocketChannel) key.channel();
                        SocketChannel channel2=channel.accept();
                        channel2.configureBlocking(false);
                        channel2.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
                        
                        /*通道数据到达时的触发事件也就是ready集合,主要存放就绪事件的键值,分辨这次选择的通道各自的触发事件
                         * 通道的数据始终没有分别存放,对于每一次触发的事件而言,其数据都是存放在缓冲区,不会被单独分离
                         */
                    }else if(key.isReadable()){
                        key.interestOps(key.interestOps()&(~SelectionKey.OP_READ));
                        //通道兴趣转移,多线程下防止多个线程同时操作同一个通道以及同一组缓冲区,下一次select时生效
                        new Thread(new Handler(key)).start();
                    }else if(key.isWritable()){
                        
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    class Handler implements Runnable{
        
        private SelectionKey key=null;
        
        public Handler(SelectionKey key){
            this.key=key;
        }

        /*
         * 对于ByteBuffer缓冲区的实现:
         * 底层使用Java基本数组,所以基本上每一个缓冲区都带有一个底层实现数组
         * 但是与基本数组不同的地方在于,缓冲区是NIO基于Java基本数组之上进行的一个封装
         * 数据存储还是使用基本数组实现,但缓冲区的这个基本数组被封装了很多功能
         * 首先也是最重要的一点是: 缓冲区的数组操作不是Java基本数组的直接覆盖,缓冲区的操作基于三个参数position,limit以及capacity
         * 缓冲区数组的读写操作主要依靠这三个参数,例如清空缓冲区只是把position设为0,limit设为capacity,缓冲区中的数据其实并没有被覆盖
         * 所以,对于array()这个获取底层数组的method需要慎用,因为可能有一些作废数据被误读
         */
        public void run() {
            SocketChannel channel=(SocketChannel) key.channel();
            ByteBuffer buffer=(ByteBuffer) key.attachment();
            
            try {
                buffer.clear();
                int number=0;
                StringBuilder builder=new StringBuilder();
                while((number=channel.read(buffer))>0){
//                    buffer.array();//获取缓冲区底层数组,两者为实参关系,一个变另一个也变
                    int index=buffer.position();
                    byte[] b=new byte[buffer.position()];
                    for(int i=0;i<index;i++){
                        b[i]=buffer.get(i);
                    }
                    builder.append(new String(b).trim());
                    buffer.clear();
                }
                String message=builder.toString();
                System.out.println(message);
                if(number==-1){
                    System.out.println("通道关闭,注销监听键");
                    channel.close();
                    key.cancel();
                    return;
                }
                buffer.clear();
                buffer.put("服务器成功接收!".getBytes());
                buffer.flip();
                channel.write(buffer);
                key.interestOps(key.interestOps()|SelectionKey.OP_READ);
                //线程执行完毕,通道兴趣恢复,下一次select时生效
                selector.wakeup();//唤醒阻塞线程,执行下一次select方法,让通道兴趣恢复立即生效
            } catch (Exception e) {
                e.printStackTrace();
                try {
                    channel.close();
                    key.cancel();
                } catch (IOException e1) {
                    e1.printStackTrace();
                }
            }
        }
        
    }

    public static void main(String[] args) {
        Server server=new Server();
        server.init();
        server.execute();
    }
}

 客户端:

package socket.socket3;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Client {
    
    private SocketChannel channel=null;
    private Selector selector=null;
    private final int port=8083;
    private final String addr="localhost";
    
    public void init(){
        try {
            selector=Selector.open();
            channel=SocketChannel.open(new InetSocketAddress(addr, port));
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ,ByteBuffer.allocate(1024));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void execute(){
        while(true){//轮询
            try {
                int num=selector.select();
                if(num==0){//自上次select到本次select之间没有就绪通道
                    continue;
                }
                Set<SelectionKey> keys=selector.selectedKeys();
                Iterator<SelectionKey> it=keys.iterator();
                while(it.hasNext()){
                    SelectionKey key=it.next();
                    it.remove();
                    if(key.isReadable()){
                        String message=reader(key);
                        if(message!=null){
                            writer("客户端发送!");
                        }
                    }else if(key.isWritable()){
                        /*
                         * 通道空闲就会触发写事件,所以一般上不会注册写事件,只有在读事件繁重,不能及时执行写操作时才会注册写事件
                         * 写事件执行完毕后一定要及时cancel掉,否则通道空闲会无限触发写事件
                         */
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    public String reader(SelectionKey key){
        SocketChannel channel=(SocketChannel) key.channel();
        ByteBuffer buffer=(ByteBuffer) key.attachment();
        
        try {
            buffer.clear();
            int number=0;
            StringBuilder builder=new StringBuilder();
            while((number=channel.read(buffer))>0){
                int index=buffer.position();
                byte[] b=new byte[buffer.position()];
                for(int i=0;i<index;i++){
                    b[i]=buffer.get(i);
                }
                builder.append(new String(b).trim());
                buffer.clear();
            }
            String message=builder.toString();
            System.out.println(message);
            if(number==-1){
                System.out.println("通道关闭,注销监听键");
                channel.close();
                key.cancel();
                return null;
            }
            return message;
        } catch (Exception e) {
            e.printStackTrace();
            try {
                channel.close();
                key.cancel();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
            return null;
        }
    }
    
    public void writer(String str){
        ByteBuffer buffer=ByteBuffer.allocate(1024);
        try {
            buffer.clear();
            buffer.put(str.getBytes());
            buffer.flip();
            int number=channel.write(buffer);
            if(number==-1){
                channel.close();
            }
        } catch (Exception e) {
            try {
                channel.close();
            } catch (IOException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Client client=new Client();
        client.init();
        client.writer("客户端初始发送!");
        client.execute();
    }

}

 有些同学或许会疑惑,当数据到达时会触发读事件,在我们处理读事件的过程中肯定会有一部分数据在通道没来得及处理,那不是会一直触发读事件?关于这点我们可以分两种情况讨论。第一种,单线程处理,很明显,如果你采用单线程处理,程序是会顺序执行,只有当你执行完读操作,才会进行下一次select()方法,这时数据早已处理完毕,肯定不会重复触发事件。第二种,多线程处理,上面的例子其实已经说明了,多线程处理的时候肯定要考虑同步的问题,当一个线程进入处理某个通道及其缓冲区的时候首先要做就是把通道的兴趣转移,不让同类事件触发,直至线程处理完毕,这同样也不会重复触发事件。

 

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。