Java-NIO-Selector

扩展阅读:

Java NIO类库Selector机制解析(上)

Java NIO类库Selector机制解析(下)

 

 Java NIO的选择器部分,实际上有三个重要的类。 
1,Selector 选择器,完成主要的选择功能。select(), 并保存有注册到他上面的通道集合。 
2,SelectableChannel 可被注册到Selector上的通道。 
3,SelectionKey 描述一个Selector和SelectableChannel的关系。并保存有通道所关心的操作。 

接下来,便是一个通用的流程。 
首先, 创建选择器, 
然后,注册通道, 
其次,选择就绪通道, 
最后,处理已就绪通道数据。 

 

package socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer2 {

    private void startServer() throws IOException {
        Selector selector = Selector.open();
        
        {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(9000);
            ss.bind(address);
            
            System.out.println("ssc 0 : " + ssc);
            System.out.println("ss 0 : " + ss);
            
            SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("acceptKey: " + acceptKey);
            printKeyInfo(acceptKey);
            System.out.println("Going to listen on 9000");
        }
        
        while (true) {
            System.out.println("===================================\nstart select...");
            int num = selector.select();
            System.out.println("NIOServer: Number of keys after select operation: " + num);
            
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();
            
            while (it.hasNext()) {
                SelectionKey key = it.next();
                System.out.println("key: " + key);
                printKeyInfo(key);
                
                it.remove();
                
                
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    System.out.println("select ACCEPT");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    
                    System.out.println("ssc 1 : " + ssc);
                    System.out.println("sc 1 : " + sc);
                    
                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                    System.out.println("new key:" + newKey);
                    printKeyInfo(newKey);
                }
                else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//                    System.out.println("select READ");
//                    System.out.print("before cancel:");printKeyInfo(key);
//                    key.cancel();    //这是取消注册,取消之后就select不到了
//                    System.out.println("after cancel:");printKeyInfo(key);
                    SocketChannel sc = (SocketChannel) key.channel();
                    System.out.println("sc 2 : " + sc);
                    
                    //echo data
                    int nbytes = 0;
                    ByteBuffer echoBuffer = ByteBuffer.allocate(1024);
            //必须读取channel中的数据,否则selector中会一直有channel数据到达,不停地在这个else if里面执行
while (true) { echoBuffer.clear(); int r = sc.read(echoBuffer); if (r <= 0) break; echoBuffer.flip(); sc.write(echoBuffer); nbytes += r; } System.out.println("echoed " + nbytes + " from " + sc); }// if ... else if try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } }//while }//while } private static void printKeyInfo(SelectionKey sk) { String s = new String(); s = "Att: " + (sk.attachment() == null ? "no" : "yes"); s += ", Read: " + sk.isReadable(); s += ", Acpt: " + sk.isAcceptable(); s += ", Cnct: " + sk.isConnectable(); s += ", Wrt: " + sk.isWritable(); s += ", Valid: " + sk.isValid(); s += ", interestOps: " + sk.interestOps(); s += ", readyOps: " + sk.readyOps(); System.out.println(s); } public static void main(String[] args) { try { new NIOServer2().startServer(); } catch (IOException e) { e.printStackTrace(); } } }

 

首先,一个selectionKey 包含了两个集合,一个是 注册的感兴趣的操作集合,一个是已经准备好的集合(就绪集合)。第一个集合基本上是注册就确定的,或者通过interestOps(int)来改变。select是不会改变interest集合的。但是select改变的是 ready集合。也就是准备好的感兴趣的操作的集合,这样说,也说明,ready集合实际上是interest集合的子集。 

 

对于SelectionKey, 还可以执行cancel操作,一个被cancel掉的SelectionKey,实际上只是被放到了Selector的cancel键集合里,键马上失效,但是通道依然是注册状态,要等到下一个select时才真正取消注册。 

现在,我们再来看看选择器做了什么。选择器是就绪选择的核心,它包含了注册到它上面的通道与操作关系的Key,它维护了三个集合。 
1,已经注册的键集合 调用, keys() 
2,已经选择的键集合 调用, selectedKeys() 
3,已经取消的键集合 私有。 

选择器虽然封装了select,poll等底层的系统调用,但是她有自己的一套来管理这些键。 
每当select被调用时,她做如下检查: 
1、检查已经取消的键的集合。如果非空,从其他两个集合中移除已经取消的键,注销相关通道,清空已经取消的键的集合。 
2、已注册的键的集合中的键的interest集合被检查。例如有新的interest的操作注册。但是这一步不会影响后面的操作。这是延时到下一次select调用时才会影响的。 
就绪条件确认后,底层系统进行查询。依赖于select方法的参数,如果没有通道准备好,根select带的参数超时设置,可能会阻塞线程。 
系统调用完成后,可以对操作系统指示的已经准备好的interest集合中的一种操作的通道,执行以下操作: 
  a: 如果通道的键还没有在已经选择的键的集合中,那么键的ready集合将被清空。然后表示操作系统发现的当前通道已经准备好的操作的比特掩码将被设置。 
  b: 否则,一旦通道的键被放入已经选择的键的集合中时,ready集合不会被清除,而是累积。这就是说,如果之前的状态是ready的操作,本次已经不是ready了,但是他的bit位依然表示是ready,不会被清除。 
3、步骤2可能会有很长一段时间的休眠。所以在步骤2完成以后,步骤1继续执行以确保被取消的键正确处理。 
4、返回值,select的返回值说明的是从上一次调用到本次调用,就绪选择的个数。如果上一次就已经是就绪的,那么本次不统计。这是是为何返回为0时,我们continue的原因。 

这里使用的延迟注销方法,正是为了解决注销键的问题。如果线程在取消键的同时进行通道注销,那么很可能阻塞并与正在进行的选择操作发生冲突。 

同样我们有3中select可以选择: 
1, select() 
2, select(long timeout) 
3, selectNow(); 

select()会阻塞线程知道又一个通道就绪。 
而select带timeout的会在特定时间内阻塞,或者至少有一个通道就绪。 
而selectNow()如果没有发现就绪,就直接返回。 

如何停止中断选择呢? 
有三种方法。 
1, wakeup()这是一种优雅的方法,同时也是延时的。如果当前没有正进行的选择操作,也就是要等到下一个select才起作用。 
2, close()选择器的close被调用,则所有在选择操作中阻塞的线程被唤醒,相关通道被注销,键也被取消。 
3, interrupt() 实际上interrupt并不会中断线程。而是设置线程中断标志。 
然后依然是调用wakeup()。这是因为 Selector 捕获了interruptedException,然后在异常处理中调用了 wakeup() 

根据以上的信息,我们可以了解到,实际上选择器对选择键中的集合的操作,是交给程序员来完成的。如何管理选择键,是很关键的。 

这里需要记住的是,ready集合中的比特位,是累积的。根据步骤2,如果一个键是在选择集合中,那么这个键的ready集合是不会被清除的。而如果这个键不在选择集合中,那么就要首先清空这个键的ready集合,然后把就绪信息更新到这个ready集合上,最后,就是把这个键加入到已选择的集合中。 

这也是为什么上面的流程中,我们为什么要把处理的键删除,因为如果不删除,下一次的信息是累积的,我们就不能分出本次select中那些操作就绪了。如果清除掉,那么下一次如果就绪,ready集合就是重置后更新的信息。 

 

通过Selector选择通道

一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。这些方法返回你所感兴趣的事件(如连接、接受、读或写)已经准备就绪的那些通道。换句话说,如果你对“读就绪”的通道感兴趣,select()方法会返回读事件已经就绪的那些通道。

下面是select()方法:

  • int select()
  • int select(long timeout)
  • int selectNow()

select()阻塞到至少有一个通道在你注册的事件上就绪了。

select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。

selectNow()不会阻塞,不管什么通道就绪都立刻返回(译者注:此方法执行非阻塞的选择操作。如果自从前一次选择操作后,没有通道变成可选择的,则此方法直接返回零。)。

select()方法返回的int值表示有多少通道已经就绪。亦即,自上次调用select()方法后有多少通道变成就绪状态。如果调用select()方法,因为有一个通道变成就绪状态,返回了1,若再次调用select()方法,如果另一个通道就绪了,它会再次返回1。如果对第一个就绪的channel没有做任何操作,现在就有两个就绪的通道,但在每次select()方法调用之间,只有一个通道就绪了。

 

wakeUp()

某个线程调用select()方法后阻塞了,即使没有通道已经就绪,也有办法让其从select()方法返回。只要让其它线程在第一个线程调用select()方法的那个对象上调用Selector.wakeup()方法即可。阻塞在select()方法上的线程会立马返回。

如果有其它线程调用了wakeup()方法,但当前没有线程阻塞在select()方法上,下个调用select()方法的线程会立即“醒来(wake up)”。

 

 

Reference:

Java-NIO-Selector

 Java NIO系列教程(六) Selector

 

Java-NIO-Selector,古老的榕树,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。