java實現任務排程
最近的一個小專案是做一個簡單的資料倉庫,需要將其他資料庫的資料抽取出來,並通過而出抽取成頁面需要的資料,以空間換時間的方式,讓後端報表查詢更快。
因為在抽取的過程中,有一定的先後順序,需要做一個任務排程器,某一優先順序的會先執行,然後會進入下一個優先順序的佇列任務中。
先定義了一個Map的集合,key是優先順序,value是任務的集合,某一個優先順序內的任務是併發執行的,而不同優先順序是序列執行的,前一個優先順序執行完之後,後面的才會執行。
ConcurrentHashMap<Integer/* 優先順序. */, List<BaseTask>/* 任務集合. */> tasks = new ConcurrentHashMap<>();
這個排程管理有一個演進的過程,我先說第一個,這個是比較好理解的。
第一個版本:
首先對tasks集合中的key進行一個排序,我定義的是數字越小就有限執行,則進行遍歷key值,並取出某個優先順序的任務佇列,執行任務佇列的任務。任務的執行交給執行緒池去執行,在遍歷內部,需要不斷的檢查這個佇列中的任務是否都執行了,沒有則一直等待否則進入到下個佇列,任務執行的時候可能會丟擲異常,但是不管任務是否異常,都將任務狀態設定已執行。
下面是其核心程式碼:
public void run() { //對key值進行排序 Enumeration<Integer> keys = tasks.keys(); List<Integer> prioritys = new ArrayList<>(); while (keys.hasMoreElements()) { prioritys.add(keys.nextElement()); } Collections.sort(prioritys);//升序 //對key進行遍歷,執行某個某個優先順序的任務佇列 for (Integer priority : prioritys) { List<BaseTask> taskList = tasks.get(priority); if (taskList.isEmpty()) { continue; } logger.info("execute priority {} task ", taskList.get(0).priority); for (BaseTask task : taskList) { executor.execute(() -> { try { task.doTask(); } catch (Exception e) { e.printStackTrace(); } });//執行緒中執行任務 } while (true) {//等待所有執行緒都執行完成之後執行下一個任務佇列 boolean finish = true; for (BaseTask t : taskList) { if (!t.finish) { finish = false; } } if (finish) {//當前任務都執行完畢 break; } Misc.sleep(1000);//Thread.sleep(1000) } Misc.sleep(1000); } }
關鍵程式碼很好理解,在任務執行之前,需要對所有任務都初始化,初始化的時候給出每個任務的優先順序和任務名稱,任務抽象類如下:
public abstract class BaseTask { public String taskName;//任務名稱 public Integer priority; //優先順序 public boolean finish; //任務完成? /** * 執行的任務 */ public abstract void doTask(Date date) throws Exception;
第一個版本的思路很簡單。
第二個版本稍微有一點點複雜。這裡主要介紹該版本的內容,後續將程式碼的連結附上。
程式是由SpringBoot搭建起來的,定時器是Spring內建的輕量級的Quartz,使用Aop方式攔截異常,使用註解的方式在任務初始化時設定任務的初始變數。使用EventBus解耦程式,其中程式簡單實現郵件傳送功能(該功能還需要自己配置引數),以上這些至少需要簡單的瞭解一下。
程式的思路:在整個佇列執行過程中會有多個管道,某個佇列上的管道任務執行完成,可以直接進行到下一個佇列中執行,也設定了等待某一個佇列上的所有任務都執行完成才執行當前任務。在某個佇列任務中會標識某些任務是一隊的,其他的為另一隊,當這一隊任務執行完成,就可以到下一個佇列中去,不需要等待另一隊。
這裡會先初始化每個佇列的每個隊的條件,這個條件就是每個隊的任務數,執行完成減1,當為0時,就進入下一個佇列中。
分四個步驟進行完成:
1.bean的初始化
2.條件的設定
3.任務的執行
4.任務異常和任務執行完成之後通知檢查是否執行下一個佇列的任務
1.bean的初始化
1.建立註解類
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented public @interface TaskAnnotation { int priority() default 0;//優先順序 String taskName() default "";//任務名稱 TaskQueueEnum[] queueName() default {};//佇列名稱 }
2.實現BeanPostProcessor,該介面是中有兩個方法postProcessBeforeInitialization和postProcessAfterInitialization,分別是bean初始化之前和bean初始化之後做的事情。
Annotation[] annotations = bean.getClass().getAnnotations();//獲取類上的註解 if (ArrayUtils.isEmpty(annotations)) {//註解為空時直接返回(不能返回空,否則bean不會被載入) return bean; } for (Annotation annotation : annotations) { if (annotation.annotationType().equals(TaskAnnotation.class)) { TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//強轉 try { Field[] fields = target.getClass().getFields();//需要通過反射將值進行修改,下面的操作僅僅是物件的引用 if (!ArrayUtils.isEmpty(fields)) { for (Field f : fields) { f.setAccessible(true); if (f.getName().equals("priority")) { f.set(target, taskAnnotation.priority()); } } } } }
上面需要注意的一點是需要通過反射的機制給bean設定值,不能直接呼叫bean的方式set值,否則bean的值是空的。
上面的程式碼通過實現BeanPostProcessor後置處理器,處理任務上的註解,完成對任務的初始化的。
2.條件的初始化
建立條件類,提供初始化的方法。
public abstract class BaseTask { public int nextPriority;//子級節點的優先順序 public String taskName;//任務名稱 public Integer priority; //優先順序 public String queueName;//佇列名稱 public boolean finish; //任務完成? public boolean allExecute; /** * 執行的任務 */ public abstract void doTask(Date date) throws Exception; //任務完成之後,通過eventBus傳送通知,是否需要執行下一個佇列 public void notifyExecuteTaskMsg(EventBus eventBus, Date date) { EventNotifyExecuteTaskMsg msg = new EventNotifyExecuteTaskMsg(); msg.setDate(date); msg.setNextPriority(nextPriority); msg.setQueueName(queueName); msg.setPriority(priority); msg.setTaskName(taskName); eventBus.post(msg); } } public class TaskExecuteCondition { private ConcurrentHashMap<String, AtomicInteger> executeMap = new ConcurrentHashMap<>(); /** * 初始化,每個佇列進行分組,每個組的任務數量放入map集合中. */ public void init(ConcurrentHashMap<Integer, List<BaseTask>> tasks) { Enumeration<Integer> keys = tasks.keys(); List<Integer> prioritys = new ArrayList<>(); while (keys.hasMoreElements()) { prioritys.add(keys.nextElement()); } Collections.sort(prioritys);//升序 for (Integer priority : prioritys) { List<BaseTask> list = tasks.get(priority); if (list.isEmpty()) { continue; } //對每個佇列進行分組 Map<String, List<BaseTask>> collect = list.stream() .collect(Collectors.groupingBy(x -> x.queueName, Collectors.toList())); for (Entry<String, List<BaseTask>> entry : collect.entrySet()) { for (BaseTask task : entry.getValue()) { addCondition(task.priority, task.queueName); } } } } /** * 執行任務完成,條件減1 */ public boolean executeTask(Integer priority, String queueName) { String name = this.getQueue(priority, queueName); AtomicInteger count = executeMap.get(name); int sum = count.decrementAndGet(); if (sum == 0) { return true; } return false; } /** * 對個某個佇列的條件 */ public int getCondition(Integer priority, String queueName) { String name = this.getQueue(priority, queueName); return executeMap.get(name).get(); } private void addCondition(Integer priority, String queueName) { String name = this.getQueue(priority, queueName); AtomicInteger count = executeMap.get(name); if (count == null) { count = new AtomicInteger(0); executeMap.put(name, count); } count.incrementAndGet(); } private void addCondition(Integer priority, String queueName, int sum) { String name = this.getQueue(priority, queueName); AtomicInteger count = executeMap.get(name); if (count == null) { count = new AtomicInteger(sum); executeMap.put(name, count); } else { count.set(sum); } } private String getQueue(Integer priority, String queueName) { return priority + queueName; } /** * 清除佇列 */ public void clear() { this.executeMap.clear(); } }
3.任務的執行
任務執行類提供run方法,執行第一個佇列,並提供獲取下一個佇列優先順序方法,執行某個佇列某個組的方法。
public class ScheduleTask { private static final Logger logger = LoggerFactory.getLogger(ScheduleTask.class); public ConcurrentHashMap<Integer/* 優先順序. */, List<BaseTask>/* 任務集合. */> tasks = new ConcurrentHashMap<>(); @Autowired private ThreadPoolTaskExecutor executor;//執行緒池 //任務會先執行第一佇列的任務. public void run(Date date) { Enumeration<Integer> keys = tasks.keys(); List<Integer> prioritys = new ArrayList<>(); while (keys.hasMoreElements()) { prioritys.add(keys.nextElement()); } Collections.sort(prioritys);//升序 Integer priority = prioritys.get(0); executeTask(priority, date);//執行第一行的任務. } //獲取下一個佇列的優先順序 public Integer nextPriority(Integer priority) { Enumeration<Integer> keys = tasks.keys(); List<Integer> prioritys = new ArrayList<>(); while (keys.hasMoreElements()) { prioritys.add(keys.nextElement()); } Collections.sort(prioritys);//升序 for (Integer pri : prioritys) { if (priority < pri) { return pri; } } return null;//沒有下一個佇列 } public void executeTask(Integer priority) { List<BaseTask> list = tasks.get(priority); if (list.isEmpty()) { return; } for (BaseTask task : list) { execute(task); } } //執行某個佇列的某個組 public void executeTask(Integer priority, String queueName) { List<BaseTask> list = this.tasks.get(priority); list = list.stream().filter(task -> queueName.equals(task.queueName)) .collect(Collectors.toList()); if (list.isEmpty()) { return; } for (BaseTask task : list) { execute(task); } } public void execute(BaseTask task) { executor.execute(() -> { try { task.doTask(date);// } catch (Exception e) {//異常處理已經Aop攔截處理 } });//執行緒中執行任務 } /** * 增加任務 */ public void addTask(BaseTask task) { List<BaseTask> baseTasks = tasks.get(task.priority); if (baseTasks == null) { baseTasks = new ArrayList<>(); List<BaseTask> putIfAbsent = tasks.putIfAbsent(task.priority, baseTasks); if (putIfAbsent != null) { baseTasks = putIfAbsent; } } baseTasks.add(task); } /** * 將任務結束標識重新設定 */ public void finishTask() { tasks.forEach((key, value) -> { for (BaseTask task : value) { task.finish = false; } }); } }
4.任務異常和任務執行完成之後通知檢查是否執行下一個佇列的任務
public class EventNotifyExecuteTaskListener { private static final Logger logger = LoggerFactory .getLogger(EventNotifyExecuteTaskListener.class); @Autowired private ScheduleTask scheduleTask; @Autowired private TaskExecuteCondition condition; @Subscribe public void executeTask(EventNotifyExecuteTaskMsg msg) { //當前佇列的某組內容是否都執行完成 boolean success = condition.executeTask(msg.getPriority(), msg.getQueueName()); if (success) { Integer nextPriority = scheduleTask.nextPriority(msg.getPriority()); if (nextPriority != null) { scheduleTask.executeTask(nextPriority, msg.getQueueName(), msg.getDate());//執行下一個佇列 } else {//執行完成,重置任務標識 scheduleTask.finishTask(); logger.info("CoreTask end!"); } } } }
整個思路介紹到這裡,那麼接下來是整個專案中出現的一些問題
1.BeanPostProcessor與Aop一起使用時,postProcessAfterInitialization呼叫之後獲取的bean分為不同的了,一個是jdk原生實體物件,一種是Aop註解下的類會被cglib代理,生成帶有後綴的物件,如果通過這個物件時反射獲取類的註解,欄位和方法,就獲取不到,在程式碼中,需要將其轉化一下,將cgLib代理之後的類轉化為不帶字尾的物件。
2.postProcessAfterInitialization的引數bean不能直接設定值,就是如下:
TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//強轉 BaseTask baseTask = (BaseTask) bean;//強轉 baseTask.priority = taskAnnotation.priority();
在使用物件時,其中物件的欄位時為空的,並需要通過反射的方式去設定欄位的值。
上面僅僅只是個人的想法,如果有更好的方式,或者有某些地方可以進行改進的,我們可以共同探討一下。
連結地址:https://github.com/wangice/task-scheduler
程式中使用了一個公共包:https://github.com/wangice/misc