Flask任務排程APScheduler
ofollow,noindex">APScheduler
是一款功能非常強大的定時任務框架。利用APScheduler
框架我們可以很方便實現一個基於Python
的定時任務系統。Flask
提供了對應的Flask-APScheduler
來整合APScheduler
功能。
安裝Flask-APScheduler
pip install Flask-APScheduler
配置
新增配置
- app/config/scheduler.py
from app.tasks.task import test_task SCHEDULER_API_ENABLED = True JOBS = [ { 'id': 'job_announce', 'func': test_task, 'args': '', 'trigger': 'interval', 'seconds': 2 } ]
- app.py
def create_app(): app = Flask(__name__) app.config.from_object('app.config.secure') app.config.from_object('app.config.setting') app.config.from_object('app.config.scheduler') register_blueprint(app) scheduler = APScheduler() scheduler.init_app(app) scheduler.start() return app
定義任務
- app.tasks.task.py
def test_task(): print(1111) return True
啟動flask,任務就可以執行了
動態新增任務
這裡通過呼叫url
來實現動態任務的新增
from flask import current_app from app.tasks.php_api import test_task @api.route('/create/') def creat_task(): current_app.apscheduler.add_job(func=test_task, id="test-tasks", trigger="interval", seconds=2) return 'create task'
啟動flask後發現並沒有定時任務,我們呼叫一下這個url
後發現定時任務開始