深入理解[Master-Worker模式]原理與技術
Master-Worker模式是常用的並行模式之一。它的核心思想是,系統由兩類程序協作工作:Master程序和Worker程序。Master程序負責接收和分配任務,Worker程序負責處理子任務。當各個Worker程序將子任務處理完成後,將結果返回給Master程序,由Master程序做歸納和彙總,從而得到系統的最終結果,其處理過程如圖1所示。
Master-Worker模式的好處,它能夠將一個大任務分解成若干個小任務,並且執行,從而提高系統的吞吐量。而對於系統請求者Client來說,任務一旦提交,Master程序會分配任務並立即返回,並不會等待系統全部處理完成後再返回,其處理過程是非同步的。因此Client不會出現等待現象。
1.Master-Worker的模式結構
Master-Worker模式是一種使用多執行緒進行資料結構處理的結構。
Master程序為主要程序,它維護了一個Worker程序佇列、子任務佇列和子結果集。Worker程序佇列中的Worker程序,不停地從任務佇列中提取要處理的子任務,並將子任務的處理結果寫入結果集。
2.Master-Worker的程式碼實現
基於以上的思路實現一個簡易的master-worker框架。其中Master部分的程式碼如下:
public class Master { //任務佇列 protected Queue<Object> workQuery = new ConcurrentLinkedQueue<Object>(); //worker程序佇列 protected Map<String, Thread> threadMap = new HashMap<>(); //子任務處理結果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<>(); //是否所有的子任務都結束了 public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true; } //Master 的構造,需要一個Worker 程序邏輯,和需要的Worker程序數量 public Master(Worker worker,int countWorker){ worker.setWorkQueue(workQuery); worker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) { threadMap.put(Integer.toString(i),new Thread(worker)); } } //提交一個任務 public void submit(Object job){ workQuery.add(job); } //返回子任務結果集 public Map<String,Object> getResultMap(){ return resultMap; } //開始執行所有的worker程序,進行處理 public voidexecute(){ for (Map.Entry<String,Thread> entry : threadMap.entrySet()){ entry.getValue().start(); } } }
對應的Worker程序的程式碼實現:
public class Worker implements Runnable { //任務佇列,用於取得子任務 protected Queue<Object> workQueue; //子任務處理結果集 protected Map<String, Object> resultMap; public void setWorkQueue(Queue<Object> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, Object> resultMap) { this.resultMap = resultMap; } //子任務處理的邏輯,在子類中實現具體邏輯 public Object handle(Object input) { return input; } @Override public void run() { while (true) { //獲取子任務 Object input = workQueue.poll(); if (input == null) { break; } //處理子任務 Object re = handle(input); //將處理結果寫入結果集 resultMap.put(Integer.toString(input.hashCode()), re); } } }
以上兩段程式碼已經展示了Master-Worker框架的全貌。應用程式通過過載 Worker.handle() 方法實現應用層邏輯。
例如,要實現計算1+2+..+100的結果,程式碼如下:
public class PlusWorker extends Worker { @Override public Object handle(Object input) { Integer i = (Integer) input; return i+1; } public static void main(String[] args) { Master master = new Master(new PlusWorker(), 5); for (int i = 0; i < 100; i++) { master.submit(i); //提交一百個子任務 } master.execute(); //開始計算 int re = 0; Map<String, Object> resultMap = master.getResultMap(); while (resultMap.size() > 0 || !master.isComplete()) { Set<String> keys = resultMap.keySet(); String key = null; for (String k : keys) { key = k; break; } Integer i = null; if (key != null) { i = (Integer) resultMap.get(key);//從結果集中獲取結果 } if (i != null) { re += i;//最終結果 } if (key != null) { resultMap.remove(key);//移除已經被計算過的項 } } System.out.println("result: " + re); } }
執行結果:
result: 5050
在應用層程式碼中,建立了5個Worker工作程序和Worker工作例項PlusWorker。在提交了100個子任務後,便開始子任務的計算。這些子任務中,由生成的5個Worker程序共同完成。Master並不等待所有的Worker執行完畢,就開始訪問子結果集進行最終結果的計算,直到子結果集中所有的資料都被處理,並且5個活躍的Worker程序全部終止,才給出最終計算結果。
Master-Worker模式是一種將序列任務並行化的方法,被分解的子任務在系統中可以被並行處理。同時,如果有需要,Master程序不需要等待所有子任務都完成計算,就可以根據已有的部分結果集計算最終結果。
3.Amino框架提供的Master-Worker模式
在Amino框架中為Master-Worker模式提供了較為完善的實現和便捷的操作介面。Amino實現了兩套Master-Worker實現:一種是靜態的Master-Worker實現,另一種是動態實現。
靜態實現不允許在任務開始時新增新的子任務,而動態的Master-Worker允許在任務執行過程中,由Master或Worker新增新的子任務。
在Amino框架中, MasterWorkerFactory.newStatic(new Pow3(),20)
用於建立靜態的Master-Worker模式,
第二個引數為Worker執行緒數,第一個引數為執行的任務類,該類需實現 Doable<Integer,Integer>
介面,該介面泛型的第一個型別為任務方法的引數型別,第二個型別為方法返回型別。 MasterWorkerFactory.newDynamic(new Pow3Dyn())
用於建立動態的Master-Worker模式,其中引數為實現 DynamicWorker
介面的例項。
submit()
方法用於提交應用層任務, execute()
方法將執行所有任務。
Amino框架需要自行下載,下載地址: ofollow,noindex" target="_blank">https://sourceforge.net/projects/amino-cbbs/files/cbbs/0.5.3/ ,找到 cbbs-java-bin-0.5.3.tar.gz 下載即可。
下面用Amino框架演示1+2+..+100的完整示例。
public class Pow3 implements Doable<Integer,Integer> { @Override public Integer run(Integer input) { //業務邏輯 return input; } }
public class Pow3Dyn implements DynamicWorker<Integer,Integer> { @Override public Integer run(Integer integer, WorkQueue<Integer> workQueue) { //業務邏輯 return integer; } }
public class AminoDemo { / * Amino 框架提供開箱即用的Master-Worker模式 * 其它用法參考API文件 */ public static void main(String[] args) { new AminoDemo().testDynamic(); new AminoDemo().testStatic(); } / * 靜態模式,不允許在任務開始後新增新的任務 */ public void testStatic(){ MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newStatic(new Pow3(),20);//靜態模式,可指定執行緒數 List<MasterWorker.ResultKey> keyList = new Vector<>(); for (int i = 1; i <= 100; i++) { keyList.add(mw.submit(i)); //傳參並排程任務,key用於取得任務結果 } mw.execute();//執行所有任務 int re = 0; while (keyList.size()> 0){ //不等待全部執行完成,就開始求和 MasterWorker.ResultKey k = keyList.get(0); Integer i = mw.result(k); //由Key取得一個任務結果 if (i!=null){ re+=i; keyList.remove(0); //累加完成後 } } System.out.println("result:"+re); mw.shutdown();//關閉master-worker,釋放資源 } / * 動態模式,可在開始執行任務後繼續新增任務 */ public void testDynamic(){ MasterWorker<Integer,Integer> mw = MasterWorkerFactory.newDynamic(new Pow3Dyn());//動態模式,可指定執行緒數 List<MasterWorker.ResultKey> keyList = new Vector<>(); for (int i = 1; i < 50; i++) { keyList.add(mw.submit(i)); //傳參並排程任務,key用於取得任務結果 } mw.execute(); for (int i = 50; i <= 100; i++) { keyList.add(mw.submit(i)); //傳參並排程任務,key用於取得任務結果 } int re = 0; while (keyList.size()> 0){ MasterWorker.ResultKey k = keyList.get(0); Integer i = mw.result(k); //由Key取得一個任務結果 if (i!=null){ re+=i; keyList.remove(0); //累加完成後 } } System.out.println("result:"+re); mw.shutdown(); } }
執行結果:
result:5050 result:5050
MasterWorker類的方法摘要,其它請自行下載API文件。 cbbs-java-apidocs-0.5.3.tar.gz
方法摘要 | |
---|---|
boolean |
execute() Begin processing of the work items submitted. |
boolean |
execute(long timeout, java.util.concurrent.TimeUnit unit) Begin processing of the work items submitted. |
void |
finished() Indicate to the master/worker that there is not more work coming. |
java.util.Collection<T> |
getAllResults() Obtain all of the results from the processing work items. |
boolean |
isCompleted() Poll an executing master/worker for completion. |
boolean |
isStatic() Determine if a master/worker is static. |
int |
numWorkers() Get the number of active workers. |
T |
result(MasterWorker.ResultKey k) Obtain the results from the processing of a work item. |
void |
shutdown() Shutdown the master/worker. |
MasterWorker.ResultKey |
submit(S w) Submit a work item for processing. |
MasterWorker.ResultKey |
submit(S w, long timeout, java.util.concurrent.TimeUnit unit) Submit a work item for processing and block until it is either submitted successfully or the specified timeout period has expired. |
boolean |
waitForCompletion() Wait until all workers have completed. |
boolean |
waitForCompletion(long timeout, java.util.concurrent.TimeUnit unit) Wait until all workers have completed or the specified timeout period expires. |