Celery中文翻譯-Application
Celery在使用前必須例項化,稱為application或app。app是執行緒安全的,具有不同配置、元件、task的多個Celery應用可以在同一個程序空間共存。
# 建立Celery應用 >>> from celery import Celery >>> app = Celery() >>> app <Celery __main__:0x100469fd0>
最後一行文字化顯示了Celery應用:包含應用所屬類的名稱,當前主模組名,以及記憶體地址。唯一重要的資訊是模組名稱。
Main Name
在Celery中傳送task訊息時,該訊息僅包含要執行的task的名稱。每一個worker維護一個task名稱和對應函式的對映,這稱為task registry
。
當定義一個task時,該task將註冊到本地:
>>> @app.task ... def add(x, y): ...return x + y >>> add <@task: __main__.add> >>> add.name __main__.add >>> app.tasks['__main__.add'] <@task: __main__.add>
當Celery無法檢測task函式屬於哪個模組時,使用main模組名生成初始task名稱。
這種方式僅適用於以下兩種場景:
- 定義task的模組作為程式執行
- app在python shell中建立
# tasks.py from celery import Celery app = Celery() @app.task def add(x, y): return x + y if __name__ == '__main__': app.worker_main()
如果直接執行tasks.py,task名將以__main__
為字首,但如果tasks.py被其他程式匯入,task名將以tasks
為字首。如下:
>>> from tasks import add >>> add.name tasks.add
也可以直接指定主模組名:
>>> app = Celery('tasks') >>> app.main 'tasks' >>> @app.task ... def add(x, y): ...return x + y >>> add.name tasks.add
Configuration
可以通過直接設定,或使用專用配置模組對Celery進行配置。
通過app.conf
屬性檢視或直接設定配置:
>>> app.conf.timezone 'Europe/London' >>> app.conf.enable_utc = True
或用app.conf.update
方法一次更新多個配置:
>>> app.conf.update( ...enable_utc=True, ...timezone='Europe/London', ...)
config_from_object
app.config_from_object()
方法從配置模組或物件中匯入配置。需要注意的是:呼叫config_from_object()方法將重置在這之前配置的任何設定
。
使用模組名
app.config_from_object()方法接收python模組的完全限定名(fully qualified name
)或具體到其中的某個屬性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":
from celery import Celery app = Celery() app.config_from_object('celeryconfig')
只要能夠正常執行import celeryconfig
,app就能正常配置。
使用模組物件
也可以傳入一個已匯入的模組物件,但不建議這樣做。
import celeryconfig from celery import Celery app = Celery() app.config_from_object(celeryconfig)
更推薦使用模組名的方式,因為這樣在使用prefork pool時不需要序列化該模組。如果在實際應用中出現配置問題或序列化錯誤,請嘗試使用模組名的方式。
使用配置類或物件
from celery import Celery app = Celery() class Config: enable_utc = True timezone = 'Europe/London' app.config_from_object(Config)
config_from_envvar
app.config_from_envvar()
方法從環境變數中接收配置模組名。
import os from celery import Celery #: Set default configuration module name os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig') app = Celery() app.config_from_envvar('CELERY_CONFIG_MODULE')
通過環境變數指定配置模組:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
Censored configuration
如果要顯示Celery配置,可能需要過濾某些敏感資訊如密碼、金鑰等。Celery提供了幾種用於幫助顯示配置的實用方法。
humanize()
該方法返回列表字串形式的配置,預設只包含改動過的配置,如果要顯示內建的預設配置,設定with_defaults
引數為True:
>>> app.conf.humanize(with_defaults=False, censored=True)
table()
該方法返回字典形式的配置:
>>> app.conf.table(with_defaults=False, censored=True)
Celery可能不會移除所有的敏感資訊,因為它使用正則表示式匹配鍵並判斷是否移除。如果使用者添加了包含敏感資訊的自定義配置,可以使用Celery可能標記為敏感配置的名稱來命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。
Laziness
應用例項是惰性的。
建立Celery例項只會執行以下操作:
logical clock instance task registry set_as_current app.on_init()
app.task()
裝飾器不會在task定義時立即建立task,而是在task使用時或finalized
應用後建立。
下例說明了在使用task或訪問其屬性前,都不會建立task:
>>> @app.task >>> def add(x, y): ...return x + y >>> type(add) <class 'celery.local.PromiseProxy'> >>> add.__evaluated__() False >>> add# <-- causes repr(add) to happen <@task: __main__.add> >>> add.__evaluated__() True
應用的Finalization
指顯式地呼叫app.finalize()
方法或隱式地訪問app.tasks屬性。
finalized
應用將會:
shared
Breaking the chain
雖然可以依賴於當前應用,但最佳實踐是將應用例項傳遞給任何需要它的物件,這個行為可以稱為app chain
。
# 依賴於當前應用(bad) from celery import current_app class Scheduler(object): def run(self): app = current_app
# 傳遞應用例項(good) class Scheduler(object): def __init__(self, app): self.app = app
在開發模式設定CELERY_TRACE_APP環境變數,可以在應用鏈斷開時丟擲異常:
$ CELERY_TRACE_APP=1 celery worker -l info
Abstract Tasks
使用task()
裝飾器建立的task都繼承自celery.app.task
模組的Task
基類。繼承該類可以自定義task類:
from celery import Task # 或者 from celery.app.task import Task class DebugTask(Task): def __call__(self, *args, **kwargs): print('TASK STARTING: {0.name}[{0.request.id}]'.format(self)) return super(DebugTask, self).__call__(*args, **kwargs)
如果要重寫__call__()
方法,記得呼叫super。這樣在task直接呼叫時會執行基類的預設事件。
Task
基類是特殊的,因為它並未繫結到任何特定的應用。一旦task繫結到應用,它將讀取配置以設定預設值等。
-
通過
base
引數指定基類@app.task(base=DebugTask) def add(x, y): return x + y
-
通過
app.Task
屬性指定基類>>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ...queue = 'hipri' >>> app.Task = MyBaseTask >>> app.Task <unbound MyBaseTask> >>> @app.task ... def add(x, y): ...return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [<class add of <Celery __main__:0x1012b4410>>, <unbound MyBaseTask>, <unbound Task>, <type 'object'>]