How do you write a task queue? Reading the python-rq source
rq stands for Redis Queue. The project is tightly coupled to Redis, and its other heavy dependency is pickle. That is one of the reasons the project is so simple.
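As a quick illustration of why pickle does most of the heavy lifting (this is just a toy example, not rq's actual storage format), a function call can be flattened into bytes and restored later in another process:

```python
import pickle

# Describe a call as plain data: an import path plus the args and kwargs.
call = ('myapp.tasks.send_email', ('alice@example.com',), {'subject': 'hi'})

payload = pickle.dumps(call)                      # bytes you could stash in Redis
func_name, args, kwargs = pickle.loads(payload)   # restored on the worker side
print(func_name, args, kwargs)
```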
Cloning the source
First we fork a copy of the source, e.g. https://github.com/jiajunhuang/rq, and clone it locally. Entering the source directory with `$ cd rq/rq`, we can see the directory layout:
```
$ tree .
├── cli
│   ├── cli.py
│   ├── helpers.py
│   └── __init__.py
├── compat
│   ├── connections.py
│   ├── dictconfig.py
│   └── __init__.py
├── connections.py
├── contrib
│   ├── __init__.py
│   ├── legacy.py
│   └── sentry.py
├── decorators.py
├── defaults.py
├── dummy.py
├── exceptions.py
├── handlers.py
├── __init__.py
├── job.py
├── local.py
├── logutils.py
├── queue.py
├── registry.py
├── suspension.py
├── timeouts.py
├── utils.py
├── version.py
└── worker.py

3 directories, 26 files
```
The entry point
So how should we read the source, and where is the entry point? When writing code, our entry point is the main function; when reading code, what should it be? Well, how do we start an rq worker in the first place?
```
$ rq worker
```
So let's see where the `rq` command comes from:
```
cat `which rq`
#!/home/jiajun/.py3k/bin/python3
# -*- coding: utf-8 -*-
import re
import sys

from rq.cli import main

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(main())
```
So the entry point is the `main` function in `rq.cli`. Next, let's see where `rq.cli` comes from:
```
cat cli/__init__.py
# flake8: noqa
from .cli import main

# TODO: the following imports can be removed when we drop the `rqinfo` and
# `rqworkers` commands in favor of just shipping the `rq` command.
from .cli import info, worker
```
Next we look at the file `cli/cli.py`, where we find `def worker`: this is the entry point we are after. The place where the real work starts is `worker.work(burst=burst)`.
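Stripped of the CLI plumbing, what `rq worker` ends up doing is roughly the library usage below. This is a sketch under the defaults; the real command also wires up config files, custom classes and exception handlers:

```python
from redis import StrictRedis
from rq import Queue, Worker

# Roughly what the CLI assembles before calling work(): a connection,
# the queues to listen on, and a Worker bound to them.
conn = StrictRedis()
queues = [Queue('default', connection=conn)]

worker = Worker(queues, connection=conn)
worker.work(burst=True)   # burst=True: drain the queues once, then exit
```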
Default values
```python
DEFAULT_JOB_CLASS = 'rq.job.Job'
DEFAULT_QUEUE_CLASS = 'rq.Queue'
DEFAULT_WORKER_CLASS = 'rq.Worker'
DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
DEFAULT_WORKER_TTL = 420
DEFAULT_RESULT_TTL = 500
```
Tracing back from the place where the worker is initialized:
```python
queues = [cli_config.queue_class(queue,
                                 connection=cli_config.connection,
                                 job_class=cli_config.job_class)
          for queue in queues]
worker = cli_config.worker_class(queues,
                                 name=name,
                                 connection=cli_config.connection,
                                 default_worker_ttl=worker_ttl,
                                 default_result_ttl=results_ttl,
                                 job_class=cli_config.job_class,
                                 queue_class=cli_config.queue_class,
                                 exception_handlers=exception_handlers or None)
worker.work(burst=burst)
```
we end up at the default values listed above. We will run into these values again later.
Digging into work
Open `worker.py` and find `def work`:
```python
def work(self, burst=False, logging_level="INFO"):
    """Starts the work loop.

    Pops and performs all jobs on the current list of queues. When all
    queues are empty, block and wait for new jobs to arrive on any of the
    queues, unless `burst` mode is enabled.

    The return value indicates whether any jobs were processed.
    """
    setup_loghandlers(logging_level)
    self._install_signal_handlers()

    did_perform_work = False
    self.register_birth()
    self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
    self.set_state(WorkerStatus.STARTED)

    try:
        while True:
            try:
                self.check_for_suspension(burst)

                if self.should_run_maintenance_tasks:
                    self.clean_registries()

                if self._stop_requested:
                    self.log.info('Stopping on request')
                    break

                timeout = None if burst else max(1, self.default_worker_ttl - 60)

                result = self.dequeue_job_and_maintain_ttl(timeout)
                if result is None:
                    if burst:
                        self.log.info("RQ worker {0!r} done, quitting".format(self.key))
                    break

                job, queue = result
                self.execute_job(job, queue)
                self.heartbeat()

                did_perform_work = True
            except StopRequested:
                break
    finally:
        if not self.is_horse:
            self.register_death()
    return did_perform_work
```
We can see what this piece of code does:
- set up logging
- install the signal handlers
- register the worker (register_birth)
- set the worker's state to "started"
- then enter the main loop:
  - check whether the worker has been suspended
  - pop a job off a queue
  - execute the job
  - send a heartbeat once the job is done
A job? That should be our task. So where does it come from? How does our worker know which task to run and where it came from?
Where does the job come from?
We can see that job comes out of `job, queue = result`, and result comes from `result = self.dequeue_job_and_maintain_ttl(timeout)`. Let's look at the latter:
```python
def dequeue_job_and_maintain_ttl(self, timeout):
    result = None
    qnames = self.queue_names()

    self.set_state(WorkerStatus.IDLE)
    self.procline('Listening on {0}'.format(','.join(qnames)))
    self.log.info('')
    self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))

    while True:
        self.heartbeat()

        try:
            result = self.queue_class.dequeue_any(self.queues, timeout,
                                                  connection=self.connection,
                                                  job_class=self.job_class)
            if result is not None:
                job, queue = result
                self.log.info('{0}: {1} ({2})'.format(green(queue.name),
                                                      blue(job.description),
                                                      job.id))
            break
        except DequeueTimeout:
            pass

    self.heartbeat()
    return result
```
Following `self.queue_class.dequeue_any`, it turns out to be a classmethod of `Queue` in `queue.py`:
```python
@classmethod
def dequeue_any(cls, queues, timeout, connection=None, job_class=None):
    """Class method returning the job_class instance at the front of the given
    set of Queues, where the order of the queues is important.

    When all of the Queues are empty, depending on the `timeout` argument,
    either blocks execution of this function for the duration of the timeout
    or until new messages arrive on any of the queues, or returns None.

    See the documentation of cls.lpop for the interpretation of timeout.
    """
    job_class = backend_class(cls, 'job_class', override=job_class)

    while True:
        queue_keys = [q.key for q in queues]
        result = cls.lpop(queue_keys, timeout, connection=connection)
        if result is None:
            return None
        queue_key, job_id = map(as_text, result)
        queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class)
        try:
            job = job_class.fetch(job_id, connection=connection)
        except NoSuchJobError:
            # Silently pass on jobs that don't exist (anymore),
            # and continue in the look
            continue
        except UnpickleError as e:
            # Attach queue information on the exception for improved error
            # reporting
            e.job_id = job_id
            e.queue = queue
            raise e
        return job, queue
    return None, None
```
There is `result = cls.lpop`; let's keep digging:
```python
@classmethod
def lpop(cls, queue_keys, timeout, connection=None):
    """Helper method. Intermediate method to abstract away from some
    Redis API details, where LPOP accepts only a single key, whereas BLPOP
    accepts multiple. So if we want the non-blocking LPOP, we need to
    iterate over all queues, do individual LPOPs, and return the result.

    Until Redis receives a specific method for this, we'll have to wrap it
    this way.

    The timeout parameter is interpreted as follows:
        None - non-blocking (return immediately)
         > 0 - maximum number of seconds to block
    """
    connection = resolve_connection(connection)
    if timeout is not None:  # blocking variant
        if timeout == 0:
            raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
        result = connection.blpop(queue_keys, timeout)
        if result is None:
            raise DequeueTimeout(timeout, queue_keys)
        queue_key, job_id = result
        return queue_key, job_id
    else:  # non-blocking variant
        for queue_key in queue_keys:
            blob = connection.lpop(queue_key)
            if blob is not None:
                return queue_key, blob
        return None
```
So it simply pops jobs off the given queues with LPOP/BLPOP. Checking the return value of BLPOP: it is the name of the list the value came from, together with the popped value.
> Once new data is present on one of the lists, the client returns with the name of the key unblocking it and the popped value.
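You can see this for yourself with redis-py (assuming a local Redis instance; the queue names below are made up for illustration):

```python
import redis

conn = redis.StrictRedis()
conn.rpush('rq:queue:default', 'some-job-id')

# BLPOP blocks for up to `timeout` seconds and returns a (list_name, value)
# pair, which is exactly what dequeue_any unpacks into (queue_key, job_id).
print(conn.blpop(['rq:queue:high', 'rq:queue:default'], timeout=5))
# (b'rq:queue:default', b'some-job-id')
```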
Now jump back up to the previous function: the next step is to instantiate a Queue and a Job from the queue_key and job_id we obtained. Let's look at the `Job.fetch` it calls:
```python
@classmethod
def fetch(cls, id, connection=None):
    """Fetches a persisted job from its corresponding Redis key and
    instantiates it.
    """
    job = cls(id, connection=connection)
    job.refresh()
    return job
```
`job.refresh()` looks suspicious, because up to this point all of the job's information is still just strings.
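Stepping into `refresh`, the job's fields come out of a Redis hash. A simplified sketch of the idea (not the real method, which restores many more fields and decodes the hash keys properly):

```python
import redis

def refresh_job_fields(connection, job_key):
    """Read a job's Redis hash and return its fields, leaving `data` pickled."""
    obj = connection.hgetall(job_key)              # dict of bytes -> bytes
    if not obj:
        raise ValueError('No such job: {0}'.format(job_key))
    fields = {k.decode(): v for k, v in obj.items()}
    # fields['data'] is still raw pickled bytes here; nothing is unpickled yet.
    return fields

# e.g. refresh_job_fields(redis.StrictRedis(), 'rq:job:<some-job-id>')
```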
On the worker side, how does the worker know which function it is supposed to call?
I read it carefully and almost skipped over the line `self.data = obj['data']`. Stepping into it, it turned out not to be the place, and I could not find it anywhere else either. Awkward.
That is odd: there has to be some place that turns the strings back into Python objects. So I went through the documentation and found that rq uses pickle, which means somewhere there must be a call to `pickle.loads`. Searching for it turns up `loads = pickle.loads`. Next, search for where `loads` is used:
```
$ ack loads job.py
25:loads = pickle.loads
46:This is a helper method to not have to deal with the fact that `loads()`
51:obj = loads(pickled_string)
394:self._result = loads(rv)
```
Looking at lines 51 and 394, line 51 looks more promising, so I searched for `unpickle` and found this in Job:
```python
def _unpickle_data(self):
    self._func_name, self._instance, self._args, self._kwargs = unpickle(self.data)
```
Searching for `_unpickle_data`, we find it is used in four places: the `func_name`, `instance`, `args` and `kwargs` properties. So whenever one of these properties is accessed and the data has not been deserialized yet, it gets deserialized on the spot; a toy version of this lazy-unpickling pattern is sketched below. Let's leave it at that and look at how the job actually gets executed.
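A self-contained sketch of that pattern (not rq's actual Job class): keep the raw pickled bytes on the instance and only unpickle them the first time a property is read.

```python
import pickle

class LazyJob:
    """Toy illustration of lazy unpickling, not rq's real Job class."""

    def __init__(self, data):
        self.data = data              # raw pickled bytes, straight from Redis
        self._func_name = None
        self._instance = None
        self._args = None
        self._kwargs = None

    def _unpickle_data(self):
        self._func_name, self._instance, self._args, self._kwargs = pickle.loads(self.data)

    @property
    def func_name(self):
        if self._func_name is None:
            self._unpickle_data()     # deserialize on first access
        return self._func_name

    @property
    def args(self):
        if self._args is None:
            self._unpickle_data()
        return self._args


payload = pickle.dumps(('mymodule.send_email', None, ('alice@example.com',), {}))
job = LazyJob(payload)
print(job.func_name)   # triggers the unpickling
print(job.args)
```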
How the job gets executed
Back in `Worker.work`: once a job has been fetched, it calls `self.execute_job(job, queue)`. Stepping in, we find `self.fork_work_horse(job, queue)`, and stepping in once more we see:
```python
def fork_work_horse(self, job, queue):
    """Spawns a work horse to perform the actual work and passes it a job.
    """
    child_pid = os.fork()
    os.environ['RQ_WORKER_ID'] = self.name
    os.environ['RQ_JOB_ID'] = job.id
    if child_pid == 0:
        self.main_work_horse(job, queue)
    else:
        self._horse_pid = child_pid
        self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
```
After fork(), the process that gets 0 back is the child. Let's continue into `self.main_work_horse`:
```python
def main_work_horse(self, job, queue):
    """This is the entry point of the newly spawned work horse."""
    # After fork()'ing, always assure we are generating random sequences
    # that are different from the worker.
    random.seed()
    self.setup_work_horse_signals()
    self._is_horse = True
    self.log = logger
    success = self.perform_job(job, queue)
    # os._exit() is the way to exit from childs after a fork(), in
    # constrast to the regular sys.exit()
    os._exit(int(not success))
```
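Here is a minimal standalone demo of this fork()/os._exit() convention (Unix only; the "work" in the child is just a print):

```python
import os

child_pid = os.fork()
if child_pid == 0:
    # Child ("work horse"): do the job, then leave with os._exit() so we
    # never fall back into the parent's code path.
    print('child: performing the job')
    os._exit(0)
else:
    # Parent (the worker): fork() returned the child's pid, so wait for it.
    print('parent: forked work horse with pid', child_pid)
    _, status = os.waitpid(child_pid, 0)
    print('parent: work horse exited with status', status)
```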
Continuing into `self.perform_job`, we see it runs `job.perform`, which in turn calls `self._execute`, which uses the `self.func` property. Looking inside, it reads `self.func_name`. Now it all clicks: this is where the data finally gets deserialized:
```python
@property
def func(self):
    func_name = self.func_name
    if func_name is None:
        return None

    if self.instance:
        return getattr(self.instance, func_name)

    return import_attribute(self.func_name)
```
Find the func_name, import it, and then call it with the args and kwargs. That's all there is to it!
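`import_attribute` is essentially "import a callable by its dotted path". A small sketch of the idea (using `os.path.join` as a stand-in for a real task function):

```python
import importlib

def import_attribute(name):
    """Resolve a dotted path like 'mymodule.send_email' to the actual object."""
    module_name, attribute = name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, attribute)

func = import_attribute('os.path.join')   # stand-in for a job's func_name
print(func('jobs', 'results'))            # the worker then does func(*args, **kwargs)
```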
What about enqueue?
Take a guess: how does enqueue work? It must dump the function's func_name, args and kwargs and push them onto the corresponding queue. The answer lives in `enqueue_call` in `queue.py`. I won't walk through it here; consider it an exercise :)
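For reference, a hand-rolled version of that guess could look like the sketch below. This is my own toy code, not rq's `enqueue_call`, and the key names are only illustrative:

```python
import pickle
import uuid

import redis

def naive_enqueue(connection, queue_name, func_name, *args, **kwargs):
    """Persist a pickled call description, then push the job id onto the
    list that the worker side BLPOPs from."""
    job_id = str(uuid.uuid4())
    data = pickle.dumps((func_name, None, args, kwargs))   # (func, instance, args, kwargs)
    connection.hset('rq:job:' + job_id, 'data', data)
    connection.rpush('rq:queue:' + queue_name, job_id)
    return job_id

# naive_enqueue(redis.StrictRedis(), 'default', 'mymodule.send_email', 'alice@example.com')
```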