Java執行緒池瞭解一下
馬上就要過年了,還在崗位上堅守“swimming”的小夥伴們頂住。博主給大家帶來一篇執行緒池的基本使用解解悶。
為什麼需要使用執行緒池
1、減少執行緒建立與切換的開銷
- 在沒有使用執行緒池的時候,來了一個任務,就建立一個執行緒,我們知道系統建立和銷燬工作執行緒的開銷很大,而且頻繁的建立執行緒也就意味著需要進行頻繁的執行緒切換,這都是一筆很大的開銷。
2、控制執行緒的數量
- 使用執行緒池我們可以有效地控制執行緒的數量,當系統中存在大量併發執行緒時,會導致系統性能劇烈下降。
執行緒池做了什麼
重複利用有限的執行緒
- 執行緒池中會預先建立一些空閒的執行緒,他們不斷的從工作佇列中取出任務,然後執行,執行完之後,會繼續執行工作佇列中的下一個任務,減少了建立和銷燬執行緒的次數,每個執行緒都可以一直被重用,變了建立和銷燬的開銷。
執行緒池的使用
其實常用Java執行緒池本質上都是由ThreadPoolExecutor
或者ForkJoinPool
生成的,只是其根據建構函式傳入不同的實參來例項化相應執行緒池而已。
Executors
Executors
是一個執行緒池工廠類,該工廠類包含如下集合靜態工廠方法來建立執行緒池:
newFixedThreadPool() newSingleThreadExecutor() newCachedThreadPool() newWorkStealingPool() newScheduledThreadPool()
ExecutorService介面
對設計模式有了解過的同學都會知道,我們儘量面向介面程式設計,這樣對程式的靈活性是非常友好的。Java執行緒池也採用了面向介面程式設計的思想,可以看到ThreadPoolExecutor
和ForkJoinPool
所有都是ExecutorService
介面的實現類。在ExecutorService
介面中定義了一些常用的方法,然後再各種執行緒池中都可以使用ExecutorService
介面中定義的方法,常用的方法有如下幾個:
-
向執行緒池提交執行緒
Future<?> submit() void execute(Runnable command)
-
關閉執行緒池
void shutdown() List<Runnable> shutdownNow()
-
檢查執行緒池的狀態
boolean isShutdown() boolean isTerminated()
常見執行緒池使用示例
一、newFixedThreadPool
執行緒池中的執行緒數目是固定的,不管你來了多少的任務。
示例程式碼
public class MyFixThreadPool { public static void main(String[] args) throws InterruptedException { // 建立一個執行緒數固定為5的執行緒池 ExecutorService service = Executors.newFixedThreadPool(5); System.out.println("初始執行緒池狀態:" + service); for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println("執行緒提交完畢之後執行緒池狀態:" + service); service.shutdown();//會等待所有的執行緒執行完畢才關閉,shutdownNow:立馬關閉 System.out.println("是否全部執行緒已經執行完畢:" + service.isTerminated());//所有的任務執行完了,就會返回true System.out.println("是否已經執行shutdown()" + service.isShutdown()); System.out.println("執行完shutdown()之後執行緒池的狀態:" + service); TimeUnit.SECONDS.sleep(5); System.out.println("5秒鐘過後,是否全部執行緒已經執行完畢:" + service.isTerminated()); System.out.println("5秒鐘過後,是否已經執行shutdown()" + service.isShutdown()); System.out.println("5秒鐘過後,執行緒池狀態:" + service); } } 複製程式碼
執行結果:
初始執行緒池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
執行緒提交完畢之後執行緒池狀態:[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
是否全部執行緒已經執行完畢:false
是否已經執行shutdown():true
執行完shutdown()之後執行緒池的狀態:[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-2
pool-1-thread-1
pool-1-thread-4
pool-1-thread-5
pool-1-thread-3
pool-1-thread-2
5秒鐘過後,是否全部執行緒已經執行完畢:true
5秒鐘過後,是否已經執行shutdown():true
5秒鐘過後,執行緒池狀態:[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
程式分析
-
當我們建立好一個FixedThreadPool之後,該執行緒池就處於
Running
狀態了,但是pool size
(執行緒池執行緒的數量)、active threads
(當前活躍執行緒)queued tasks
(當前排隊執行緒)、completed tasks
(已完成的任務數)都是0 -
當我們把6個任務都提交給執行緒池之後,
-
pool size = 5
:因為我們建立的是一個固定執行緒數為5的執行緒池(注意:如果這個時候我們只提交了3個任務,那麼pool size = 3
,說明執行緒池也是通過懶載入的方式去建立執行緒)。 -
active threads = 5
:雖然我們向執行緒池提交了6個任務,但是執行緒池的固定大小為5,所以活躍執行緒只有5個 -
queued tasks = 1
:雖然我們向執行緒池提交了6個任務,但是執行緒池的固定大小為5,只能有5個活躍執行緒同時工作,所以有一個任務在等待
-
-
我們第一次執行
shutdown()
的時候,由於任務還沒有全部執行完畢,所以isTerminated()
返回false
,shutdown()
返回true,而執行緒池的狀態會由Running
變為Shutting down
-
從任務的執行結果我們可以看出,名為
pool-1-thread-2
執行了兩次任務,證明執行緒池中的執行緒確實是重複利用的。 -
5秒鐘後,
isTerminated()
返回true
,shutdown()
返回true
,證明所有的任務都執行完了,執行緒池也關閉了,我們再次檢查執行緒池的狀態[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]
,狀態已經處於Terminated
了,然後已完成的任務顯示為6
二、newSingleThreadExecutor
從頭到尾整個執行緒池都只有一個執行緒在工作。
例項程式碼
public class SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { final int j = i; service.execute(() -> { System.out.println(j + " " + Thread.currentThread().getName()); }); } } } 複製程式碼
執行結果
0 pool-1-thread-1 1 pool-1-thread-1 2 pool-1-thread-1 3 pool-1-thread-1 4 pool-1-thread-1
程式分析可以看到只有pool-1-thread-1
一個執行緒在工作。
三、newCachedThreadPool
來多少任務,就建立多少執行緒(前提是沒有空閒的執行緒在等待執行任務,否則還是會複用之前舊(快取)的執行緒),直接你電腦能支撐的執行緒數的極限為止。
例項程式碼
public class CachePool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println("初始執行緒池狀態:" + service); for (int i = 0; i < 12; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println("執行緒提交完畢之後執行緒池狀態:" + service); TimeUnit.SECONDS.sleep(50); System.out.println("50秒後執行緒池狀態:" + service); TimeUnit.SECONDS.sleep(30); System.out.println("80秒後執行緒池狀態:" + service); } } 複製程式碼
執行結果
初始執行緒池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
執行緒提交完畢之後執行緒池狀態:[Running, pool size = 12, active threads = 12, queued tasks = 0, completed tasks = 0]
pool-1-thread-3
pool-1-thread-4
pool-1-thread-1
pool-1-thread-2
pool-1-thread-5
pool-1-thread-8
pool-1-thread-9
pool-1-thread-12
pool-1-thread-7
pool-1-thread-6
pool-1-thread-11
pool-1-thread-10
50秒後執行緒池狀態:[Running, pool size = 12, active threads = 0, queued tasks = 0, completed tasks = 12]
80秒後執行緒池狀態:[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 12]
程式分析
- 因為我們每個執行緒任務至少需要500毫秒的執行時間,所以當我們往執行緒池中提交12個任務的過程中,基本上沒有空閒的執行緒供我們重複使用,所以執行緒池會建立12個執行緒。
- 快取中的執行緒預設是60秒沒有活躍就會被銷燬掉,可以看到在50秒鐘的時候回,所有的任務已經完成了,但是執行緒池執行緒的數量還是12。
- 80秒過後,可以看到執行緒池中的執行緒已經全部被銷燬了。
四、newScheduledThreadPool
可以在指定延遲後或週期性地執行執行緒任務的執行緒池。
ScheduledThreadPoolExecutor
-
newScheduledThreadPool()
方法返回的其實是一個ScheduledThreadPoolExecutor
物件,ScheduledThreadPoolExecutor
定義如下:
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { 複製程式碼
-
可以看到,它還是繼承了
ThreadPoolExecutor
並實現了ScheduledExecutorService
介面,而ScheduledExecutorService
也是繼承了ExecutorService
介面,所以我們也可以像使用之前的執行緒池物件一樣使用,只不過是該物件會額外多了一些方法用於控制延遲與週期:public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit) public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,ong initialDelay,long delay,TimeUnit unit)
示例程式碼
下面程式碼每500毫秒列印一次當前執行緒名稱以及一個隨機數字。
public class MyScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(() -> { System.out.println(Thread.currentThread().getName() + new Random().nextInt(1000)); }, 0, 500, TimeUnit.MILLISECONDS); } } 複製程式碼
五、newWorkStealingPool
每個執行緒維護著自己的佇列,執行完自己的任務之後,會去主動執行其他執行緒佇列中的任務。
示例程式碼
public class MyWorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(4); System.out.println("cpu核心:" + Runtime.getRuntime().availableProcessors()); service.execute(new R(1000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); //由於產生的是精靈執行緒(守護執行緒、後臺執行緒),主執行緒不阻塞的話,看不到輸出 System.in.read(); } static class R implements Runnable { int time; R(int time) { this.time = time; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } } 複製程式碼
執行結果
cpu核心:4 1000 ForkJoinPool-1-worker-1 2000 ForkJoinPool-1-worker-0 2000 ForkJoinPool-1-worker-3 2000 ForkJoinPool-1-worker-2 2000 ForkJoinPool-1-worker-1
程式分析ForkJoinPool-1-worker-1
任務的執行時間是1秒,它會最先執行完畢,然後它會去主動執行其他執行緒佇列中的任務。
六、ForkJoinPool
-
ForkJoinPool
可以將一個任務拆分成多個“小任務”平行計算,再把多個“小任務”的結果合併成總的計算結果。ForkJoinPool
提供瞭如下幾個方法用於建立ForkJoinPool
例項物件:-
ForkJoinPool(int parallelism)
:建立一個包含parallelism個並行執行緒的ForkJoinPool
,parallelism的預設值為Runtime.getRuntime().availableProcessors()
方法的返回值 -
ForkJoinPool commonPool()
:該方法返回一個通用池,通用池的執行狀態不會受shutdown()
或shutdownNow()
方法的影響。
-
-
建立了
ForkJoinPool
示例之後,就可以呼叫ForkJoinPool
的submit(ForkJoinTask task)
或invoke(ForkJoinTask task)
方法來執行指定任務了。其中ForkJoinTask
(實現了Future介面)代表一個可以並行、合併的任務。ForkJoinTask
是一個抽象類,他還有兩個抽象子類:RecursiveAction
和RecursiveTask
。其中RecursiveTask
代表有返回值的任務,而RecursiveAction
代表沒有返回值的任務。
示例程式碼
下面程式碼演示了使用ForkJoinPool
對1000000個隨機整數進行求和。
public class MyForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random random = new Random(); static { for (int i = 0; i < nums.length; i++) { nums[i] = random.nextInt(1000); } System.out.println(Arrays.stream(nums).sum()); } //static class AddTask extends RecursiveAction { // //int start, end; // //AddTask(int start, int end) { //this.start = start; //this.end = end; //} // //@Override //protected void compute() { //if (end - start <= MAX_NUM) { //long sum = 0L; //for (int i = 0; i < end; i++) sum += nums[i]; //System.out.println("from:" + start + " to:" + end + " = " + sum); //} else { //int middle = start + (end - start) / 2; // //AddTask subTask1 = new AddTask(start, middle); //AddTask subTask2 = new AddTask(middle, end); //subTask1.fork(); //subTask2.fork(); //} //} //} static class AddTask extends RecursiveTask<Long> { int start, end; AddTask(int start, int end) { this.start = start; this.end = end; } @Override protected Long compute() { // 當end與start之間的差大於MAX_NUM,將大任務分解成兩個“小任務” if (end - start <= MAX_NUM) { long sum = 0L; for (int i = start; i < end; i++) sum += nums[i]; return sum; } else { int middle = start + (end - start) / 2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); // 並行執行兩個“小任務” subTask1.fork(); subTask2.fork(); // 把兩個“小任務”累加的結果合併起來 return subTask1.join() + subTask2.join(); } } } public static void main(String[] args) throws IOException { ForkJoinPool forkJoinPool = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); forkJoinPool.execute(task); long result = task.join(); System.out.println(result); forkJoinPool.shutdown(); } } 複製程式碼
額外補充
上面我們說到過:其實常用Java執行緒池都是由ThreadPoolExecutor
或者ForkJoinPool
兩個類生成的,只是其根據建構函式傳入不同的實參來生成相應執行緒池而已。那我們現在一起來看看Executors中幾個建立執行緒池物件的靜態方法相關的原始碼:
ThreadPoolExecutor建構函式原型
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 複製程式碼
引數說明
corePoolSize maximumPoolSize keepAliveTime unit workQueue
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } 複製程式碼
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } 複製程式碼
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 複製程式碼
newScheduledThreadPool
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } 複製程式碼
newWorkStealingPool
public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } 複製程式碼
拉票環節
覺得文章寫得不錯的朋友可以點贊、轉發、加關注呀!你們的支援就是我最大的動力,筆芯!