java 信号量Semaphore

Semaphore 信号量主要用于约束多个线程可同时获取的物理上的或者逻辑上的资源数。比如用在各种池的设计中。

信号量用于管理这些资源的一个虚拟的管理凭据。线程在获取一个资源时,首先要获取一个资源的许可凭证。当线程用完之后将资源返回池中,并将许可凭证返回给信号量。

例如:

一个池的例子:

class Pool {

   private static final int MAX_AVAILABLE = 100;//最大可用资源数

   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

 

   public Object getItem() throws InterruptedException {

     available.acquire();

     return getNextAvailableItem();

   }

 

   public void putItem(Object x) {

     if (markAsUnused(x))

       available.release();

   }

 

   // Not a particularly efficient data structure; just for demo

 

   protected Object[] items = ... whatever kinds of items being managed

   protected boolean[] used = new boolean[MAX_AVAILABLE];//是否被使用

 

   protected synchronized Object getNextAvailableItem() {

     for (int i = 0; i < MAX_AVAILABLE; ++i) {

       if (!used[i]) {

          used[i] = true;

          return items[i];

       }

     }

     return null; // not reached

   }

 

   protected synchronized boolean markAsUnused(Object item) {

     for (int i = 0; i < MAX_AVAILABLE; ++i) {

       if (item == items[i]) {

          if (used[i]) {

            used[i] = false;

            return true;

          } else

            return false;

       }

     }

     return false;

   }

 }}

信号量初始化时有两个参数,第一个参数是资源可用数目,第二个是fairness parameter。fairness parameter 是公平变量。设为false那么,当有一个线程请求acquire时,会直接分给这个线程,而不管等待队列中焦急等待的众线程。而公平指的当然就是先来先服务了,如果没获取到一个许可,线程就只好先把自己挂起来了。但是使用tryacquire则直接抢一个许可凭证。调用一个acquire()并不意味着必须调用release(),即java不要求其成对出现,但是保证代码正确,就靠自己的程序设计了。

一下是我自己写的一个信号量的例子:

package com;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Semaphore;

class Test
{
private final Semaphore sem;
private final Set<String> set;
public Test(int bound)
{
this.set = Collections.synchronizedSet(new HashSet<String>());
this.sem=new Semaphore(bound,true);
}
public boolean add(String str) throws InterruptedException
{
boolean result = false;
sem.acquire();
try{
result=set.add(str);
}finally{
if(!result)
{
sem.release();
}
}
return result;
}
public boolean remove(String str)
{
boolean result=set.remove(str);
if(result)
{
sem.release();

}
return result;
}
}
public class Main{

public static void main(String[] args)
{
Test test = new Test(10);
new Thread(){
public void run()
{
int j=0;
for(int i=20;i<40;)
{
System.out.println("Thread1 run");
try {
test.add(String.valueOf(i));
System.out.println("Thread1 add"+(i++));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Thread1 stop");
}
}.start();
new Thread(){
public void run()
{
int j=0;
boolean result=false;
for(int i=0;i<20;)
{
System.out.println("Thread2 run");
try {
result=test.add(String.valueOf(i));
if(result)System.out.println("Thread2 add"+(i++));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
System.out.println("Thread2 stop");
}
}.start();
new Thread(){
public void run()
{
int i=0,j=0;
boolean result=false;
while(i<40)
{
j++;
j=j%40;
System.out.println("Thread3 run");
result=test.remove(String.valueOf(j));

if(result)System.out.println("remove"+(i++));
}
System.out.println("Thread3 stop");
}
}.start();
}
}

郑重声明:本站内容如果来自互联网及其他传播媒体,其版权均属原媒体及文章作者所有。转载目的在于传递更多信息及用于网络分享,并不代表本站赞同其观点和对其真实性负责,也不构成任何其他建议。