java多线程开发之CyclicBarrier

  最近研究了一个别人的源码,其中用到多个线程并行操作一个文件,并且在所有线程全部结束后才进行主线程后面的处理。

  其用到java.util.concurrent.CyclicBarrier 这个类。

   CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用,其相当于一个屏障,当一个线程跑到await()方法时,将挂起这个线程,等待直到其他线程同样跑这个await()方法。

        int index = --count;
           if (index == 0) {  // tripped
             //。。
         }
          else {
            // We‘re about to finish waiting even if we had not
            // been interrupted, so this interrupt is deemed to
            // "belong" to subsequent execution.
            Thread.currentThread().interrupt();
            }

其是使用一个计数器,当计数器没有减少到0的时候,则会将当前线程中断。

另外,这个是可以重复等待的,重复调用await()方法,将进行重新等待。

构造方法:

 CyclicBarrier(int parties)
          创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。

CyclicBarrier(int parties, Runnable barrierAction)
          创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行

 当构造方法传入一个Runnable时,每次到达公共屏障点的时候都最先执行这个传进去的Runnable,然后再执行处于等待的Runnable

 

    /**
     * CyclicBarrier类似于CountDownLatch也是个计数器,
     * 不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数,
     * 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。
     * CyclicBarrier就象它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍。
     * CyclicBarrier初始时还可带一个Runnable的参数,
     * 此Runnable任务在CyclicBarrier的数目达到后,所有其它线程被唤醒前被执行。
     */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        //final  CyclicBarrier cb = new CyclicBarrier(3);//创建CyclicBarrier对象并设置3个公共屏障点
        final  CyclicBarrier cb = new CyclicBarrier(3,new Runnable(){
            @Override
            public void run() {
                System.out.println("********我最先执行***********");
            }
        });
        for(int i=0;i<3;i++){
            Runnable runnable = new Runnable(){
                    public void run(){
                    try {
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
                        cb.await();//到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行
                        
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
                        cb.await();    //这里CyclicBarrier对象又可以重用
                        Thread.sleep((long)(Math.random()*10000));    
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
                        cb.await();                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }                
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

 

另外,其还有一些其他方法

方法摘要
int await()
          在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
          在所有参与者都已经在此屏障上调用 await 方法之前,将一直等待。
int getNumberWaiting()
          返回当前在屏障处等待的参与者数目。
int getParties()
          返回要求启动此 barrier 的参与者数目。
boolean isBroken()
          查询此屏障是否处于损坏状态。
void reset()
          将屏障重置为其初始状态。

 

。。

 

 

 

 

 

 

 

 

 

 

方法摘要
int await()
          在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
          在所有参与者都已经在此屏障上调用 await 方法之前,将一直等待。
int getNumberWaiting()
          返回当前在屏障处等待的参与者数目。
int getParties()
          返回要求启动此 barrier 的参与者数目。
boolean isBroken()
          查询此屏障是否处于损坏状态。
void reset()
          将屏障重置为其初始状态。

 

技术分享
 1 package com.thread;
 2 import java.util.concurrent.CyclicBarrier;
 3 import java.util.concurrent.ExecutorService;
 4 import java.util.concurrent.Executors;
 5 import java.util.concurrent.Semaphore;
 6 
 7 public class CyclicBarrierTest {
 8 
 9     public static void main(String[] args) {
10         ExecutorService service = Executors.newCachedThreadPool();
11         final  CyclicBarrier cb = new CyclicBarrier(3);//创建CyclicBarrier对象并设置3个公共屏障点
12         for(int i=0;i<3;i++){
13             Runnable runnable = new Runnable(){
14                     public void run(){
15                     try {
16                         Thread.sleep((long)(Math.random()*10000));    
17                         System.out.println("线程" + Thread.currentThread().getName() + 
18                                 "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
19                         cb.await();//到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行
20                         
21                         Thread.sleep((long)(Math.random()*10000));    
22                         System.out.println("线程" + Thread.currentThread().getName() + 
23                                 "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
24                         cb.await();    
25                         Thread.sleep((long)(Math.random()*10000));    
26                         System.out.println("线程" + Thread.currentThread().getName() + 
27                                 "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
28                         cb.await();                        
29                     } catch (Exception e) {
30                         e.printStackTrace();
31                     }                
32                 }
33             };
34             service.execute(runnable);
35         }
36         service.shutdown();
37     }
38 }
技术分享
技术分享
线程pool-1-thread-1即将到达集合地点1,当前已有0个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点1,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点1,当前已有2个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点2,当前已有0个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点2,当前已有1个已经到达,正在等候
线程pool-1-thread-1即将到达集合地点2,当前已有2个已经到达,正在等候
线程pool-1-thread-1即将到达集合地点3,当前已有0个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点3,当前已有2个已经到达,正在等候
技术分享

 

  如果在构造CyclicBarrier对象的时候传了一个Runnable对象进去,则每次到达公共屏障点的时候都最先执行这个传进去的Runnable,然后再执行处于等待的Runnable。如果把上面的例子改成下面这样:

按 Ctrl+C 复制代码
按 Ctrl+C 复制代码

则结果如下:

技术分享
线程pool-1-thread-1即将到达集合地点1,当前已有0个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点1,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点1,当前已有2个已经到达,正在等候
********我最先执行***********
线程pool-1-thread-1即将到达集合地点2,当前已有0个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点2,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点2,当前已有2个已经到达,正在等候
********我最先执行***********
线程pool-1-thread-1即将到达集合地点3,当前已有0个已经到达,正在等候
线程pool-1-thread-3即将到达集合地点3,当前已有1个已经到达,正在等候
线程pool-1-thread-2即将到达集合地点3,当前已有2个已经到达,正在等候
********我最先执行***********
技术分享

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。