Java併發之執行緒池
一.使用執行緒池的好處
與“為每一個任務分配一個執行緒”相比,執行緒池有一些好處。
- 重用已經建立的執行緒,減少了建立、銷燬執行緒的開銷。
- 任務到達時,可能執行緒池中已經有建立好的執行緒供使用了,避免了等待執行緒建立的時間開銷。
二.Java執行緒池實現原理
在Java中建立執行緒池可以使用Executors提供的四個靜態方法建立適用於特定情況的幾種執行緒池。但這些構造方法還是根據需求直接傳入特定引數例項化了ThreadPoolExecutor類。所以,我們要想從原理上理解執行緒池,還是要先學習一下ThreadPoolExecutor的構造方法,看看都有哪些引數。這些引數其實可以理解為執行緒池的配置資訊,根據自己的需求傳入不同的引數就能構造出不同的執行緒池。
ThreadPollExector類有4個構造方法,其它三個的引數較少,使用了一些但歸根結底是傳入的7個關鍵引數決定了這個執行緒池是什麼樣的。所以關鍵是這七個引數。
2.1 ThreadPoolExecutor構造方法的7個引數
方法宣告如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
corePoolSize:
核心執行緒數。當執行緒池剛剛建立時執行緒數是0,這時如果每來一個任務就會建立一個新的執行緒,直到已建立的執行緒數等於核心執行緒數後就不再建立了,而是把新任務放入阻塞隊裡去。
關於初始執行緒數:如果呼叫了prestartAllCoreThreads()會直接創建出corePoolSize個執行緒;如果呼叫了prestartCoreThread()則會提前創建出一個執行緒。
關於執行緒存活時間:核心執行緒不受keepAliveTime的影響,建立後會一直存在,直到執行緒池關閉。除非呼叫了allowCoreThreadTimeOut(true)方法
這裡的疑問是,如果現有執行緒是有空閒的,但沒達到核心執行緒數,來了新任務會建立新的執行緒嗎?
答:這個疑問如果理解了執行緒池的工作過程就不會問了。詳見下面的執行緒池工作流程。
maximumPoolSize
最大執行緒數。是執行緒池最多允許存在的執行緒總數。如果當前執行緒數已經達到corePoolSize。那麼就將任務放入佇列,如果佇列也滿了。就判斷一下當前存線上程數是否小於maximumPoolSize,如果是,則建立新的執行緒執行任務。這裡建立的執行緒處於核心執行緒池外,受keepAliveTime的影響,如果空閒到達執行時間就會銷燬。
*keepAliveTime
控制非核心執行緒在空閒狀態下的存活時間。如果呼叫allowCoreThreadTimedOut方法,核心執行緒也可以受它的影響。
unit
keepAliveTime的時間單位,有秒、毫秒、分鐘、小時、天等。
workQueue
指定快取佇列
threadFactory
執行緒工廠,用來建立執行緒。
handler
表示當前拒絕處理任務時的策略,當快取佇列已滿,執行緒數也達到maximumPoolSize時就按指定的策略處理新提交的任務。
主要有以下四種取值:
- ThreadPoolExecutor.AbortPolicy:丟棄任務並丟擲RejectedExecutionException異常。
- ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不丟擲異常。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄佇列最前面的任務,然後重新嘗試執行任務(重複此過程)
- ThreadPoolExecutor.CallerRunsPolicy:由呼叫執行緒處理該任務
2.2 執行緒池工作流程
- 當有新任務提交給執行緒池時,首先檢查一下當前執行緒數是否達到了corePoolSize,如果沒達到則新建立執行緒,將當前任務作為該執行緒的第一個任務執行。
- 如果已經到達了corePoolSize或超過了,則將任務放入快取佇列。
- 如果快取佇列已滿,則判斷當前執行緒數是否已經達到了maximumPoolSize,如果沒到達則建立新執行緒執行任務
- 如果執行緒數達到了maximumPoolSize,則使用拒絕策略處理
2.3ThreadPoolExecutor原始碼分析
2.3.1 執行緒池狀態
執行緒池有running,shutdown,stop,tidying,termienated幾種狀態。在jdk1.8的實現中,複用了一個AtomicInteger物件來同時儲存執行緒狀態和當前執行緒數。具體程式碼不做展開,理解這種做法即可。
2.3.2 引數介紹
- ReentrantLock mainLook 一個鎖,新增工作執行緒時要先獲取鎖
- HashSet<Worker> workers 儲存工作執行緒
- int largesetPoolSize記錄曾經達到的最大執行緒數
-
long completedTaskCount 記錄已經完成的任務數量
其他還有構造方法傳入的7個引數也都有相應的屬性進行儲存
2.3.3 execute(Runnable command)方法分析
通過這個方法將任務提交給執行緒池執行。這個方法基本是2.2節執行緒池工作流程執行的。
public void execute(Runnable command) { if (command == null) //空指標異常 throw new NullPointerException(); //獲取clt,這個AtomicInteger物件中儲存著當前執行緒數和執行緒執行狀態 int c = ctl.get(); //如果當前執行緒數小於核心執行緒數則執行新增執行緒動作 if (workerCountOf(c) < corePoolSize) { //addWorker的第二個引數表示新增的是否是核心執行緒,這裡是true if (addWorker(command, true)) return; //新增後重新獲取狀態值,因為執行緒數已經有變化了,執行緒池狀態也可能變了 c = ctl.get(); } //執行到這裡,說明新增核心執行緒不成功,可能是數量達到corePoolSize或執行緒池shutdown了 //如果執行緒池還在執行嘗試將任務加入緩衝佇列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //如果新增到佇列後,執行緒池停止運行了,將任務從佇列移除 if (! isRunning(recheck) && remove(command)) reject(command);//移除成功後使用拒絕策略處理任務 else if (workerCountOf(recheck) == 0)//分析1:如果核心執行緒池為空,新增一個非核心執行緒,處理佇列中可能的任務 addWorker(null, false); } else if (!addWorker(command, false)) //佇列滿了,啟動非核心執行緒執行任務 reject(command);//非核心執行緒啟動失敗,執行拒絕策略 }
2.3.4 addWorker(Runnable,Boolean)分析
addWorker方法用來給執行緒池中新增執行緒。第二個引數表示新增的是否是核心執行緒。下面來看一下它是怎麼工作的吧!
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); //分析1:注意這個if語句是和2.3.3中分析1相呼應的。 //呼叫shutdown()將空閒執行緒interrupt,正在執行的執行緒繼續執行,將狀態設為shutdown //呼叫shutdownNow()將所有執行緒中斷,不管有沒有執行完。 //如果shutdown()後,核心執行緒都關閉了,佇列中卻還有元素,2.3.3分析1就添加了新的非核心執行緒處理,就是這裡的!(rs==SHUTDOWN&&……)這種情況 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //超過執行執行緒數,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //cas將執行緒計數加1,失敗後重試 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get();// Re-read ctl //執行緒狀態發生變化重新迴圈 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //下面是新增執行緒的過程 boolean workerStarted = false; //標識執行緒是否啟動 boolean workerAdded = false; //標識執行緒是否被新增 Worker w = null; try { w = new Worker(firstTask);//新生成worker,執行緒池中的執行緒用它表達 final Thread t = w.thread;//獲取實際的執行緒 if (t != null) { //新增執行緒要先獲取鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //獲取當前狀態 int rs = runStateOf(ctl.get()); //不是shutdown狀態或者,處於shutdown狀態但新增的tash是null,屬於shutdown下新增執行緒處理佇列中剩餘任務的情況 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w);//新增到workers,workers持有執行緒 int s = workers.size(); if (s > largestPoolSize) //記錄最大執行緒數記錄 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start();//啟動執行緒執行 workerStarted = true; } } } finally { if (! workerStarted)//沒成功 addWorkerFailed(w); } return workerStarted; }
2.3.5 worker的執行
新增執行緒後,worker就開始執行了,在它的執行方法run裡會直接呼叫tast的run方法,執行要乾的事情。
執行完成之後會去佇列獲取新的任務執行。
如果沒有新任務執行呢?
Worker是ThreadPoolExecutor的一個內部類,它的實現了Runnable方法。這裡有個問題是Runnable需要傳遞給Thread才能執行。Worker是如何做到的呢?
原來,Worker類持有了一個Thread型別的變數thread,並在初始化時使用Worker本身初始化了thread
Worker的構造方法:
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
Worker的run方法呼叫了runWoker(this)。Worker還繼承了AQS,這一塊還有待學習。
第一次啟動會執行初始化傳進來的任務firstTask;然後會從workQueue中取任務執行,如果佇列為空則等待keepAliveTime這麼長時間
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //第一次時執行外部傳過來的task,後面從getTask獲取,getTask從佇列獲取任務執行, //如果佇列為空則等待keepAliveTime這麼長的時間 while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
2.3.5 其他內容
下一步應該分析getTask是如何從佇列獲取任務的了,這裡不再展開講了。
此外,還應該有佇列的實現和選擇問題,拒絕策略的具體等,目前先不做如此多的分析了,待未來時機成熟了再完善。
二.使用Executors建立具有預設配置的執行緒池
java.util.concurrent.Executors中提供了4個靜態方法,可以用來建立具有指定特性的執行緒池。都是例項化了ThreadPoolExecutor物件。
2.1 newFixedThreadPool()方法
返回一個帶快取的執行緒池,該池在必要的時候建立執行緒,線上程空閒60s之後終止執行緒.
下面看一下這個方法的原始碼實現:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
2.2 newFixedThreadPool
建立一個固定長度的執行緒池,每當提交一個任務時就建立一個執行緒,指導達到執行緒池的最大數量,這是執行緒池的規模將不再變化(如果某個執行緒由於發生了未預期的Exception而結束,那麼執行緒池會補充一個新的執行緒)
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2.3newScheduledThreadPool
建立一個固定長度的執行緒池,而且以延遲或定時的方式來執行任務,類似於Timer
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
2.4newSingleThreadExecutor
是一個單執行緒的Executor,它建立單個工作者執行緒來執行任務,如果這個執行緒異常結束,會建立另一個執行緒來替代。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }