CountDownLatch & CyclicBarrier源码Android版实现解析

CountDownLatch

    CountDownLatch允许一条或者多条线程等待直至其它线程完成以系列的操作的辅助同步器。

    用一个指定的count值对CountDownLatch进行初始化。await方法会阻塞,直至因为调用countDown方法把当前的count降为0,在这以后,所有的等待线程会被释放,并且在这以后的await调用将会立即返回。这是一个一次性行为——count不能被重置。如果你需要一个可以重置count的版本,考虑使用CyclicBarrier。
    其实本类实现非常简单,和ReentrantLock类似,公有的方法都是调用内部类的桥接模式,内部类是继承AQS的锁实现。具体如下:
  private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;


        Sync(int count) {
            setState(count);
        }


        int getCount() {
            return getState();
        }


        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }


        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    构造函数调用setState把count值设置为当前的状态。内部类Sync由CountDownLatch构造函数时创建,当然保证非负值也是这里判断,
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    另外,看看CountDownLatch的的await和countDown方法的实现:
   public void countDown() {
        sync.releaseShared(1);
    }


    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    可以看到await调了AQS的acquireSharedInterruptibly尝试获取共享锁,countDown方法调用了releaseShared尝试释放共享锁,两个方法的参数都是1,因此当此时有多条线程同时调用await时,这时候看到内部Sync类的tryAcquireShared方法实现,由于在构造函数里已经调用setState把当前锁状态数设置为count,因此这里在getState()的判断会一直返回count的值,因此tryAcquireShared会一直返回-1,然后这些调用await的线程都会进入等待队列。
    继续看看countDown的实现,调用的是AQS的releaseShared的方法,经过调用来到内部Sync类的tryReleaseShared方法,明显看到方法只是利用自旋把当前的锁状态数减去一,直到锁状态数等于0,然后返回true,这样就AQS就把先前进入等待队列里的所有等待共享锁的线程唤醒,同时如果后面继续有线程调用await的话,由于锁状态数已经变为0,因此tryAcquireShared会一直返回1,这时函数countDown就会立刻返回,不需要再进入等待队列。
    需要注意的是,这里的count值在构造函数就已经被决定了,后续也没有方法可以修改,当然这是由这个类的最初设计意图所决定的。如果需要能够对count值进行更改,可以参考CyclicBarrier。

CyclicBarrier

    CyclicBarrier允许一组线程互相等待直到一个公平屏障点(common barrier point)。与CountDownLatch不同的是CyclicBarrier着重与互相等待,并且添加重置原状态的方法。

    在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该屏障(barrier)在释放等待线程后可以重用,所以称它为循环的屏障。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
    如果屏障操作不依赖于线程组执行时被悬挂,则线程组内任何线程在被释放的时候都可以执行屏障操作。为了改进这个行为,每个await的调用都会返回线程到达屏障的索引。然后你就可以选择哪条线程应该执行屏障操作,例如:
    if (barrier.await() == 0) {
        // 执行屏障操作
    }
    CyclicBarrier对于失败的同步尝试,会使用全有或者全无的破坏模型(breakage model):如果一条线程由于中断、异常或者超时提前离开了屏障点,其它所有在屏障点等待的线程也会通过抛出BrokenBarrierException(或者InterruptedException异常,同时被中断的情况下)离开屏障点。接下来看看具体的实现。

    首先来看看CyclicBarrier类的构造函数和成员变量以及内部类Generation:
    private static class Generation {
        boolean broken = false;
    }
    CyclicBarrier声明一个内部类Generation,在每次屏障点的使用就代表着一个Generation实例。当屏障点被破坏或者重置的时候,generation就要改变。
    //保护屏障点入口的锁
    private final ReentrantLock lock = new ReentrantLock();
    //使线程在await中等待直至屏障点被脱落(tripped)的条件对象
    private final Condition trip = lock.newCondition();
    //需要调用await来脱落屏障点的线程数,为final变量
    private final int parties;
    //在屏障点被脱落之后需要运行的命令
    private final Runnable barrierCommand;
    //当前的Generation
    private Generation generation = new Generation();
    //在每次generation中从parties递减到0,当新的Generation被创建或者屏障点被破坏的时候,count就会被重置
    private int count;
    成员变量中采用lock和trip进行同步控制,另外parties和count记录线程数,generation则表示屏障点的当前状态,还有barrierCommand记录屏障点破坏后需要执行的命令。
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    其中parties表示在屏障点被脱落(tripped)之前需要调用await的线程数,需要注意的是parties是final变量,因此不能改变,而成员变量count则在每次await的时候递减,重置的时候把parties赋值即可。barrierAction表示当屏障点被脱落的时候,执行的命令。要注意的时parties数在构造函数里设定以后就不能更改,如果屏障被脱落的时候,可以调用reset重置,我们先来看看await的实现。在具体实现里,await有两个不同的函数版本,包括无超时版本和超时版本,具体如下。
 //无超时版本
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }


    //超时版本
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }


    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;


            if (g.broken)
                throw new BrokenBarrierException();


            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }


            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }


            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }


                if (g.broken)
                    throw new BrokenBarrierException();


                if (g != generation)
                    return index;


                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    可以看到无超时版本和超时版本的await实现只是在调用dowait函数的参数不同。具体来看看dowait的实现。和之前解析的ReentranLock,ReentrantReadWriteLock,以及上面的CountDownLatch不同,CyclicBarrier并没有重载AQS类,而是选择了直接使用ReentrantLock以及其Condition。
    dowait函数一开始就调用RenntrantLock的lock方法尝试获取锁,当获取锁成功之后,然后创建栈变量g保留成员变量generation,由于generation在后面会被其它线程重新赋值,因此用栈变量保留的做法在多线程同步是实用的技巧。接着检验当前Generation的broken变量,如果该变量为true(在重置或者有足够线程调用类await破坏了屏障点),则此刻马上抛出BrokenBarrierException异常,然后检查当前线程是否已经被中断,如果被中断则调用breakBarrier,根据CyclicBarrier的特性,当一个await的线程中断,则屏障点被破坏,则所有await的线程被唤醒抛出异常。
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    breakBarrier作用就是破坏屏障点,函数首先把成员变量的generation.broken变为true,重置count为parties值,然后调用条件对象trip的signalAll唤醒所有在await等待的线程。在await下面我们即将会看到,breakBarrier会造成之前在await的所有线程抛出异常。如果线程没有被中断,则把count自减,并保留至index(count同样会在后面释放锁的时候被其它线程修改),如果index为0,则表示屏障点已经被破坏,然后如果barrierCommand非null,则执行命令,如果执行成功ranAction修改为true,否则在命令里抛出任何异常的话,则会在finally块调用breakBarrier破坏屏障点,唤醒其它线程。命令执行成功后,则会调用nextGeneration(脱落当前的屏障点):
    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }
    函数先唤醒所有在等待中的线程,然后重置count,接着创建一个新的Generation类实例,这样就等于把CyclicBarrier内部的状态重置。
    我们再来看回dowait的实现,如果当前的index还没有递减到0,则会进入一个循环,在这里首先调用trip的await函数按照参数进行超时或无穷等待,如果在等待过程中,有线程抛出了InterruptedException中断了当前线程,则要继续判断generation是否和当前发生改变,如果栈变量g和generation相等,并且broken为false,则表示在await中发生了中断,因此要调用breakBarrier破坏屏障点,然后抛出异常;但如果以上条件不符合,则表明当前屏障点已经被脱落,但在线程仍在等待唤醒的过程中发生的中断,则此次中断应该于当前的await无关,则需要调用Thread.interrupt方法重置interrupt标识。
    然后如果是被正常唤醒或者超时等待await以后,还要继续判断g.broken,如果为true,则表示屏障被破坏,要抛出BrokenBarrierException异常;如果栈变量g不和generation相等,则表示当前屏障点已经被脱落,因此要返回之前的index表达进入屏障点的索引。另外继续是否超时,如果超时则同样需要breakBarrier并且抛出TimeoutException。由于考虑到线程并发问题,如果以上判断都失败则必须要重新循环。最后离开函数的时候必须要调用lock.unlock释放锁。

    另外,如果在await的线程数没有达到parties,但需要重新同步,可以调用reset方法。
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }
    函数实现很简单,尝试获取锁,然后调用breakBarrier和nextGeneration方法。这样调用之后,屏障点就会被破坏(breakage),则把之前在await的线程唤醒并让它们抛出异常;然后调用nextGeneration重置当前状态,这样后来的await能够再次重新等待。

总结

    这样,我们就完整地把CountDownLatch和CyclicBarrier进行了分析。CountDownLatch着重于多组线程等待另外一组线程完成操作,并且是无法重置的;CyclicBarrier则是着重于一组线程互相等待到对方都完成操作为止,但可以重置。
    如果要考虑到CountDownLatch为什么不提供一个可重置的方法,个人认为考虑到实现重置,则必须要像CyclicBarrier一样要考虑到对其它正在等待线程的影响,这样势必就会使整个同步器模型更加复杂,令使用者不方便,同时也会加大实现难度,这样不如像JUC包给出的解决方案一样,提供CountDownLatch用于更普遍的简单的并发情况,另外再提供CyclicBarrier来为更加复杂的并发模型提供帮助。而事实上CountDownLatch的同步模型比CyclicBarrier要简单,主要体现在等待线程之间不会互相影响,另外CountDownLatch的实现也要比CyclicBarrier更加简单。

CountDownLatch & CyclicBarrier源码Android版实现解析,,5-wow.com

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。