Python定時任務--apscheduler
APScheduler是一個python的第三方庫,用來提供python的後臺程式。包含四個元件,分別是:
triggers: 任務觸發器元件,提供任務觸發方式
job stores: 任務商店元件,提供任務儲存方式
executors: 任務排程元件,提供任務排程方式
schedulers: 任務排程元件,提供任務工作方式
官網文件:https://apscheduler.readthedocs.io/en/3.3.1/
1.安裝
$ pip install apscheduler # python3 使用pip3
2.簡單例項
from apscheduler.schedulers.blocking import BlockingScheduler import time # 例項化一個排程器 scheduler = BlockingScheduler() def job_task(): print "%s: 執行任務"% time.asctime() # 新增任務並設定觸發方式為3s一次 scheduler.add_job(job_task, 'interval', seconds=3) # 開始執行排程器 scheduler.start()
輸出:
$ python first.py Fri Sep8 20:41:55 2017: 執行任務 Fri Sep8 20:41:58 2017: 執行任務 ...
3.trigger元件
trigger提供任務的觸發方式,共三種方式:
-
date:只在某個時間點執行一次run_date(datetime|str)
scheduler.add_job(my_job, 'date', run_date=date(2019, 4, 19), args=[]) scheduler.add_job(my_job, 'date', run_date=datetime(2019, 4, 19, 21, 30, 5), args=[]) scheduler.add_job(my_job, 'date', run_date='2019-4-19 21:30:05', args=[]) # The 'date' trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=[[])
-
interval: 每隔一段時間執行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None
scheduler.add_job(my_job, 'interval', hours=2) scheduler.add_job(my_job, 'interval', hours=2, start_date='2019-4-19 21:30:00', end_date='2019-04-20 21:30:00) @scheduler.scheduled_job('interval', id='my_job_id', hours=2) def my_job(): print("Hello World")
- cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
sched.add_job(my_job, 'cron', hour=3, minute=30) sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10-30') @sched.scheduled_job('cron', id='my_job_id', day='last sun') def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!")
4.scheduler元件
scheduler元件提供執行的方式,在不同的運用環境中選擇合適的方式
- BlockingScheduler: 程序中只執行排程器時的方式
from apscheduler.schedulers.blocking import BlockingScheduler import time scheduler = BlockingScheduler() def job1(): print "%s: 執行任務"% time.asctime() scheduler.add_job(job1, 'interval', seconds=3) scheduler.start()
- BackgroundScheduler: 不想使用任何框架時的方式
from apscheduler.schedulers.background import BackgroundScheduler import time scheduler = BackgroundScheduler() def job1(): print "%s: 執行任務"% time.asctime() scheduler.add_job(job1, 'interval', seconds=3) scheduler.start() while True: pass
- AsyncIOScheduler: asyncio module的方式(Python3)
from apscheduler.schedulers.asyncio import AsyncIOScheduler try: import asyncio except ImportError: import trollius as asyncio ... ... # while True:pass try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass
- GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler ... ... g = scheduler.start() # while True:pass try: g.join() except (KeyboardInterrupt, SystemExit): pass
- ornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop from apscheduler.schedulers.tornado import TornadoScheduler ... ... # while True:pass try: IOLoop.instance().start() except (KeyboardInterrupt, SystemExit): pass
- TwistedScheduler: Twisted方式
from twisted.internet import reactor from apscheduler.schedulers.twisted import TwistedScheduler ... ... # while True:pass try: reactor.run() except (KeyboardInterrupt, SystemExit): pass
- QtScheduler: Qt方式
5.executors元件
executors元件提供任務的排程方式
- base
- debug
- gevent
- pool(max_workers=10)
- twisted
6.jobstore元件
jobstore提供任務的各種持久化方式
- base
- memory
- mongodb scheduler.add_jobstore('mongodb', collection='example_jobs')
- redis scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
- rethinkdb scheduler.add_jobstore('rethinkdb', database='apscheduler_example')
- sqlalchemy scheduler.add_jobstore('sqlalchemy', url=url)
- zookeeper scheduler.add_jobstore('zookeeper', path='/example_jobs')
7.任務操作
-
新增任務add_job 果使用了任務的儲存,開啟時最好新增replace_existing=True,否則每次開啟都會建立任務的副本 開啟後任務不會馬上啟動,可修改trigger引數
-
刪除任務remove_job
# 根據任務例項刪除 job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() # 根據任務id刪除 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
- 任務的暫停pause_job和繼續resume_job
job = scheduler.add_job(myfunc, 'interval', minutes=2) # 根據任務例項 job.pause() job.resume() # 根據任務id暫停 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.pause_job('my_job_id') scheduler.resume_job('my_job_id')
-
任務的修飾modify和重設reschedule_job
修飾:job.modify(max_instances=6, name='Alternate name')
重設:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
8.排程器操作
- 開啟 scheduler.start()
- 關閉 scheduler.shotdown(wait=True | False)
- 暫停 scheduler.pause()
- 繼續 scheduler.resume()
- 監聽https://apscheduler.readthedocs.io/en/v3.3.0/modules/events.html#module-apscheduler.events
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)