Java多執行緒8 Semaphore實現訊號燈
前言
Semaphore是計數訊號量。Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個物件,Semaphore只是維持了一個可獲得許可證的數量。
Semaphore可以維護當前訪問自身的執行緒個數,並提供了同步機制。使用Semaphore可以控制同時訪問資源的執行緒個數,例如,實現一個檔案允許的併發訪問數。
1 Semaphore的主要方法
Semaphore(int permits):構造方法,建立具有給定許可數的計數訊號量並設定為非公平訊號量。
Semaphore(int permits,boolean fair):構造方法,當fair等於true時,建立具有給定許可數的計數訊號量並設定為公平訊號量。
void acquire():當前執行緒嘗試去阻塞的獲取1個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前執行緒獲取了1個可用的許可證,則會停止等待,繼續執行。
- 當前執行緒被中斷,則會丟擲InterruptedException異常,並停止等待,繼續執行。
void acquire(int n):從此訊號量獲取給定數目許可,在提供這些許可前一直將執行緒阻塞。
當前執行緒嘗試去阻塞的獲取多個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前執行緒獲取了n個可用的許可證,則會停止等待,繼續執行。
- 當前執行緒被中斷,則會丟擲InterruptedException異常,並停止等待,繼續執行。
void release():釋放一個許可,將其返回給訊號量。
void release(int n):釋放n個許可。
int availablePermits():當前可用的許可數。
voidacquierUninterruptibly():當前執行緒嘗試去阻塞的獲取1個許可證(不可中斷的)。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前執行緒獲取了1個可用的許可證,則會停止等待,繼續執行。
void acquireUninterruptibly(permits):當前執行緒嘗試去阻塞的獲取多個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
當前執行緒獲取了n個可用的許可證,則會停止等待,繼續執行。
boolean tryAcquire()當前執行緒嘗試去獲取1個許可證。
此過程是非阻塞的,它只是在方法呼叫時進行一次嘗試。
如果當前執行緒獲取了1個可用的許可證,則會停止等待,繼續執行,並返回true。
如果當前執行緒沒有獲得這個許可證,也會停止等待,繼續執行,並返回false。
boolean tryAcquire(permits):當前執行緒嘗試去獲取多個許可證。
此過程是非阻塞的,它只是在方法呼叫時進行一次嘗試。
如果當前執行緒獲取了permits個可用的許可證,則會停止等待,繼續執行,並返回true。
如果當前執行緒沒有獲得permits個許可證,也會停止等待,繼續執行,並返回false。
boolean tryAcquire(timeout,TimeUnit):當前執行緒在限定時間內,阻塞的嘗試去獲取1個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前執行緒獲取了可用的許可證,則會停止等待,繼續執行,並返回true。
- 當前執行緒等待時間timeout超時,則會停止等待,繼續執行,並返回false。
- 當前執行緒在timeout時間內被中斷,則會丟擲InterruptedException一次,並停止等待,繼續執行。
boolean tryAcquire(permits,timeout,TimeUnit):當前執行緒在限定時間內,阻塞的嘗試去獲取permits個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前執行緒獲取了可用的permits個許可證,則會停止等待,繼續執行,並返回true。
- 當前執行緒等待時間timeout超時,則會停止等待,繼續執行,並返回false。
- 當前執行緒在timeout時間內被中斷,則會丟擲InterruptedException一次,並停止等待,繼續執行。
2 例項講解
public class SemaphoreTest { private static final Semaphore semaphore = new Semaphore(3); public static void main(String[] args) { Executor executor = Executors.newCachedThreadPool(); String[] name = {"Jack", "Pony", "Larry", "Martin", "James", "ZhangSan","Tree"}; int[] age = {21,22,23,24,25,26,27}; for(int i=0;i<7;i++) { Thread t1=new InformationThread(name[i],age[i]); executor.execute(t1); } } private static class InformationThread extends Thread { private final String name; private final int age; public InformationThread(String name, int age) { this.name = name; this.age = age; } @Override public void run() { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":大家好,我是" + name + "我今年" + age + "當前時間段為:" + System.currentTimeMillis()); Thread.sleep(1000); System.out.println(name + "要準備釋放許可證了,當前時間為:" + System.currentTimeMillis()); System.out.println("當前可使用的許可數為:" + semaphore.availablePermits()); System.out.println("是否有正在等待許可證的執行緒:" + semaphore.hasQueuedThreads()); System.out.println("正在等待許可證的佇列長度(執行緒數量):" + semaphore.getQueueLength()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
pool-1-thread-1:大家好,我是Jack我今年21當前時間段為:1543498535306 pool-1-thread-3:大家好,我是Larry我今年23當前時間段為:1543498535306 pool-1-thread-2:大家好,我是Pony我今年22當前時間段為:1543498535306 Pony要準備釋放許可證了,當前時間為:1543498536310 Jack要準備釋放許可證了,當前時間為:1543498536310 當前可使用的許可數為:0 Larry要準備釋放許可證了,當前時間為:1543498536310 是否有正在等待許可證的執行緒:true 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):4 正在等待許可證的佇列長度(執行緒數量):4 當前可使用的許可數為:0 pool-1-thread-4:大家好,我是Martin我今年24當前時間段為:1543498536311 是否有正在等待許可證的執行緒:true pool-1-thread-5:大家好,我是James我今年25當前時間段為:1543498536311 正在等待許可證的佇列長度(執行緒數量):2 pool-1-thread-6:大家好,我是ZhangSan我今年26當前時間段為:1543498536312 James要準備釋放許可證了,當前時間為:1543498537315 Martin要準備釋放許可證了,當前時間為:1543498537315 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):1 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:false pool-1-thread-7:大家好,我是Tree我今年27當前時間段為:1543498537316 正在等待許可證的佇列長度(執行緒數量):0 ZhangSan要準備釋放許可證了,當前時間為:1543498537317 當前可使用的許可數為:1 是否有正在等待許可證的執行緒:false 正在等待許可證的佇列長度(執行緒數量):0 Tree要準備釋放許可證了,當前時間為:1543498538319 當前可使用的許可數為:2 是否有正在等待許可證的執行緒:false 正在等待許可證的佇列長度(執行緒數量):0
以上是非公平訊號量,將建立Semaphore物件的語句改為如下語句:
private static final Semaphore semaphore=new Semaphore(3,true);
pool-1-thread-1:大家好,我是Jack我今年21當前時間段為:1543498810563 pool-1-thread-3:大家好,我是Larry我今年23當前時間段為:1543498810564 pool-1-thread-2:大家好,我是Pony我今年22當前時間段為:1543498810563 Jack要準備釋放許可證了,當前時間為:1543498811564 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):4 pool-1-thread-4:大家好,我是Martin我今年24當前時間段為:1543498811564 Larry要準備釋放許可證了,當前時間為:1543498811568 當前可使用的許可數為:0 Pony要準備釋放許可證了,當前時間為:1543498811568 是否有正在等待許可證的執行緒:true 當前可使用的許可數為:0 正在等待許可證的佇列長度(執行緒數量):3 是否有正在等待許可證的執行緒:true pool-1-thread-5:大家好,我是James我今年25當前時間段為:1543498811568 正在等待許可證的佇列長度(執行緒數量):2 pool-1-thread-6:大家好,我是ZhangSan我今年26當前時間段為:1543498811568 Martin要準備釋放許可證了,當前時間為:1543498812566 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):1 pool-1-thread-7:大家好,我是Tree我今年27當前時間段為:1543498812566 James要準備釋放許可證了,當前時間為:1543498812572 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:false 正在等待許可證的佇列長度(執行緒數量):0 ZhangSan要準備釋放許可證了,當前時間為:1543498812572 當前可使用的許可數為:1 是否有正在等待許可證的執行緒:false 正在等待許可證的佇列長度(執行緒數量):0 Tree要準備釋放許可證了,當前時間為:1543498813568 當前可使用的許可數為:2 是否有正在等待許可證的執行緒:false 正在等待許可證的佇列長度(執行緒數量):0
實現單例模式
將建立訊號量物件語句修改如下:
private static final Semaphore semaphore=new Semaphore(1);
執行程式,結果如下:
pool-1-thread-1:大家好,我是Jack我今年21當前時間段為:1543499053898 Jack要準備釋放許可證了,當前時間為:1543499054903 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):6 pool-1-thread-2:大家好,我是Pony我今年22當前時間段為:1543499054904 Pony要準備釋放許可證了,當前時間為:1543499055907 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):5 pool-1-thread-3:大家好,我是Larry我今年23當前時間段為:1543499055907 Larry要準備釋放許可證了,當前時間為:1543499056909 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):4 pool-1-thread-4:大家好,我是Martin我今年24當前時間段為:1543499056909 Martin要準備釋放許可證了,當前時間為:1543499057913 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):3 pool-1-thread-5:大家好,我是James我今年25當前時間段為:1543499057913 James要準備釋放許可證了,當前時間為:1543499058914 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):2 pool-1-thread-6:大家好,我是ZhangSan我今年26當前時間段為:1543499058915 ZhangSan要準備釋放許可證了,當前時間為:1543499059919 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:true 正在等待許可證的佇列長度(執行緒數量):1 pool-1-thread-7:大家好,我是Tree我今年27當前時間段為:1543499059919 Tree要準備釋放許可證了,當前時間為:1543499060923 當前可使用的許可數為:0 是否有正在等待許可證的執行緒:false 正在等待許可證的佇列長度(執行緒數量):0
如上可知,如果將給定許可數設定為1,就如同一個單例模式,即單個停車位,只有一輛車進,然後這輛車出來後,下一輛車才能進。
3 原始碼解析
Semaphore有兩種模式,公平模式和非公平模式。公平模式就是呼叫acquire的順序就是獲取許可證的順序,遵循FIFO;而非公平模式是搶佔式的,也就是有可能一個新的獲取執行緒恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的執行緒。
構造方法
Semaphore有兩個構造方法,如下:
public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
獲取許可
先從獲取一個許可看起,並且先看非公平模式下的實現。首先看acquire方法,acquire方法有幾個過載,但主要是下面這個方法
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); }
從上面可以看到,呼叫了Sync的acquireSharedInterruptibly方法,該方法在父類AQS中,如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) //如果執行緒被中斷了,丟擲異常 throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //獲取許可失敗,將執行緒加入到等待佇列中 doAcquireSharedInterruptibly(arg); }
AQS子類如果要使用共享模式的話,需要實現tryAcquireShared方法,下面看NonfairSync的該方法實現:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
該方法呼叫了父類中的nonfairTyAcquireShared方法,如下:
final int nonfairTryAcquireShared(int acquires) { for (;;) { //獲取剩餘許可數量 int available = getState(); //計算給完這次許可數量後的個數 int remaining = available - acquires; //如果許可不夠或者可以將許可數量重置的話,返回 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
從上面可以看到,只有在許可不夠時返回值才會小於0,其餘返回的都是剩餘許可數量,這也就解釋了,一旦許可不夠,後面的執行緒將會阻塞。看完了非公平的獲取,再看下公平的獲取,程式碼如下:
protected int tryAcquireShared(int acquires) { for (;;) { //如果前面有執行緒再等待,直接返回-1 if (hasQueuedPredecessors()) return -1; //後面與非公平一樣 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
從上面可以看到,FairSync與NonFairSync的區別就在於會首先判斷當前佇列中有沒有執行緒在等待,如果有,就老老實實進入到等待佇列;而不像NonfairSync一樣首先試一把,說不定就恰好獲得了一個許可,這樣就可以插隊了。
看完了獲取許可後,再看一下釋放許可。
釋放許可
釋放許可也有幾個過載方法,但都會呼叫下面這個帶引數的方法,
public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
releaseShared方法在AQS中,如下:
public final boolean releaseShared(int arg) { //如果改變許可數量成功 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
AQS子類實現共享模式的類需要實現tryReleaseShared類來判斷是否釋放成功,實現如下:
protected final boolean tryReleaseShared(int releases) { for (;;) { //獲取當前許可數量 int current = getState(); //計算回收後的數量 int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS改變許可數量成功,返回true if (compareAndSetState(current, next)) return true; } }
從上面可以看到,一旦CAS改變許可數量成功,那麼就會呼叫doReleaseShared()方法釋放阻塞的執行緒。
減小許可數量
Semaphore還有減小許可數量的方法,該方法可以用於用於當資源用完不能再用時,這時就可以減小許可證。程式碼如下:
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
可以看到,委託給了Sync,Sync的reducePermits方法如下:
final void reducePermits(int reductions) { for (;;) { //得到當前剩餘許可數量 int current = getState(); //得到減完之後的許可數量 int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); //如果CAS改變成功 if (compareAndSetState(current, next)) return; } }
從上面可以看到,就是CAS改變AQS中的state變數,因為該變數代表許可證的數量。
獲取剩餘許可數量
Semaphore還可以一次將剩餘的許可數量全部取走,該方法是drain方法,如下:
public int drainPermits() { return sync.drainPermits(); }
Sync的實現如下:
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
可以看到,就是CAS將許可數量置為0。
總結
Semaphore是訊號量,用於管理一組資源。其內部是基於AQS的共享模式,AQS的狀態表示許可證的數量,在許可證數量不夠時,執行緒將會被掛起;而一旦有一個執行緒釋放一個資源,那麼就有可能重新喚醒等待佇列中的執行緒繼續執行。