【開發小記】 Java 執行緒池 之 被“吃掉”的執行緒異常(附原始碼分析和解決方法)
前言
今天遇到了一個bug,現象是,一個任務放入執行緒池中,似乎“沒有被執行”,日誌也沒有打。
經過原生代碼除錯之後,發現在任務邏輯的前半段,丟擲了 NPE
,但是程式碼外層沒有 try-catch
,導致這個異常被吃掉。
這個問題解決起來是很簡單的,外層加個 try-catch
就好了,但是這個異常如果沒有被catch,執行緒池內部邏輯是怎麼處理這個異常的呢?這個異常最後會跑到哪裡呢?
帶著疑問和好奇心,我研究了一下執行緒池那一塊的原始碼,並且做了以下的總結。
原始碼分析
專案中出問題的程式碼差不多就是下面這個樣子
ExecutorService threadPool = Executors.newFixedThreadPool(3); threadPool.submit(() -> { String pennyStr = null; Double penny = Double.valueOf(pennyStr); ... })
先進到 newFixedThreadPool
這個工廠方法中看生成的具體實現類,發現是 ThreadPoolExecutor
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
再看這個類的繼承關係,
再進到 submit
方法,這個方法在 ExecutorService
介面中約定,其實是在 AbstractExectorService
中實現, ThreadPoolExecutor
並沒有override這個方法。
public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
對應的 FutureTask物件的
構造方法
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW;// state由volatile 修飾 保證多執行緒下的可見性 }
對應 Callable
物件的構造方法
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
對應 RunnableAdapter
物件的構造方法
/** * A callable that runs given task and returns given result * 一個能執行所給任務並且返回結果的Callable物件 */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
總結上面的, newTaskFor
就是把我們提交的 Runnable
物件包裝成了一個 Future
。
接下來就是會把任務提交到佇列中給執行緒池排程處理:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
因為主要關心的是這個執行緒怎麼執行,異常的丟擲和處理,所以我們暫時不解析多餘的邏輯。很容易發現,如果任務要被執行,肯定是進到了 addWorker
方法當中,所以我們再進去看,鑑於 addWorker
方法的很長,不想列太多的程式碼,我就摘了關鍵程式碼段:
private boolean addWorker(Runnable firstTask, boolean core) { ... boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 例項化一個worker物件 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 從Worker物件的構造方法看,當這個thread物件start之後, // 之後實際上就是呼叫Worker物件的run() t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } // Worker的構造方法 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
我們再看這個 ThreadPoolExecutor
的內部類 Worker
物件:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ... /** Delegates main run loop to outer runWorker*/ public void run() { runWorker(this); } ... }
看來真正執行任務的是在這個外部的 runWorker
當中,讓我們再看看這個方法是怎麼消費 Worker
執行緒的。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; // ==== 關鍵程式碼 start ==== try { // 很簡潔明瞭,呼叫了任務的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } // ==== 關鍵程式碼 end ==== } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
終於走到底了,可以看到關鍵程式碼中的try-catch block程式碼塊中,呼叫了本次執行任務的 run
方法。
// ==== 關鍵程式碼 start ==== try { // 很簡潔明瞭,呼叫了任務的run方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } // ==== 關鍵程式碼 end ====
可以看到捕捉了異常之後,會再向外丟擲,只不過再finally block 中有個 afterExecute()
方法,似乎在這裡是可以處理這個異常資訊的,進去看看
protected void afterExecute(Runnable r, Throwable t) { }
可以看到 ThreadPoolExecutor#afterExecute()
方法中,是什麼都沒做的,看來是讓使用者通過override這個方法來定製化任務執行之後的邏輯,其中可以包括異常處理。
那麼這個異常到底是拋到哪裡去了呢。我在一個大佬的文章找到了hotSpot JVM處理執行緒異常的邏輯,
if (!destroy_vm || JDK_Version::is_jdk12x_version()) { // JSR-166: change call from from ThreadGroup.uncaughtException to // java.lang.Thread.dispatchUncaughtException if (uncaught_exception.not_null()) { //如果有未捕獲的異常 Handle group(this, java_lang_Thread::threadGroup(threadObj())); { KlassHandle recvrKlass(THREAD, threadObj->klass()); CallInfo callinfo; KlassHandle thread_klass(THREAD, SystemDictionary::Thread_klass()); /* 這裡類似一個方法表,實際就會去呼叫Thread#dispatchUncaughtException方法 template(dispatchUncaughtException_name,"dispatchUncaughtException") */ LinkResolver::resolve_virtual_call(callinfo, threadObj, recvrKlass, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), KlassHandle(), false, false, THREAD); CLEAR_PENDING_EXCEPTION; methodHandle method = callinfo.selected_method(); if (method.not_null()) { JavaValue result(T_VOID); JavaCalls::call_virtual(&result, threadObj, thread_klass, vmSymbols::dispatchUncaughtException_name(), vmSymbols::throwable_void_signature(), uncaught_exception, THREAD); } else { KlassHandle thread_group(THREAD, SystemDictionary::ThreadGroup_klass()); JavaValue result(T_VOID); JavaCalls::call_virtual(&result, group, thread_group, vmSymbols::uncaughtException_name(), vmSymbols::thread_throwable_void_signature(), threadObj,// Arg 1 uncaught_exception,// Arg 2 THREAD); } if (HAS_PENDING_EXCEPTION) { ResourceMark rm(this); jio_fprintf(defaultStream::error_stream(), "\nException: %s thrown from the UncaughtExceptionHandler" " in thread \"%s\"\n", pending_exception()->klass()->external_name(), get_thread_name()); CLEAR_PENDING_EXCEPTION; } } }
程式碼是C寫的,有興趣可以去全文,根據英文註釋能稍微看懂一點
http://hg.openjdk.java.net/jd...
可以看到這裡最終會去呼叫 Thread#dispatchUncaughtException
方法:
/** * Dispatch an uncaught exception to the handler. This method is * intended to be called only by the JVM. */ private void dispatchUncaughtException(Throwable e) { getUncaughtExceptionHandler().uncaughtException(this, e); }
/** * Called by the Java Virtual Machine when a thread in this * thread group stops because of an uncaught exception, and the thread * does not have a specific {@link Thread.UncaughtExceptionHandler} * installed. * */ public void uncaughtException(Thread t, Throwable e) { if (parent != null) { parent.uncaughtException(t, e); } else { Thread.UncaughtExceptionHandler ueh = Thread.getDefaultUncaughtExceptionHandler(); if (ueh != null) { ueh.uncaughtException(t, e); } else if (!(e instanceof ThreadDeath)) { //可以看到會打到System.err裡面 System.err.print("Exception in thread \"" + t.getName() + "\" "); e.printStackTrace(System.err); } } }
jdk的註釋也說明的很清楚了,當一個執行緒丟擲了一個未捕獲的異常,JVM會去呼叫這個方法。如果當前執行緒沒有宣告 UncaughtExceptionHandler
成員變數並且重寫 uncaughtException
方法的時候,就會看執行緒所屬的執行緒組(如果有執行緒組的話)有沒有這個類,沒有就會打到 System.err
裡面。
IBM這篇文章也提倡我們使用 ThreadGroup
提供的 uncaughtException
處理程式來線上程異常終止時進行檢測。
https://www.ibm.com/developer...
總結 (解決方法)
從上述原始碼分析中可以看到,對於本篇的異常“被吃掉”的問題,有以下幾種方法
- 用try-catch 捕捉,一般都是用這種
-
執行緒或者執行緒組物件設定UncaughtExceptionHandler成員變數
Thread t = new Thread(r); t.setUncaughtExceptionHandler( (t1, e) -> LOGGER.error(t1 + " throws exception: " + e)); return t;
- override 執行緒池的
afterExecute
方法。
本篇雖然是提出問題的解決方法,但主旨還是分析原始碼,瞭解了整個過程中異常的經過的流程,希望能對您產生幫助。