執行緒池優化之充分利用執行緒池資源
一、前言
最近做了電子發票的需求,分省開票介面和發票下載介面都有一定的延遲。為了完成開票後自動將發票插入使用者微信卡包,目前的解決方案是利用執行緒池,將開票後插入卡包的任務(輪詢分省發票介面,直到獲取到發票相關資訊或者輪詢次數用完,如果獲取到發票資訊,執行發票插入微信卡包,結束任務)放入執行緒池非同步執行。仔細想一想,這種實現方案存在一個問題,執行緒池沒有充分的利用。為什麼沒有充分的利用?下面詳細的分析。
二、非同步執行緒池和非同步任務包裝
AsyncConfigurerSupport可以幫我們指定非同步任務(注有@Async註解)對應的執行緒池。
@Configuration public class MyAsyncConfigurer extends AsyncConfigurerSupport { private static Logger LOGGER = LoggerFactory.getLogger(MyAsyncConfigurer.class); @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(2); taskExecutor.setMaxPoolSize(4); taskExecutor.setQueueCapacity(10); taskExecutor.setRejectedExecutionHandler((runnable, executor) -> LOGGER.error("非同步執行緒池拒絕任務..." + runnable)); taskExecutor.setThreadFactory(new MyAsyncThreadFactory()); taskExecutor.initialize(); return taskExecutor; } static class MyAsyncThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; MyAsyncThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "myasync-pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } }
非同步任務包裝,除了非同步,還加入了retry功能,實現指定次數的介面輪詢。
@Component public class AsyncWrapped { protected static Logger LOGGER = LoggerFactory.getLogger(AsyncWrapped.class); @Async public void asyncProcess(Runnable runnable, Callback callback, Retry retry) { try { if (retry == null) { retry = new Retry(1); } retry.execute(ctx -> { runnable.run(); return null; }, ctx -> { if (callback != null) { callback.call(); } return null; }); } catch (Exception e) { LOGGER.error("非同步呼叫異常...", e); } } }
業務程式碼大致邏輯如下。
asyncWrapped.asyncProcess(() -> { //呼叫分省介面獲取發票資訊 //如果發票資訊異常,丟擲異常(進入下次重試) //否則,插入使用者微信卡包 }, () -> { //輪詢次數用盡,使用者插入卡包失敗 } , new Retry(2, 1000) );
這裡說一下為什麼執行緒池沒有充分的利用。非同步任務中包含輪詢操作,輪詢有一定的時間間隔,導致在這段時間間隔內,執行緒一直處於被閒置的狀態。所以為了能更好的利用執行緒池資源,我們得想辦法解決時間間隔的問題。假如有個延遲佇列,佇列裡放著我們的非同步任務(不包含重試機制),然後延遲(輪詢的時間間隔)一定時間之後,將任務放入執行緒池中執行,任務執行完畢之後根據是否需要再次執行決定是否再次放入到延遲佇列去,這樣每個執行緒池中的執行緒都不會閒著,達到了充分利用的目的。
三、定時任務執行緒池和實現輪詢機制
@EnableScheduling 幫助開啟@Scheduled註解解析。註冊一個名字是ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME的定時任務執行緒池。
@Configuration @EnableScheduling @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class TaskConfiguration { @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledExecutorService scheduledAnnotationProcessor() { return Executors.newScheduledThreadPool(5, new DefaultThreadFactory()); } private static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-schedule-"; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } } }
實現輪詢任務,實現介面SchedulingConfigurer,獲取ScheduledTaskRegistrar 並指定定時任務執行緒池。
@Override public void configureTasks(ScheduledTaskRegistrar registrar) { this.registrar = registrar; this.registrar.setScheduler(this.applicationContext.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class)); scheduledTaskRegistrarHelper = new ScheduledTaskRegistrarHelper(); }
scheduledFutures提交定時任務時返回結果集,periodTasks 定時任務結果集。
private static final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private static final ConcurrentHashMap<String, TimingTask> periodTasks = new ConcurrentHashMap<>();
定時任務包裝類,包含任務的執行次數(重試次數)、重試間隔、具體任務、重試次數用盡之後的回撥等,以及自動結束定時任務、重試計數重置功能。
private static class TimingTask { //重試次數 private Integer retry; //任務標識 private String taskId; //重試間隔 private Long period; //具體任務 private ScheduledRunnable task; //結束回撥 private ScheduledCallback callback; //重試計數 private AtomicInteger count = new AtomicInteger(0); //父執行緒MDC private Map<String, String> curContext; public TimingTask(Integer retry, String taskId, Long period, ScheduledRunnable task, ScheduledCallback callback) { this.retry = retry; this.taskId = taskId; this.period = period; this.task = task; this.callback = callback; this.curContext = MDC.getCopyOfContextMap(); } public Long getPeriod() { return period; } public void setPeriod(Long period) { this.period = period; } public String getTaskId() { return taskId; } public void setTaskId(String taskId) { this.taskId = taskId; } public Integer getRetry() { return retry; } public void setRetry(Integer retry) { this.retry = retry; } public AtomicInteger getCount() { return count; } public boolean reset() { for (int cnt = this.count.intValue(); cnt < this.retry; cnt = this.count.intValue()) { if (this.count.compareAndSet(cnt, 0)) { return true; } } return false; } public void process() { Map<String, String> preContext = MDC.getCopyOfContextMap(); try { if (this.curContext == null) { MDC.clear(); } else { // 將父執行緒的MDC內容傳給子執行緒 MDC.setContextMap(this.curContext); } this.task.run(); exitTask(false); } catch (Exception e) { LOGGER.error("定時任務異常..." + this, e); if (count.incrementAndGet() >= this.retry) { exitTask(true); } } finally { if (preContext == null) { MDC.clear(); } else { MDC.setContextMap(preContext); } } } //定時任務退出 private void exitTask(boolean execCallback) { scheduledFutures.get(this.taskId).cancel(false); scheduledFutures.remove(this.getTaskId()); periodTasks.remove(this.getTaskId()); LOGGER.info("結束定時任務: " + this); if (execCallback && callback != null) { callback.call(); } } @Override public String toString() { return ReflectionToStringBuilder.toString(this , ToStringStyle.JSON_STYLE , false , false , TimingTask.class); } }
注意上面定時任務是如何退出的,是在某一次任務執行成功之後(沒有異常丟擲)或者定時任務執行次數用盡才退出的。直接呼叫ScheduledFuture的cancel方法可以退出定時任務。還有就是定時任務中的日誌需要父執行緒中的日誌變數,所以需要對MDC進行一下處理。
@Scope("prototype") @Bean public AspectTimingTask aspectTimingTask() { return new AspectTimingTask(); } @Aspect @Component public static class ScheduledAspect { @Around("target(AspectTimingTask)") public Object executeScheduledWrapped(ProceedingJoinPoint proceedingJoinPoint) throws Throwable { if (proceedingJoinPoint instanceof MethodInvocationProceedingJoinPoint) { MethodInvocationProceedingJoinPoint methodJoinPoint = (MethodInvocationProceedingJoinPoint) proceedingJoinPoint; Method method = ((MethodSignature) methodJoinPoint.getSignature()).getMethod(); if (AnnotatedElementUtils.isAnnotated(method, ScheduledTask.class)) { LOGGER.info("電子發票定時任務日誌同步..."); //其他處理 } } return proceedingJoinPoint.proceed(); } } public static class AspectTimingTask implements Runnable { private TimingTask timingTask; @Override @ScheduledTask public void run() { timingTask.process(); } public void setTimingTask(TimingTask timingTask) { this.timingTask = timingTask; } }
AspectTimingTask 是對TimingTask 的包裝類,實現了Runnable介面。主要是為了對run介面做一層切面,獲取ProceedingJoinPoint 例項(公司中的日誌呼叫鏈系統需要這個引數)。AspectTimingTask 的bean例項的scope是prototype,這個注意下。
public static void register(Integer retry , Long period , String taskId , ScheduledRunnable task , ScheduledCallback callback) { scheduledTaskRegistrarHelper.register(retry, taskId, period, task, callback); } private class ScheduledTaskRegistrarHelper { public void register(Integer retry , String taskId , Long period , ScheduledRunnable task , ScheduledCallback callback) { //是否可以重置定時任務 TimingTask preTask = periodTasks.get(taskId); if (null != preTask && preTask.reset() && existTask(taskId)) { return; } TimingTask curTask = new TimingTask(retry, taskId, period, task, callback); AspectTimingTask aspectTimingTask = applicationContext.getBean(AspectTimingTask.class); aspectTimingTask.setTimingTask(curTask); ScheduledFuture<?> scheduledFuture = registrar.getScheduler().scheduleAtFixedRate(aspectTimingTask, period); scheduledFutures.put(taskId, scheduledFuture); periodTasks.put(taskId, curTask); LOGGER.info("註冊定時任務: " + curTask); } private boolean existTask(String taskId) { return scheduledFutures.containsKey(taskId) && periodTasks.containsKey(taskId); } }
如果taskId的定時任務已經存在則重置定時任務,否則註冊新的定時任務。AspectTimingTask 例項通過ApplicationContext獲取,每次獲取都是一個新的例項。
由 非同步輪詢任務 優化成 定時任務,充分利用了執行緒池。修改之後的業務程式碼如下。
ScheduledTaskRegistrarHelper.register(10 , 5*1000L , "taskId" , () -> { //呼叫分省介面獲取發票資訊 //如果發票資訊異常,丟擲異常(進入下次重試) //否則,插入使用者微信卡包 } () -> { //輪詢次數用盡,使用者插入卡包失敗 } );
針對電子發票插入微信卡包定時任務,重試執行次數10次,每隔5秒執行一次。任務完成之後結束定時任務,執行次數用盡之後觸發插入卡包失敗動作。