Python RQ(Redis Queue)新增gevent支援
python-rq簡單好用,但缺點是,預設的實現是使用fork的模式,關於這點可以看python-rq原始碼分析 。
所以我們要對他進行改造,每次執行任務,我們就使用一個coroutine。gevent的文件中這樣寫道:
Patching should be done as early as possible in the lifecycle of the program.
因此,我們在最上面就開始進行 monkey patch。此外,我把queue定義在了jobs/queue.py
裡。直接上程式碼:
# worker.py import gevent.monkey gevent.monkey.patch_all()# noqa import logging from rq.worker import ( Worker, WorkerStatus, ) import redis from config import config from jobs import ( money_q, message_q, ) class GeventWorker(Worker): def execute_job(self, job, queue): self.set_state(WorkerStatus.BUSY) self.log.debug("gonna spawn a greenlet to execute job %s from queue", job, queue) gevent.spawn(self.perform_job, job, queue).join() self.log.debug("job %s from queue %s executed", job, queue) self.set_state(WorkerStatus.IDLE) def gevent_worker(queues): worker = GeventWorker( queues=queues, connection=redis.StrictRedis.from_url(config.WORKER_BROKER) ) worker.work() if __name__ == "__main__": gevent_worker([money_q, message_q])
# queue.py from rq import Queue import redis from config import config __conn = redis.StrictRedis.from_url(config.WORKER_BROKER) money_q = Queue("money", connection=__conn) message_q = Queue("message", connection=__conn)
解釋一下實現原理:
首先閱讀 rq 預設的worker實現,就會發現,所有的worker都有execute_job
這個方法,因此我們繼承Worker
並且
重寫這個方法,在我們的實現裡,新起一個coroutine來執行相關程式碼。