Semaphore 是一種基於計數的信號量,在定義信號量對象的時候可以設置一個閾值,然後基於這個閾值,多線程可以競爭訪問信號量,線程競爭到許可的信號之後,開始執行具體的業務邏輯,業務邏輯在執行完成之後釋放這個許可信號。
如果許可信號競爭隊列超過閾值,新加入的申請信號許可的線程就會被阻塞,知道有其他許可信號被釋放。使用Semaphore可以控制同時訪問資源的線程個數,例如,實現一個文件允許的並發訪問數。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//創建Semaphore信號量,初始化許可大小為3
final Semaphore sp = new Semaphore(3);
for(int i=0;i<10;i++){
try {
Thread.sleep(100);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
Runnable runnable = new Runnable(){
public void run(){
try {
//請求獲得許可,如果有可獲得的許可則繼續往下執行,
//許可數減1。否則進入阻塞狀態
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() +
"進入,當前已有" + (3-sp.availablePermits()) + "個並發");
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("線程" + Thread.currentThread().getName() +
"即將離開");
sp.release();//釋放許可,許可數加1
//下面代碼有時候執行不準確,因為其沒有和上面的代碼合成原子單元
System.out.println("線程" + Thread.currentThread().getName() +
"已離開,當前已有" + (3-sp.availablePermits()) + "個並發");
}
};
service.execute(runnable);
}
}
}
Semaphore 對於鎖的申請與釋放和Reentrantlock是類似的,通過acquire()方法和release()方法來獲取和釋放許可信號資源。
Semaphore.acquire()方法與ReentrantLock.lockInterruptibly()方法的效果是一樣的,為可響應中斷的鎖。也就是說在等待獲取許可信號的過程中可以被Thread.interrupt()方法中斷而取消對許可信號的申請操作。
除此之外,Semaphore也實現了可輪詢的鎖請求、定時鎖的等功能,以及公平與非公平鎖的定義在構造函數中設定
Semaphore鎖的釋放操作也需要手動進行釋放,為此為了避免線程因為異常沒有正常釋放鎖,釋放鎖的操作必須在finally代碼塊中完成。
Semaphore也可以用於實現一些對象池、資源池的構建,例如靜態全局對象池、資料庫連接池等等。
單個信號量的Semaphore對象可以實現互斥鎖的功能,並且可以是由一個線程獲得了「鎖」,再由另一個線程釋放「鎖」,這可應用於死鎖恢復的一些場合。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockTest {
public static void main(String[] args) {
final Business business = new Business();
ExecutorService executor = Executors.newFixedThreadPool(3);
for(int i=0;i<3;i++)
{
executor.execute(
new Runnable()
{
public void run()
{
business.service();
}
}
);
}
executor.shutdown();
}
private static class Business
{
private int count;
Lock lock = new ReentrantLock();
Semaphore sp = new Semaphore(1);
public void service()
{
//lock.lock();
try {
//當前線程使用count變量的時候將其鎖住,不允許其他線程訪問
sp.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
count++;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
} catch (RuntimeException e) {
e.printStackTrace();
}
finally
{
//lock.unlock();
sp.release(); //釋放鎖
}
}
}
}