Python之celery的簡介與使用
celery的簡介
celery是一個基於分散式訊息傳輸的非同步任務佇列,它專注於實時處理,同時也支援任務排程。它的執行單元為任務(task),利用多執行緒,如 Eventlet , gevent 等,它們能被併發地執行在單個或多個職程伺服器(worker servers)上。任務能非同步執行(後臺執行)或同步執行(等待任務完成)。
在生產系統中,celery能夠一天處理上百萬的任務。它的完整架構圖如下:
元件介紹:
- Producer:呼叫了Celery提供的API、函式或者裝飾器而產生任務並交給任務佇列處理的都是任務生產者。
- Celery Beat:任務排程器,Beat程序會讀取配置檔案的內容,週期性地將配置中到期需要執行的任務傳送給任務佇列。
- Broker:訊息代理,又稱訊息中介軟體,接受任務生產者傳送過來的任務訊息,存進佇列再按序分發給任務消費方(通常是訊息佇列或者資料庫)。Celery目前支援RabbitMQ、Redis、MongoDB、Beanstalk、SQLAlchemy、Zookeeper等作為訊息代理,但適用於生產環境的只有RabbitMQ和Redis, 官方推薦 RabbitMQ。
- Celery Worker:執行任務的消費者,通常會在多臺伺服器執行多個消費者來提高執行效率。
- Result Backend:任務處理完後儲存狀態資訊和結果,以供查詢。Celery預設已支援Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
在客戶端和消費者之間傳輸資料需要序列化和反序列化。 Celery 支出的序列化方案如下所示:
準備工作
在本文中,我們使用的celery的訊息代理和後端儲存資料庫都使用redis,序列化和反序列化選擇msgpack。
首先,我們需要安裝redis資料庫,具體的安裝方法可參考: http://www.runoob.com/redis/r... 。啟動redis,我們會看到如下介面:
在redis視覺化軟體rdm中,我們看到的資料庫如下:
裡面沒有任何資料。
接著,為了能夠在python中使用celery,我們需要安裝以下模組:
- celery
- redis
- msgpack
這樣,我們的準備工作就完畢了。
一個簡單的例子
我們建立的工程名稱為proj,結構如下圖:
首先是主程式app_test.py,程式碼如下:
from celery import Celery app = Celery('proj', include=['proj.tasks']) app.config_from_object('proj.celeryconfig') if __name__ == '__main__': app.start()
分析一下這個程式:
- "from celery import Celery"是匯入celery中的Celery類。
- app是Celery類的例項,建立的時候添加了proj.tasks這個模組,也就是包含了proj/tasks.py這個檔案。
- 把Celery配置存放進proj/celeryconfig.py檔案,使用app.config_from_object載入配置。
接著是任務函式檔案tasks.py,程式碼如下:
import time from proj.app_test import app @app.task def add(x, y): time.sleep(1) return x + y
tasks.py只有一個任務函式add,讓它生效的最直接的方法就是新增app.task這個裝飾器。add的功能是先休眠一秒,然後返回兩個數的和。
接著是配置檔案celeryconfig.py,程式碼如下:
BROKER_URL = 'redis://localhost' # 使用Redis作為訊息代理 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把任務結果存在了Redis CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化和反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果一般效能要求不高,所以使用了可讀性更好的JSON CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過期時間 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接受的內容型別
最後是呼叫檔案diaoyong.py,程式碼如下:
from proj.tasks import add import time t1 = time.time() r1 = add.delay(1, 2) r2 = add.delay(2, 4) r3 = add.delay(3, 6) r4 = add.delay(4, 8) r5 = add.delay(5, 10) r_list = [r1, r2, r3, r4, r5] for r in r_list: while not r.ready(): pass print(r.result) t2 = time.time() print('共耗時:%s' % str(t2-t1))
在這個程式中,我們呼叫了add函式五次,delay()用來呼叫任務。
例子的執行
到此為止,我們已經理解了整個專案的結構與程式碼。
接下來,我們嘗試著把這個專案執行起來。
首先,我們需要啟動redis。接著,切換至proj專案所在目錄,並執行命令:
celery -A proj.app_test worker -l info
介面如下:
然後,我們執行diaoyong.py,輸出的結果如下:
3 6 9 12 15 共耗時:1.1370790004730225
後臺輸出如下:
接著,我們看一下rdm中的資料:
至此,我們已經成功運行了這個專案。
下面,我們嘗試著對這個執行結果做些分析。首先,我們一次性呼叫了五次add函式,但是執行的總時間才1秒多。這是celery非同步執行的結果,如果是同步執行,那麼,至少需要5秒多,因為每呼叫add函式一次,就會休眠一秒。這就是celery的強大之處。
從後臺輸出可以看到,程式會先將任務分發出來,每個任務一個ID,在後臺統一處理,處理完後會有相應的結果返回,同時該結果也會儲存之後臺數據庫。可以利用ready()判斷任務是否執行完畢,再用result獲取任務的結果。
本文專案的github地址為: https://github.com/percent4/c... 。
本次分享到此結束,感謝閱讀~
注意:本人現已開通微信公眾號: Python爬蟲與演算法(微訊號為:easy_web_scrape), 歡迎大家關注哦~~
參考文獻
- Celery 初步: http://docs.jinkan.org/docs/c...
- 使用Celery: https://zhuanlan.zhihu.com/p/...
- 非同步神器celery: https://www.jianshu.com/p/9be...