Java併發程式設計 - Callable、Future和FutureTask的實現
啟動執行緒執行任務,如果需要在任務執行完畢之後得到任務執行結果,可以使用從Java 1.5開始提供的Callable和Future
下面就分析一下Callable、Future以及FutureTask的具體實現及使用方法
原始碼分析基於JDK 1.7
一、Callable 與 Runnable
java.lang.Runnable是一個介面,只有一個run()方法
public interface Runnable { public abstract void run(); }
run()方法
的返回值是void,故在執行完任務後無法返回任何結果
Callable是java.util.concurrent包下的,也是一個介面,也只有一個call()方法
,類似於java.lang.Runnable的run()方法
,實現Callable介面的類和實現Runnable介面的類都是可以被其它執行緒執行的任務
public interface Callable<V> { V call() throws Exception; }
可以看到call()方法是有返回值的,可以將執行的結果返回
Callable和Runnable的區別:
1、Callable中定義的是call()方法,Runnable中定義的是run()方法
2、Callable中的call()方法可以返回執行任務後的結果,Runnable中的run()方法無法獲得返回值
3、Callable中的call()方法定義了throws Exception丟擲異常,丟擲的異常可以在主執行緒Future.get()時被主執行緒捕獲;Runnable中的run()方法沒有定義丟擲異常,執行任務時發生異常時也會上拋,因為即使不加預設也會上拋RuntimeException,但異常無法被主執行緒獲取
4、執行Callable任務可以拿到一個Future物件代表非同步運算的結果
二、Future
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future是java.util.concurrent包下的一個介面,代表著一個非同步計算的結果,可以通過get()
獲取執行緒執行的返回值,cancel()
取消任務執行,isCancelled()
和isDone()
獲得任務執行的情況
boolean cancel(boolean mayInterruptIfRunning)
嘗試取消任務的執行,取消成功返回true,取消失敗返回false
mayInterruptIfRunning表示是否允許中斷正在執行的任務
1、如果任務還未開始,cancel返回true,且任務永遠不會被執行
2、如果任務正在執行,根據mayInterruptIfRunning的值判斷是否需要中斷執行中的任務,且如果mayInterruptIfRunning為true,會呼叫中斷邏輯,返回true;如果mayInterruptIfRunning為false,不會呼叫執行緒中斷,只是將任務取消
3、如果任務結束(可能是正常完成、異常終止、被取消),返回false
4、如果cancel()操作返回true,後續呼叫isDone()、isCancelled()都返回true
boolean isCancelled()
表示任務是否被取消成功,如果在任務正常完成前被取消成功,則返回true
boolean isDone()
表示任務是否已經完成,則返回true,注意:正常完成、異常 或 取消操作都代表任務完成
V get() 和 V get(long timeout, TimeUnit unit)
get()
用來獲取執行結果,這個方法會產生阻塞,會一直等到任務執行完畢才返回
get(long timeout, TimeUnit unit)
用來獲取執行結果,如果在指定時間內還沒獲取到結果,會丟擲TimeoutException
Future提供了三種功能:
1、獲取任務執行的結果
2、取消任務
3、判斷任務是否完成 或 是否取消
因為Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了下面的FutureTask
三、FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask實現了RunnableFuture介面,那麼RunnableFuture又是什麼呢?
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
RunnableFuture介面繼承了Runnable
和Future
,所以它既是一個可以讓執行緒執行的Runnable任務,又是一個可以獲取Callable返回值的Future
FutureTask的屬性
/** The run state of this task */ private volatile int state; private static final int NEW= 0; private static final int COMPLETING= 1; private static final int NORMAL= 2; private static final int EXCEPTIONAL= 3; private static final int CANCELLED= 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED= 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
state 是任務的執行狀態
- 初始化時是NEW
- 任務終止的狀態有NORMAL(正常結束)、EXCEPTIONAL(異常結束)、CANCELLED(被取消)、INTERRUPTED(執行中被中斷),這些狀態是通過set() 、setException 、cancel() 方法觸發的
- COMPLETING 和 INTERRUPTING是兩個中間狀態,當正常結束設定outcome屬性前是COMPLETING,設定後變成NORMAL;當中斷執行中執行緒前是INTERRUPTING,呼叫thread.interrupt()後是INTERRUPTED
可能的狀態轉換:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
callable 是執行緒執行的有返回值的任務
outcome 是任務執行後的結果或異常
waiters 表示等待獲取結果的阻塞執行緒,連結串列結構,後等待執行緒的會排在連結串列前面
FutureTask的構造方法
FutureTask有兩個構造方法:
FutureTask(Callable callable)
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW;// ensure visibility of callable }
構造方法引數是Callable定義的任務,並將state置為NEW,只有當state為NEW時,callable才能被執行
FutureTask(Runnable runnable, V result)
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW;// ensure visibility of callable }
引數為Runnable和帶泛型的result物件,由於Runnable本身是沒有返回值的,故執行緒的執行結果通過result返回
可以看到通過runnable和result封裝了個Callable,實際上是new RunnableAdapter<T>(task, result)
,這個Adapter介面卡將Runnable和result轉換成Callable,並返回result
FutureTask.run()的實現
執行緒執行時真正執行的方法,Callable.call()
會在其中執行,幷包含設定返回值或異常的邏輯
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
1、任務執行狀態不是NEW,直接返回;將runner屬性從null->當前執行緒不成功,��接返回
2、呼叫call()方法,呼叫成功,使用set()設定返回值
3、呼叫過程發生異常,使用setException()儲存異常
set() 和 setException()
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
set()
和setException()
的實現基本一樣,都是先將任務執行狀態從NEW->COMPLETING
,分別設定返回值或異常給outcome,再將狀態分別置為NORMAL和EXCEPTIONAL
,最後呼叫finishCompletion()
依次喚醒等待獲取結果的阻塞執行緒
finishCompletion()實現
/** * Removes and signals all waiting threads, invokes done(), and nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //將成員變數waiters置為null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //迴圈喚醒WaitNode中的等待執行緒 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //由子類實現的方法 done(); callable = null;// to reduce footprint }
1、執行FutureTask類的get方法時,會把主執行緒封裝成WaitNode節點並儲存在waiters連結串列中
2、FutureTask任務執行完成後,通過UNSAFE設定waiters的值為null,並通過LockSupport.unpark方法依次喚醒等待獲取結果的執行緒
FutureTask.get()的實現
get()
方法有兩個實現,一個是一直等待獲取結果,直到任務執行完;一個是等待指定時間,超時後任務還未完成會上拋TimeoutException
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
內部通過awaitDone()
對主執行緒進行阻塞,具體實現如下:
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; //截止時間 WaitNode q = null; boolean queued = false; for (;;) { //如果主執行緒已經被中斷,removeWaiter(),並上拋InterruptedException //注意:Thread.interrupted()後會導致執行緒的中斷狀態為false if (Thread.interrupted()) { removeWaiter(q); //執行緒被中斷的情況下,從waiters連結串列中刪除q throw new InterruptedException(); } int s = state; //如果任務已經完成(可能是正常完成、異常、中斷),直接返回,即還沒有開始等待,任務已經完成了 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //如果任務正在完成,讓出CPU資源,等待state變成NORMAL或EXCEPTIONAL else if (s == COMPLETING) // cannot time out yet Thread.yield(); //s<COMPLETING 且 還沒有建立WaitNode else if (q == null) q = new WaitNode(); //s<COMPLETING 且 已經建立WaitNode,但還沒有入隊 else if (!queued) /** * 1、將當前waiters賦值給q.next,即“q-->當前waiters” * 2、CAS,將waiters屬性,從“當前waiters-->q” * 所以後等待的會排在連結串列的前面,而任務完成時會從連結串列前面開始依次喚醒等待執行緒 */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //所有準備工作完成,判斷等待是否需要計時 else if (timed) { nanos = deadline - System.nanoTime(); //如果已經等待超時,remove當前WaiterNode if (nanos <= 0L) { removeWaiter(q); //等待超時的情況下,從waiters連結串列中刪除q return state; } LockSupport.parkNanos(this, nanos); //掛起一段時間 } else LockSupport.park(this); //一直掛起,等待喚醒 } }
1、判斷主執行緒是否被中斷,如果被中斷,將當前WaitNode節點從waiters連結串列中刪除,並上拋InterruptedException
2、如果任務已經完成(可能是正常完成、異常、中斷),直接返回(即還沒有開始等待,任務已經完成了,就返回了)
3、如果任務正在完成,讓出CPU資源,等待state變成NORMAL或EXCEPTIONAL
4、如果任務沒有被中斷,也沒有完成,new WaitNode()
5、如果任務沒有被中斷,也沒有完成,也建立了WaitNode,使用UNSAFE.CAS()操作將WaitNode加入waiters連結串列
6、所有準備工作完畢,通過LockSupport的park或parkNanos掛起執行緒
而WaitNode 就是一個簡單的連結串列節點,記錄這等待的執行緒和下一個WaitNode
/** * Simple linked list nodes to record waiting threads in a Treiber * stack.See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */ static final class WaitNode { volatile Thread thread; //等待的執行緒 volatile WaitNode next; //下一個WaitNode WaitNode() { thread = Thread.currentThread(); } }
FutureTask.cancel()的實現
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //中斷執行緒 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
1、如果任務不是執行狀態,直接返回false失敗
2、如果mayInterruptIfRunning==true,中斷執行中的任務,使用CAS操作將狀態NEW-->INTERRUPTING ,再呼叫runner.interrupt(),最後將狀態置為INTERRUPTED
3、如果mayInterruptIfRunning==false,將任務置為CANCELLED取消狀態
4、呼叫finishCompletion()
依次喚醒等待獲取結果的執行緒,返回true取消成功
四、使用示例
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestFuture { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(1); Task task = new Task(); //callable任務 Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主執行緒在執行任務"); try { System.out.println("task執行結果:"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("所有任務執行完畢"); } static class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子執行緒在進行計算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } } }
執行結果:
子執行緒在進行計算 主執行緒在執行任務 task執行結果:4950 所有任務執行完畢
如果只是想控制在某些情況下可以將任務取消,可以使用Future<?> future = executor.submit(runnable)
,這樣返回結果肯定為null,但可以使用future.cancel()取消任務執行
五、總結
1、有了Runnable,為什麼還需要Callable,它們的區別是什麼?
Runnable和Callable都表示執行的任務,但不同的是Runnable.run()方法沒有返回值,Callable.call()有返回值
但其實執行緒在執行任務時還是執行的Runnable.run()方法,所以在使用ThreadPoolExecutor.submit()時會將Callable封裝為FutureTask,而FutureTask是Runnable和Future的實現類
所以在執行Callable的任務時,執行緒其實是執行FutureTask這個Runnable的run()方法,其中封裝了呼叫Callable.call()並返回結果的邏輯
執行Runnable任務如果發生異常,主執行緒無法知曉;而執行Callable任務如果發生異常,在Future.get()時會丟擲java.util.concurrent.ExecutionException,其中封裝了真實異常
2、Future.get()是如何獲取執行緒返回值的?
首先得益於Callable.call()方法定義了返回值,提交Callable任務後,Callable會被封裝成FutureTask,其既可以作為Runnable被執行,也可以作為Future獲取返回值,FutureTask.run()方法會呼叫Callable.call()中的任務程式碼
在任務執行完成前,如果主執行緒使用Future.get(),其實是呼叫FutureTask.get(),其中會判斷任務狀態尚未結束,將主執行緒加入waiters等待連結串列,並掛起主執行緒
待任務執行結束後,FutureTask會喚醒所有等待獲取返回值的執行緒,此時主執行緒的FutureTask.get()就會返回了
所以,主執行緒和執行執行緒是通過FutureTask作為橋樑獲取執行緒返回值的
3、Future.cancel()真的能取消任務的執行嗎?
首先答案是“不一定”,根據JDK中的方法註釋“Attempts to cancel execution of this task”,即嘗試去取消執行的任務
如果任務正在執行,且呼叫cancel()時引數mayInterruptIfRunning傳的是true,那麼會對執行執行緒呼叫interrupt()方法
那麼問題就變成了interrupt()方法能中斷執行緒執行嗎?
interrupt()方法不會中斷正在執行的執行緒。這一方法實際上完成的是線上程受到阻塞時丟擲一箇中斷訊號,這樣執行緒就得以退出阻塞的狀態。更確切的說,如果執行緒被Object.wait()、Thread.join()、Thread.sleep()等阻塞,那麼它將接收到一箇中斷異常(InterruptedException),從而提早地終結被阻塞狀態。
如果執行緒沒有被阻塞,呼叫interrupt()將不起作用
那麼即使執行緒正在阻塞狀態,並丟擲了InterruptedException,執行緒能否真的取消執行還要看程式碼中是否捕獲了InterruptedException和有沒有做相應的對中斷標示的判斷邏輯
Linux公社的RSS地址 :https://www.linuxidc.com/rssFeed.aspx
本文永久更新連結地址:https://www.linuxidc.com/Linux/2019-02/156799.htm