Multithreaded Programming Patterns: the Pipeline Pattern (Part 2)
Case Study
Requirement: records in a database that match specified criteria are to be transferred (synchronized) via FTP to designated hosts, subject to the following constraints:
1. Each data file contains at most N (e.g., 10,000) records; once a data file is full, the remaining records are written to a new data file.
2. Each data file may be transferred to multiple hosts.
3. A local backup of the files must be kept.
Functionality: query the database (Stage 1), generate local data files from the query result set (Stage 2), transfer each data file via FTP to the designated hosts, with multiple hosts supported (Stage 3), and back up each data file once its transfer has completed or failed (Stage 4).
The task involves a fair amount of I/O: querying the database and transferring data files both involve network I/O and file I/O, and backing up files involves file I/O.
The task is a poor fit for a single thread. It is also a poor fit for multiple threads that each process the stages serially (each thread executing Stage 2, Stage 3, and Stage 4 in turn), because during Stage 2 several threads would contend for the same data file (the file not yet being full).
The Pipeline pattern: one thread is dedicated to Stage 1, and each of the remaining steps (Stage 2, Stage 3, and Stage 4) has its own dedicated worker thread. From the point of view of an individual work item the steps are still processed serially, but from the point of view of the task as a whole they run in parallel, which improves throughput. A standalone sketch of this hand-off follows.
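Before the full example, the stage hand-off can be illustrated without the Pipe/Pipeline framework used below. This minimal sketch is not from the original text: two worker threads are connected by blocking queues, and a made-up POISON marker signals end of input.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PipelineSketch {
        // Made-up end-of-stream marker; compared by reference below.
        private static final String POISON = "POISON";

        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<String> saveQueue = new ArrayBlockingQueue<String>(100);
            final BlockingQueue<String> transferQueue = new ArrayBlockingQueue<String>(100);

            // Stage 2: "write" each record to a file and hand the file to Stage 3.
            Thread saver = new Thread(new Runnable() {
                public void run() {
                    try {
                        String record;
                        while ((record = saveQueue.take()) != POISON) {
                            transferQueue.put("file-for-" + record); // simulated file
                        }
                        transferQueue.put(POISON); // propagate shutdown downstream
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });

            // Stage 3: "transfer" each file.
            Thread transferrer = new Thread(new Runnable() {
                public void run() {
                    try {
                        String file;
                        while ((file = transferQueue.take()) != POISON) {
                            System.out.println("transferring " + file);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });

            saver.start();
            transferrer.start();

            // Stage 1: the producer feeds items into the first stage's queue.
            for (int i = 0; i < 5; i++) {
                saveQueue.put("record-" + i);
            }
            saveQueue.put(POISON); // signal that no more input is coming
        }
    }

Each item flows through the stages in order, yet while Stage 3 transfers file N, Stage 2 is already producing file N+1: serial per item, parallel overall.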
The data synchronization timer task:
package io.github.viscent.mtpattern.ch13.pipeline.example;

import io.github.viscent.mtpattern.ch13.pipeline.AbstractParallelPipe;
import io.github.viscent.mtpattern.ch13.pipeline.AbstractPipe;
import io.github.viscent.mtpattern.ch13.pipeline.Pipe;
import io.github.viscent.mtpattern.ch13.pipeline.PipeContext;
import io.github.viscent.mtpattern.ch13.pipeline.PipeException;
import io.github.viscent.mtpattern.ch13.pipeline.Pipeline;
import io.github.viscent.mtpattern.ch13.pipeline.SimplePipeline;

import java.io.File;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataSyncTask implements Runnable {

    public void run() {
        ResultSet rs = null;
        SimplePipeline<RecordSaveTask, String> pipeline = buildPipeline();
        pipeline.init(pipeline.newDefaultPipelineContext());
        Connection dbConn = null;
        try {
            dbConn = getConnection();
            rs = qryRecords(dbConn);
            processRecords(rs, pipeline);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (null != dbConn) {
                try {
                    dbConn.close();
                } catch (SQLException e) {
                    // ignored
                }
            }
        }
        pipeline.shutdown(360, TimeUnit.SECONDS);
    }

    private ResultSet qryRecords(Connection dbConn) throws Exception {
        dbConn.setReadOnly(true);
        PreparedStatement ps = dbConn.prepareStatement(
            "select id,productId,packageId,msisdn,operationTime,operationType,"
                + "effectiveDate,dueDate from subscriptions order by operationTime",
            ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ResultSet rs = ps.executeQuery();
        return rs;
    }

    protected Connection getConnection() throws Exception {
        Connection dbConn = null;
        Class.forName("org.hsqldb.jdbc.JDBCDriver");
        dbConn = DriverManager.getConnection(
            "jdbc:hsqldb:hsql://192.168.1.105:9001/viscent-test", "SA", "");
        return dbConn;
    }

    private static Record makeRecordFrom(ResultSet rs) throws SQLException {
        Record record = new Record();
        record.setId(rs.getInt("id"));
        record.setProductId(rs.getString("productId"));
        record.setPackageId(rs.getString("packageId"));
        record.setMsisdn(rs.getString("msisdn"));
        record.setOperationTime(rs.getTimestamp("operationTime"));
        record.setOperationType(rs.getInt("operationType"));
        record.setEffectiveDate(rs.getTimestamp("effectiveDate"));
        record.setDueDate(rs.getTimestamp("dueDate"));
        return record;
    }

    private static class RecordSaveTask {
        public final Record[] records;
        public final int targetFileIndex;
        public final String recordDay;

        public RecordSaveTask(Record[] records, int targetFileIndex) {
            this.records = records;
            this.targetFileIndex = targetFileIndex;
            this.recordDay = null;
        }

        public RecordSaveTask(String recordDay, int targetFileIndex) {
            this.records = null;
            this.targetFileIndex = targetFileIndex;
            this.recordDay = recordDay;
        }
    }

    @SuppressWarnings("unchecked")
    private SimplePipeline<RecordSaveTask, String> buildPipeline() {
        /*
         * A thread pool is essentially about reusing a fixed number of threads
         * rather than dedicating a worker thread to each task. Here, each Pipe can
         * be initialized after its predecessor has finished initializing; the
         * Pipes need not be initialized concurrently. A single thread can
         * therefore perform all the initialization and then go on to handle tasks
         * produced later, such as error handling. All of these sequentially
         * produced tasks can be executed, from start to finish, by one worker
         * thread of the pool.
         */
        final ExecutorService helperExecutor = Executors.newSingleThreadExecutor();
        final SimplePipeline<RecordSaveTask, String> pipeline =
            new SimplePipeline<RecordSaveTask, String>(helperExecutor);

        // Generate data files from the database records.
        Pipe<RecordSaveTask, File> stageSaveFile =
            new AbstractPipe<RecordSaveTask, File>() {

            @Override
            protected File doProcess(RecordSaveTask task) throws PipeException {
                final RecordWriter recordWriter = RecordWriter.getInstance();
                final Record[] records = task.records;
                File file;
                if (null == records) {
                    file = recordWriter.finishRecords(task.recordDay,
                        task.targetFileIndex);
                } else {
                    try {
                        file = recordWriter.write(records, task.targetFileIndex);
                    } catch (IOException e) {
                        throw new PipeException(this, task,
                            "Failed to save records.", e);
                    }
                }
                return file;
            }
        };

        /*
         * Since the Pipes here all perform I/O, each Pipe is processed by a single
         * thread: this guarantees thread safety without locks (and so avoids
         * unnecessary context switches). If a Pipe were switched to a thread pool,
         * watch out for: 1) thread safety; 2) deadlock.
         */
        pipeline.addAsWorkerThreadBasedPipe(stageSaveFile, 1);

        final String[][] ftpServerConfigs = retrieveFTPServConf();
        final ThreadPoolExecutor ftpExecutorService = new ThreadPoolExecutor(1,
            ftpServerConfigs.length, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(100),
            new RejectedExecutionHandler() {

                @Override
                public void rejectedExecution(Runnable r,
                    ThreadPoolExecutor executor) {
                    if (!executor.isShutdown()) {
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            // ignored
                        }
                    }
                }
            });

        // Transfer the generated data files to the designated hosts.
        Pipe<File, File> stageTransferFile =
            new AbstractParallelPipe<File, File, File>(
                new SynchronousQueue<File>(), ftpExecutorService) {

            final Future<FTPClientUtil>[] ftpClientUtilHolders =
                new Future[ftpServerConfigs.length];

            @Override
            public void init(PipeContext pipeCtx) {
                super.init(pipeCtx);
                String[] ftpServerConfig;
                for (int i = 0; i < ftpServerConfigs.length; i++) {
                    ftpServerConfig = ftpServerConfigs[i];
                    ftpClientUtilHolders[i] = FTPClientUtil.newInstance(
                        ftpServerConfig[0], ftpServerConfig[1],
                        ftpServerConfig[2]);
                }
            }

            @Override
            protected List<Callable<File>> buildTasks(final File file) {
                List<Callable<File>> tasks = new LinkedList<Callable<File>>();
                for (Future<FTPClientUtil> ftpClientUtilHolder : ftpClientUtilHolders) {
                    tasks.add(new ParallelTask(ftpClientUtilHolder, file));
                }
                return tasks;
            }

            @Override
            protected File combineResults(List<Future<File>> subTaskResults)
                throws Exception {
                if (0 == subTaskResults.size()) {
                    return null;
                }
                File file = null;
                file = subTaskResults.get(0).get();
                return file;
            }

            @Override
            public void shutdown(long timeout, TimeUnit unit) {
                super.shutdown(timeout, unit);
                ftpExecutorService.shutdown();
                try {
                    ftpExecutorService.awaitTermination(timeout, unit);
                } catch (InterruptedException e1) {
                    // ignored
                }
                for (Future<FTPClientUtil> ftpClientUtilHolder : ftpClientUtilHolders) {
                    try {
                        ftpClientUtilHolder.get().disconnect();
                    } catch (Exception e) {
                        // ignored
                    }
                }
            }

            class ParallelTask implements Callable<File> {
                public final Future<FTPClientUtil> ftpUtilHolder;
                public final File file2Transfer;

                public ParallelTask(Future<FTPClientUtil> ftpUtilHolder,
                    File file2Transfer) {
                    this.ftpUtilHolder = ftpUtilHolder;
                    this.file2Transfer = file2Transfer;
                }

                @Override
                public File call() throws Exception {
                    File transferredFile = null;
                    ftpUtilHolder.get().upload(file2Transfer);
                    transferredFile = file2Transfer;
                    return transferredFile;
                }
            }
        };
        pipeline.addAsWorkerThreadBasedPipe(stageTransferFile, 1);

        // Back up the transferred data files.
        Pipe<File, Void> stageBackupFile = new AbstractPipe<File, Void>() {

            @Override
            protected Void doProcess(File transferredFile) throws PipeException {
                RecordWriter.backupFile(transferredFile);
                return null;
            }

            @Override
            public void shutdown(long timeout, TimeUnit unit) {
                // After all files have been backed up, remove the empty directories.
                RecordWriter.purgeDir();
            }
        };
        pipeline.addAsWorkerThreadBasedPipe(stageBackupFile, 1);

        return pipeline;
    }

    protected String[][] retrieveFTPServConf() {
        String[][] ftpServerConfigs = new String[][] {
            { "192.168.1.105", "datacenter", "abc123" }
            // , { "192.168.1.104", "datacenter", "abc123" }
            // , { "192.168.1.103", "datacenter", "abc123" }
        };
        return ftpServerConfigs;
    }

    private void processRecords(ResultSet rs,
        Pipeline<RecordSaveTask, String> pipeline) throws Exception {
        Record record;
        Record[] records = new Record[Config.RECORD_SAVE_CHUNK_SIZE];
        int targetFileIndex = 0;
        int nextTargetFileIndex = 0;
        int recordCountInTheDay = 0;
        int recordCountInTheFile = 0;
        String recordDay = null;
        String lastRecordDay = null;
        SimpleDateFormat sdf = new SimpleDateFormat("yyMMdd");
        while (rs.next()) {
            record = makeRecordFrom(rs);
            lastRecordDay = recordDay;
            recordDay = sdf.format(record.getOperationTime());
            if (recordDay.equals(lastRecordDay)) {
                records[recordCountInTheFile] = record;
                recordCountInTheDay++;
            } else {
                // A day change has actually occurred: switch to a file for the new day.
                if (null != lastRecordDay) {
                    if (recordCountInTheFile >= 1) {
                        pipeline.process(new RecordSaveTask(
                            Arrays.copyOf(records, recordCountInTheFile),
                            targetFileIndex));
                    } else {
                        pipeline.process(new RecordSaveTask(lastRecordDay,
                            targetFileIndex));
                    }
                    // The records buffered so far were flushed to a file above.
                    records[0] = record;
                    recordCountInTheFile = 0;
                } else {
                    // Very first record: just store it.
                    records[0] = record;
                }
                recordCountInTheDay = 1;
            }
            if (nextTargetFileIndex == targetFileIndex) {
                recordCountInTheFile++;
                if (0 == (recordCountInTheFile % Config.RECORD_SAVE_CHUNK_SIZE)) {
                    pipeline.process(new RecordSaveTask(
                        Arrays.copyOf(records, recordCountInTheFile),
                        targetFileIndex));
                    recordCountInTheFile = 0;
                }
            }
            nextTargetFileIndex = (recordCountInTheDay) / Config.MAX_RECORDS_PER_FILE;
            if (nextTargetFileIndex > targetFileIndex) {
                // A same-day file switch is about to happen (current file is full).
                if (recordCountInTheFile > 1) {
                    pipeline.process(new RecordSaveTask(
                        Arrays.copyOf(records, recordCountInTheFile),
                        targetFileIndex));
                } else {
                    pipeline.process(new RecordSaveTask(recordDay, targetFileIndex));
                }
                recordCountInTheFile = 0;
                targetFileIndex = nextTargetFileIndex;
            } else if (nextTargetFileIndex < targetFileIndex) {
                // A cross-day file switch has occurred; recordCountInTheFile keeps
                // its current value.
                targetFileIndex = nextTargetFileIndex;
            }
        }
        if (recordCountInTheFile > 0) {
            pipeline.process(new RecordSaveTask(
                Arrays.copyOf(records, recordCountInTheFile), targetFileIndex));
        }
    }
}
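The Record class used by makeRecordFrom is not shown in this excerpt. A plausible reconstruction, assuming a plain JavaBean whose field types follow from the ResultSet getters above:

    package io.github.viscent.mtpattern.ch13.pipeline.example;

    import java.sql.Timestamp;

    // Hypothetical reconstruction of the Record bean; field types are inferred
    // from the ResultSet getters in DataSyncTask.makeRecordFrom.
    public class Record {
        private int id;
        private String productId;
        private String packageId;
        private String msisdn;
        private Timestamp operationTime;
        private int operationType;
        private Timestamp effectiveDate;
        private Timestamp dueDate;

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getProductId() { return productId; }
        public void setProductId(String productId) { this.productId = productId; }
        public String getPackageId() { return packageId; }
        public void setPackageId(String packageId) { this.packageId = packageId; }
        public String getMsisdn() { return msisdn; }
        public void setMsisdn(String msisdn) { this.msisdn = msisdn; }
        public Timestamp getOperationTime() { return operationTime; }
        public void setOperationTime(Timestamp operationTime) { this.operationTime = operationTime; }
        public int getOperationType() { return operationType; }
        public void setOperationType(int operationType) { this.operationType = operationType; }
        public Timestamp getEffectiveDate() { return effectiveDate; }
        public void setEffectiveDate(Timestamp effectiveDate) { this.effectiveDate = effectiveDate; }
        public Timestamp getDueDate() { return dueDate; }
        public void setDueDate(Timestamp dueDate) { this.dueDate = dueDate; }
    }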
The data synchronization timer task DataSyncTask builds a Pipeline containing three Pipes: stageSaveFile, stageTransferFile, and stageBackupFile.
stageSaveFile writes database records to data files. Once the current data file is full, that file (say, fileX) is submitted to stageTransferFile. While stageTransferFile transfers fileX to the FTP hosts, stageSaveFile may already be writing records to a new data file. So although there is a dependency between the stages (data must first be written to a file before that file can be transferred via FTP), the overall effect is parallel computation. Likewise, once stageTransferFile has finished transferring a file (say, fileX), it submits that file to stageBackupFile.
Within each Pipe the code is written on a single-threaded model, which lowers the coding difficulty and also reduces context switching.
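The original text does not show how DataSyncTask is scheduled. Since it implements Runnable, one plausible sketch uses a ScheduledExecutorService; the initial delay and period below are illustrative only:

    package io.github.viscent.mtpattern.ch13.pipeline.example;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class DataSyncScheduler {
        public static void main(String[] args) {
            ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
            // Illustrative schedule (not from the original text):
            // first run after 1 minute, then every 60 minutes.
            scheduler.scheduleAtFixedRate(new DataSyncTask(), 1, 60, TimeUnit.MINUTES);
        }
    }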
package io.github.viscent.mtpattern.ch13.pipeline.example;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPClientConfig;
import org.apache.commons.net.ftp.FTPReply;

// Pattern roles: Promise.Promisor, Promise.Result
public class FTPClientUtil {
    private final FTPClient ftp = new FTPClient();
    private final Map<String, Boolean> dirCreateMap = new HashMap<String, Boolean>();

    /*
     * helperExecutor is a static field, so newInstance shares one thread pool
     * across all the FTPClientUtil instances it creates.
     * Pattern role: Promise.TaskExecutor
     */
    private volatile static ExecutorService helperExecutor;

    static {
        helperExecutor = new ThreadPoolExecutor(1,
            Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(10), new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setDaemon(true);
                    return t;
                }
            }, new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Private constructor
    private FTPClientUtil() {
    }

    // Pattern role: Promise.Promisor.compute
    public static Future<FTPClientUtil> newInstance(final String ftpServer,
        final String userName, final String password) {
        Callable<FTPClientUtil> callable = new Callable<FTPClientUtil>() {
            @Override
            public FTPClientUtil call() throws Exception {
                FTPClientUtil self = new FTPClientUtil();
                self.init(ftpServer, userName, password);
                return self;
            }
        };
        // task corresponds to the pattern role Promise.Promise.
        final FutureTask<FTPClientUtil> task = new FutureTask<FTPClientUtil>(callable);
        helperExecutor.execute(task);
        return task;
    }

    private void init(String ftpServer, String userName, String password)
        throws Exception {
        FTPClientConfig config = new FTPClientConfig();
        ftp.configure(config);
        int reply;
        ftp.connect(ftpServer);
        System.out.print(ftp.getReplyString());
        reply = ftp.getReplyCode();
        if (!FTPReply.isPositiveCompletion(reply)) {
            ftp.disconnect();
            throw new RuntimeException("FTP server refused connection.");
        }
        boolean isOK = ftp.login(userName, password);
        if (isOK) {
            System.out.println(ftp.getReplyString());
        } else {
            throw new RuntimeException("Failed to login." + ftp.getReplyString());
        }
        reply = ftp.cwd("~/subspsync");
        if (!FTPReply.isPositiveCompletion(reply)) {
            ftp.disconnect();
            throw new RuntimeException(
                "Failed to change working directory. reply:" + reply);
        } else {
            System.out.println(ftp.getReplyString());
        }
        ftp.setFileType(FTP.ASCII_FILE_TYPE);
    }

    public void upload(File file) throws Exception {
        InputStream dataIn = new BufferedInputStream(new FileInputStream(file),
            1024 * 8);
        boolean isOK;
        String dirName = file.getParentFile().getName();
        String fileName = dirName + '/' + file.getName();
        ByteArrayInputStream checkFileInputStream = new ByteArrayInputStream(
            "".getBytes());
        try {
            if (!dirCreateMap.containsKey(dirName)) {
                ftp.makeDirectory(dirName);
                dirCreateMap.put(dirName, null);
            }
            try {
                isOK = ftp.storeFile(fileName, dataIn);
            } catch (IOException e) {
                throw new RuntimeException("Failed to upload " + file, e);
            }
            if (isOK) {
                ftp.storeFile(fileName + ".c", checkFileInputStream);
            } else {
                throw new RuntimeException("Failed to upload " + file + ",reply:"
                    + ftp.getReplyString());
            }
        } finally {
            dataIn.close();
        }
    }

    public void disconnect() {
        if (ftp.isConnected()) {
            try {
                ftp.disconnect();
            } catch (IOException ioe) {
                // Do nothing.
            }
        }
    }
}
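FTPClientUtil plays the Promisor and Result roles of the Promise pattern: newInstance returns a Future immediately while connection and login proceed on the helper thread, so callers block only when they first call get(). A minimal usage sketch; the host name, credentials, and file path are placeholders:

    package io.github.viscent.mtpattern.ch13.pipeline.example;

    import java.io.File;
    import java.util.concurrent.Future;

    public class FTPClientUtilUsage {
        public static void main(String[] args) throws Exception {
            // Connection and login start asynchronously on the helper thread.
            // Host and credentials here are placeholders, not real ones.
            Future<FTPClientUtil> promise =
                FTPClientUtil.newInstance("ftp.example.com", "user", "secret");

            // Other work could be done here while the client connects.

            // get() blocks until the client is connected and logged in.
            FTPClientUtil client = promise.get();
            try {
                client.upload(new File("/tmp/subspsync/example.dat")); // hypothetical file
            } finally {
                client.disconnect();
            }
        }
    }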
package io.github.viscent.mtpattern.ch13.pipeline.example;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.DecimalFormat;
import java.text.FieldPosition;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.Map;

public class RecordWriter {
    private static final RecordWriter INSTANCE = new RecordWriter();

    // HashMap is not thread-safe, but the RecordWriter instance is only used by
    // a single thread, so this causes no problem.
    private static Map<String, PrintWriter> printWriterMap =
        new HashMap<String, PrintWriter>();
    private static String baseDir;
    private static final char FIELD_SEPARATOR = '|';

    // SimpleDateFormat is not thread-safe, but the RecordWriter instance is only
    // used by a single thread, so this causes no problem.
    private static final SimpleDateFormat DIRECTORY_NAME_FORMATTER =
        new SimpleDateFormat("yyMMdd");
    private static final SimpleDateFormat sdf = new SimpleDateFormat(
        "yyyy-MM-dd HH:mm:ss");
    private static final DecimalFormat FILE_INDEX_FORMATTER =
        new DecimalFormat("0000");
    private static final int RECORD_JOIN_SIZE = Config.RECORD_JOIN_SIZE;
    private static final FieldPosition FIELD_POS = new FieldPosition(
        DateFormat.Field.DAY_OF_MONTH);

    // Private constructor
    private RecordWriter() {
        baseDir = System.getProperty("user.home") + "/tmp/subspsync/";
    }

    public static RecordWriter getInstance() {
        return INSTANCE;
    }

    public File write(Record[] records, int targetFileIndex) throws IOException {
        if (null == records || 0 == records.length) {
            throw new IllegalArgumentException("records is null or empty");
        }
        int recordCount = records.length;
        String recordDay;
        recordDay = DIRECTORY_NAME_FORMATTER.format(records[0].getOperationTime());
        String fileKey = recordDay + '-' + targetFileIndex;
        PrintWriter pwr = printWriterMap.get(fileKey);
        if (null == pwr) {
            File file = new File(baseDir + '/' + recordDay + "/subspsync-gw-"
                + FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");
            File dir = file.getParentFile();
            if (!dir.exists() && !dir.mkdirs()) {
                throw new IOException("No such directory:" + dir);
            }
            pwr = new PrintWriter(new BufferedWriter(new FileWriter(file, true),
                Config.WRITER_BUFFER_SIZE));
            printWriterMap.put(fileKey, pwr);
        }
        StringBuffer strBuf = new StringBuffer(40);
        int i = 0;
        for (Record record : records) {
            i++;
            pwr.print(String.valueOf(record.getId()));
            pwr.print(FIELD_SEPARATOR);
            pwr.print(record.getMsisdn());
            pwr.print(FIELD_SEPARATOR);
            pwr.print(record.getProductId());
            pwr.print(FIELD_SEPARATOR);
            pwr.print(record.getPackageId());
            pwr.print(FIELD_SEPARATOR);
            pwr.print(String.valueOf(record.getOperationType()));
            pwr.print(FIELD_SEPARATOR);
            strBuf.delete(0, 40);
            pwr.print(sdf.format(record.getOperationTime(), strBuf, FIELD_POS));
            pwr.print(FIELD_SEPARATOR);
            strBuf.delete(0, 40);
            pwr.print(sdf.format(record.getEffectiveDate(), strBuf, FIELD_POS));
            strBuf.delete(0, 40);
            pwr.print(FIELD_SEPARATOR);
            pwr.print(sdf.format(record.getDueDate(), strBuf, FIELD_POS));
            pwr.print('\n');
            if (0 == (i % RECORD_JOIN_SIZE)) {
                pwr.flush();
                i = 0;
            }
        }
        if (i > 0) {
            pwr.flush();
        }
        File file = null;
        // A chunk smaller than RECORD_SAVE_CHUNK_SIZE is the last group of
        // records for the current file.
        if (recordCount < Config.RECORD_SAVE_CHUNK_SIZE) {
            pwr.close();
            file = new File(baseDir + '/' + recordDay + "/subspsync-gw-"
                + FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");
            printWriterMap.remove(fileKey);
        }
        return file;
    }

    public File finishRecords(String recordDay, int targetFileIndex) {
        String fileKey = recordDay + '-' + targetFileIndex;
        PrintWriter pwr = printWriterMap.get(fileKey);
        File file = null;
        if (null != pwr) {
            pwr.flush();
            pwr.close();
            file = new File(baseDir + '/' + recordDay + "/subspsync-gw-"
                + FILE_INDEX_FORMATTER.format(targetFileIndex) + ".dat");
            printWriterMap.remove(fileKey);
        }
        return file;
    }

    public static void backupFile(final File file) {
        String recordDay = file.getParentFile().getName();
        File destFile = new File(baseDir + "/backup/" + recordDay);
        if (!destFile.exists()) {
            if (!destFile.mkdirs()) {
                return;
            }
        }
        destFile = new File(destFile, file.getName());
        if (!file.renameTo(destFile)) {
            throw new RuntimeException("Failed to backup file " + file);
        }
    }

    public static void purgeDir() {
        File[] dirs = new File(baseDir).listFiles();
        for (File dir : dirs) {
            if (dir.isDirectory() && 0 == dir.list().length) {
                dir.delete();
            }
        }
    }
}
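The Config class referenced by DataSyncTask and RecordWriter is not included in this excerpt. A minimal sketch: the constant names come from the listings above, but the values are assumptions (the requirement only fixes that a data file holds at most N, e.g. 10,000, records):

    package io.github.viscent.mtpattern.ch13.pipeline.example;

    // Hypothetical reconstruction of the Config class. Only the constant names
    // come from the listings above; the values are assumptions.
    public final class Config {
        // Maximum number of records one data file may hold (the "N" of requirement 1).
        public static final int MAX_RECORDS_PER_FILE = 10000;
        // Number of records batched into one RecordSaveTask submitted to the pipeline.
        public static final int RECORD_SAVE_CHUNK_SIZE = 500;
        // Flush the PrintWriter after this many records have been written.
        public static final int RECORD_JOIN_SIZE = 100;
        // Buffer size, in characters, of the BufferedWriter wrapping each data file.
        public static final int WRITER_BUFFER_SIZE = 8192;

        private Config() {
        }
    }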