AQS同步元件--Semaphore
Semaphore
什麼是Semaphore?
是用於控制某個資源同一時間被執行緒訪問的個數,提供acquire()和release()方法,acquire獲取一個許可,如果沒有獲取的到就等待,release是在操作完成後釋放一個許可,Semaphore維護了當前訪問的個數,通過同步機制來控制同時訪問的個數,在資料結構裡連結串列中的節點是可以無限個的,而Semaphore裡維護的是一個有大小的限連結串列。
Semaphore的使用場景
Semaphore用於僅能提供有限訪問的資源,比如資料庫中的連結數只有20但是我們上層應用數可能遠大於20,如果同時都對資料庫連結進行獲取,那很定會因為連結獲取不到而報錯,所以我們就要對資料庫連結的訪問進行控制。
演示程式碼
@Slf4j public class SemaphoreExample1 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); // 獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
我們在執行 test(threadNum)方式前後包裹上acquire和release,這樣其實我們就相當於一個單執行緒在執行。當執行acquire後就只能等待執行release後再執行新的執行緒,然後我們在acquire()和release()都是沒有傳參也就是1,每次只允許一個執行緒執行,如果我們改成
semaphore.acquire(3); // 獲取多個許可 test(threadNum); semaphore.release(3); // 釋放多個許可
那麼我們就是每3個3個執行直到把執行緒池中的執行緒執行完。在列印的日誌中我們也可以看到是每三個每三個列印。
假設我們的資料庫允許獲取連線是3剩餘的獲取執行緒我們不想要只想丟棄改如何實現?
Semaphore提供了一個嘗試獲取許可的方法,tryAcquire()嘗試獲取許可成功就執行,嘗試獲取許可失敗就丟棄執行緒。下面看程式碼
@Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire()) { // 嘗試獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
這段程式碼執行結果就只打印了3行日誌,其他的執行緒就被丟棄了。tryAcquire()共提供如下幾種方法。
我們用一個例子來演示一下引數的方法的使用。
@Slf4j public class SemaphoreExample4 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
這次我們使用的是一個tryAcquire(5000, TimeUnit.MILLISECONDS))方法,這個方法的第一個引數是表示等待5000毫秒,第二引數是表示多長時間嘗試一次,TimeUnit.MILLISECONDS表示1毫秒。這時候我們會發現20個執行緒都執行了,為什麼會這樣呢?因為我們在執行時等待超時時間是5秒,每次執行就是sleep 1秒,所以可以獲取成tryAcquire進而執行。