[ Laravel 5.8 文件 ] 進階系列 —— 佇列
簡介
注:Laravel 現在提供了基於 Redis 的,擁有美觀的後臺和配置系統的 Horizon 佇列擴充套件包,完整資訊參考Horizon文件。
Laravel 佇列為不同的後臺佇列服務提供了統一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基於關係型資料庫的佇列。佇列的目的是將耗時的任務延時處理,比如傳送郵件,從而大幅度縮短 Web 請求和響應的時間。
佇列配置檔案存放在 config/queue.php
。每一種佇列驅動的配置都可以在該檔案中找到,包括資料庫、 Beanstalkd 、 Amazon SQS 、 Redis 以及同步(本地使用)驅動。其中還包含了一個 null
佇列驅動用於那些放棄佇列的任務。
連線 Vs. 佇列
在開始使用 Laravel 佇列以前,瞭解“連線”和“佇列”的關係非常重要。在配置檔案 config/queue.php
有一個 connections
配置項。該配置項定義了後臺佇列服務的特定連線,如 Amazon SQS, Beanstalk, 或 Redis。每種佇列連線都可以有很多佇列,可以想象在銀行辦理現金業務的各個視窗佇列。
請注意 queue
配置檔案中的每個連線配置示例都有一個 queue
屬性。當新的佇列任務被新增到指定的連線時,該配置項的值就是預設監聽的佇列(名稱)。換種說法,如果你沒有指派特別的佇列名稱,那麼 queue
的值,也是該任務預設新增到的佇列(名稱):
// 以下的任務將被委派到預設佇列... dispatch(new Job); // 以下任務將被委派到 "emails" 佇列... dispatch((new Job)->onQueue('emails'));
有些應用並不需要將任務分配到多個佇列,單個佇列已經非常適用。但是,應用的任務有優先順序差異或者類別差異的時候,推送任務到多個佇列將是更好地選擇,因為 Laravel 的佇列程序支援通過優先順序指定處理的佇列。舉個例子,你可以將高優先順序的任務委派到 high
(高優先順序)佇列,從而讓它優先執行。
php artisan queue:work --queue=high,default
驅動預備知識
資料庫
要使用 database
佇列驅動,你需要資料表儲存任務資訊。要生成建立這些表的遷移,可以執行 Artisan 命令 queue:table
,遷移被建立之後,可以使用 migrate
命令生成這些表:
php artisan queue:table php artisan migrate
Redis
要使用 redis
佇列驅動,需要在配置檔案 config/database.php
中配置 Redis 資料庫連線。
Redis 叢集
如果 Redis 佇列連線使用 Redis Cluster(叢集),佇列名稱必須包含 key hash tag ,以確保給定佇列對應的所有 Redis keys 都存放到同一個 hash slot:
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90, ],
學院君注:對一般中小型應用推薦使用 Redis 作為佇列驅動。
阻塞
使用 Redis 佇列時,可以使用 block_for
配置項來指定驅動在迭代佇列程序迴圈並重新輪詢 Redis 資料庫之前等待可用佇列任務的時間。
根據佇列負載來調整此配置值會比輪詢 Redis 資料庫來查詢新任務更加高效。例如,你可以設定該值為 5 來告訴驅動在等待可用佇列任務時需要阻塞五秒:
'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'retry_after' => 90, 'block_for' => 5, ],
注:阻塞是一個實驗性功能,如果 Redis 伺服器或佇列程序與檢索佇列任務同時崩潰,那麼佇列任務有可能會丟失。
其他驅動預備知識
如果使用以下幾種佇列驅動,需要安裝相應的依賴:
aws/aws-sdk-php ~3.0 pda/pheanstalk ~4.0 predis/predis ~1.0
建立任務
生成任務類
通常,所有的任務類都儲存在 app/Jobs
目錄。如果 app/Jobs
不存在,在執行 Artisan 命令 make:job
的時候,它將會自動建立。你可以通過 Artisan CLI 來生成佇列任務類:
php artisan make:job ProcessPodcast
生成的類都實現了 Illuminate\Contracts\Queue\ShouldQueue
介面, 告訴 Laravel 將該任務推送到佇列,而不是立即執行:
任務類結構
任務類非常簡單,通常只包含處理該任務的 handle
方法,讓我們看一個任務類的例子。在這個例子中,我們模擬管理播客釋出服務,並在釋出以前上傳相應的播客檔案:
<?php namespace App\Jobs; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class ProcessPodcast implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 建立任務例項 * * @paramPodcast$podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 執行任務 * * @paramAudioProcessor$processor * @return void */ public function handle(AudioProcessor $processor) { // 處理上傳的播客… } }
在本示例中,我們將Eloquent 模型作為引數直接傳遞到建構函式。因為該任務使用了 SerializesModels
trait,Eloquent 模型將會在任務被執行時優雅地序列化和反序列化。如果你的佇列任務在建構函式中接收 Eloquent 模型,只有模型的主鍵會被序列化到佇列,當任務真正被執行的時候,佇列系統會自動從資料庫中獲取整個模型例項。這對應用而言是完全透明的,從而避免序列化整個 Eloquent 模型例項引起的問題。
handle
方法在任務被處理的時候呼叫,注意我們可以在任務的 handle
方法中進行依賴注入。Laravel服務容器會自動注入這些依賴。
如果你想要完全控制容器如何將依賴注入到 handle
方法,可以使用容器的 bindMethod
方法。 bindMethod
方法接收一個回撥,該回調支援傳入任務和容器例項,在這個回撥中,你可以隨意呼叫 handle
方法。通常,我們在某個服務提供者中呼叫這個方法:
use App\Jobs\ProcessPodcast; $this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) { return $job->handle($app->make(AudioProcessor::class)); });
注:二進位制資料,如原生圖片內容,在傳遞給佇列任務之前先經過 base64_encode
方法處理,此外,該任務被推送到佇列時將不會被序列化為 JSON 格式。
分發任務
建立好任務類後,就可以通過任務自身的 dispatch
方法將其分發到佇列。 dispatch
方法需要的唯一引數就是該任務的例項:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller { /** * Store a new podcast. * * @paramRequest$request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatch($podcast); } }
延時分發
有時候你可能想要延遲佇列任務的執行,這可以通過在分發任務時使用 delay
方法實現。例如你希望將某個任務在建立 10 分鐘以後才執行:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller { /** * Store a new podcast. * * @paramRequest$request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); } }
注:Amazon SQS 的佇列服務最長延時 15 分鐘。
同步分發
如果你立即想要分發任務(同步),可以使用 dispatchNow
方法。使用這個方法時,對應任務不會被推送到佇列,而是立即在當前程序中執行:
<?php namespace App\Http\Controllers; use Illuminate\Http\Request; use App\Jobs\ProcessPodcast; use App\Http\Controllers\Controller; class PodcastController extends Controller { /** * Store a new podcast. * * @paramRequest$request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatchNow($podcast); } }
任務鏈
任務鏈允許你指定一個需要在一個序列中執行的佇列任務列表,如果序列中的某個任務失敗,其它任務將不再執行。要執行一個佇列任務鏈,可以使用任意可分發任務上的 withChain
方法:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch();
注:使用 $this->delete()
方法刪除任務不會阻斷正在被處理的任務鏈中的任務。任務鏈中的任務只有在執行失敗時才會停止執行。
連結連線 & 佇列
如果你想要指定任務鏈使用的預設連線和佇列,可以使用 allOnConnection
和 allOnQueue
方法。這些方法指定需要用到的佇列連線和佇列名稱,除非佇列任務顯式指定了分配的連線/佇列:
ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
自定義佇列 & 連線
分發到指定的佇列
通過推送任務到不同佇列,你可以將佇列任務進行“分類”,甚至根據優先順序來分配每個佇列的程序數。請注意,這並不意味著使用了配置項中那些不同的連線來管理佇列,實際上只有單一連線會被用到。要指定佇列,請在任務例項使用 onQueue
方法:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller { /** * Store a new podcast. * * @paramRequest$request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatch($podcast)->onQueue('processing'); } }
分發到指定的連線
如果你使用了多個連線來管理佇列,那麼可以分發任務到指定的連線。請在任務例項中使用 onConnection
方法來指定連線:
<?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller { /** * Store a new podcast. * * @paramRequest$request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatch($podcast)->onConnection('sqs'); } }
當然,你可以同時使用 onConnection
和 onQueue
方法來指定任務的連線和佇列:
$job = (new ProcessPodcast($podcast)) ->onConnection('sqs') ->onQueue('processing');
指定最大失敗次數/超時時間
最大失敗次數
指定佇列任務最大失敗次數的一種實現方式是通過 Artisan 命令 --tries
切換:
php artisan queue:work --tries=3
不過,你還可以在任務類自身定義最大失敗次數來實現更加細粒度的控制,如果最大失敗次數在任務中指定,則其優先順序高於命令列指定的數值:
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue { /** * The number of times the job may be attempted. * * @var int */ public $tries = 5; }
基於時間的嘗試次數
除了定義在任務失敗前的最大嘗試次數外,還可以定義在指定時間內允許任務的最大嘗試次數,這可以通過在任務類中新增 retryUntil
方法來實現:
/** * Determine the time at which the job should timeout. * * @return \DateTime */ public function retryUntil() { return now()->addSeconds(5); }
注:還可以在佇列時間監聽器中定義 retryUntil
方法。
超時
注: timeout
方法為 PHP 7.1+ 和 pcntl
擴充套件做了優化。
類似的,佇列任務最大執行時長(秒)可以通過 Artisan 命令上的 --timeout
開關來指定:
php artisan queue:work --timeout=30
同樣,你也可以在任務類中定義該任務允許執行的最大時長(單位:秒),任務中指定的超時時間優先順序也高於命令列定義的數值:
<?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue { /** * The number of seconds the job can run before timing out. * * @var int */ public $timeout = 120; }
頻率限制
注:該功能要求應用可以與Redis 伺服器進行互動。
如果應用使用了 Redis,那麼可以使用時間或併發來控制佇列任務。該功能特性在佇列任務與有頻率限制的 API 互動時很有幫助,例如,通過 throttle
方法,你可以限定給定型別任務每 60 秒只執行 10 次。如果不能獲取鎖,需要將任務釋放回佇列以便可以再次執行:
Redis::throttle('key')->allow(10)->every(60)->then(function () { // Job logic... }, function () { // Could not obtain lock... return $this->release(10); });
注:在上面的例子中, key
可以是任意可以唯一標識你想要限定訪問頻率的任務型別的字串。舉個例子,這個鍵可以基於任務類名和操作 Eloquent 模型的 ID 進行構建。
注:將受限制的任務釋放回佇列依然會增加任務的總執行次數 attempts
的值。
除此之外,還可以指定可以同時處理給定任務的最大程序數量。這個功能在佇列任務正在編輯一次只能由一個任務進行處理的資源時很有用。例如,使用 funnel
方法你可以給定型別任務一次只能由一個工作程序進行處理:
Redis::funnel('key')->limit(1)->then(function () { // Job logic... }, function () { // Could not obtain lock... return $this->release(10); });
注:使用頻率限制時,任務在執行成功之前需要的最大嘗試次數很難權衡,因此,將頻率限制和基於時間的嘗試次數結合起來使用是個不錯的選擇。
處理錯誤
如果任務在處理的時候有異常丟擲,則該任務將會被自動釋放回佇列以便再次嘗試執行。任務會持續被釋放直到嘗試次數達到應用允許的最大次數。最大嘗試次數通過 Artisan 命令 queue:work
上的 --tries
開關來定義。此外,該次數也可以在任務類自身上定義。關於執行佇列監聽器的更多資訊可以在下面看到。
佇列閉包
除了將任務類推送到佇列之外,還可以推送閉包到佇列。這對於需要在當前請求生命週期之外執行的簡單快捷的任務來說非常方便:
$podcast = App\Podcast::find(1); dispatch(function () use ($podcast) { $podcast->publish(); });
推送閉包到佇列時,閉包的程式碼內容以加密方式簽名,所以不會在傳輸過程中被篡改。
執行佇列程序
Laravel 自帶了一個佇列程序用來處理被推送到佇列的新任務。你可以使用 queue:work
命令執行這個佇列程序。請注意,佇列程序開始執行後,會持續監聽佇列,直至你手動停止或關閉終端:
php artisan queue:work
注:為了保持佇列程序 queue:work
持續在後臺執行,需要使用程序守護程式,比如Supervisor 來確保佇列程序持續執行。
請記住,佇列程序是長生命週期的程序,會在啟動後駐留記憶體。若應用有任何改動將不會影響到已經啟動的程序。所以請在釋出程式後,重啟佇列程序。
指定連線和佇列
佇列程序同樣可以自定義連線和佇列。傳遞給 work
命令的連線名需要與配置檔案 config/queue.php
中定義的某個連線配置相匹配:
php artisan queue:work redis
你可以自定義將某個佇列程序指定某個連線來管理。舉例來說,如果所有的郵件任務都是通過 redis
連線上的 emails
佇列處理,那麼可以用以下命令來啟動單一程序只處理單一佇列:
php artisan queue:work redis --queue=emails
處理單個任務
--once
選項可用於告知程序只處理佇列中的單個任務:
php artisan queue:work --once
處理所有佇列任務然後退出
--stop-when-empty
選項可用於告知程序處理所有任務然後優雅退出。當我們在 Docker 容器中處理 Laravel 佇列時,如果你想要在佇列為空時關閉容器,則該選項很有用:
php artisan queue:work --stop-when-empty
資源注意事項
後臺佇列程序不會再處理每個任務前重啟框架,因此你需要在每次任務完成後釋放所有重量級的資源。例如,如果你在使用 GD 庫處理圖片,需要在完成的時候使用 imagedestroy
來釋放記憶體。
佇列優先順序
有時候你需要區分任務的優先順序。比如,在配置檔案 config/queue.php
中,你可以定義 redis
連線的預設 queue
為 low
。不過,如果需要將任務分發到高優先順序 high
,可以這麼做:
dispatch((new Job)->onQueue('high'));
如果期望所有 high
高優先順序的佇列都將先於 low
低優先順序的任務執行,可以像這樣啟動佇列程序:
php artisan queue:work --queue=high,low
佇列程序 & 部署
前文已經提到佇列程序是長生命週期的程序,在重啟以前,所有原始碼的修改並不會對其產生影響。所以,最簡單的方法是在每次釋出新版本後重新啟動佇列程序。你可以通過 Aritisan 命令 queue:restart
來優雅地重啟佇列程序:
php artisan queue:restart
該命令將在佇列程序完成正在進行的任務後,結束該程序,避免佇列任務的丟失或錯誤。由於佇列程序會在執行 queue:restart
命令後死掉,你仍然需要通過程序守護程式如Supervisor 來自動重啟佇列程序。
注:佇列使用快取來儲存重啟訊號,所以在使用此功能前你需要驗證快取驅動配置正確。
任務過期 & 超時
任務過期
在配置檔案 config/queue.php
中,每個連線都定義了 retry_after
項。該配置項的目的是定義任務在執行以後多少秒後釋放回佇列。如果 retry_after
設定的值為 90
, 任務在執行 90
秒後還未完成,那麼將被釋放回佇列而不是刪除掉。毫無疑問,你需要把 retry_after
的值設定為任務執行時間的最大可能值。
注:只有 Amazon SQS 配置資訊不包含 retry_after
項。Amazon SQS 的任務執行時間基於 Default Visibility Timeout ,該項在 Amazon AWS 控制檯配置。
佇列程序超時
佇列程序 queue:work
可以設定超時 --timeout
項。該 --timeout
控制佇列程序執行每個任務的最長時間,如果超時,該程序將被關閉。各種錯誤都可能導致某個任務處於“凍結”狀態,比如 HTTP 無響應等。佇列程序超時就是為了將這些“凍結”的程序關閉:
php artisan queue:work --timeout=60
配置項 retry_after
和 Aritisan 引數項 --timeout
不同,但目的都是為了確保任務的安全,並且只被成功的執行一次。
注:引數項 --timeout
的值應該始終小於配置項 retry_after
的值,這是為了確保佇列程序總在任務重試以前關閉。如果 --timeout
比 retry_after
大,那麼你的任務可能被執行兩次。
程序休眠時間
當任務在佇列中有效時,程序會持續處理任務,沒有延遲。不過,我們可以使用 sleep
配置項來指定沒有新的有效任務產生時的休眠時間。休眠期間,佇列程序不會處理任何新任務直到佇列程序醒來:
php artisan queue:work --sleep=3
配置 Supervisor
安裝 Supervisor
Supervisor 是 Linux 系統中常用的程序守護程式。如果佇列程序 queue:work
意外關閉,它會自動重啟啟動佇列程序。在 Ubuntu 安裝Supervisor 非常簡單:
sudo apt-get install supervisor
注:如果自己配置 Supervisor 有困難,可以考慮使用 Laravel Forge,它會為 Laravel 專案自動安裝並配置 Supervisor。
配置 Supervisor
Supervisor 配置檔案通常存放在 /etc/supervisor/conf.d
目錄,在該目錄下,可以建立多個配置檔案指示 Supervisor
如何監視程序,例如,讓我們建立一個開啟並監視 queue:work
程序的 laravel-worker.conf
檔案:
[program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 autostart=true autorestart=true user=forge numprocs=8 redirect_stderr=true stdout_logfile=/home/forge/app.com/worker.log
在本例中, numprocs
指令讓 Supervisor 執行 8 個 queue:work
程序並監視它們,如果失敗的話自動重啟。當然,你需要修改 queue:work sqs
的 command
指令來對映你的佇列連線。
啟動 Supervisor
當成功建立配置檔案後,需要重新整理 Supervisor 的配置資訊並使用如下命令啟動程序:
sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:*
你可以通過 Supervisor 官方文件 獲取更多資訊。
處理失敗的任務
不可避免會出現執行失敗的任務。你不必為此擔心,Laravel 可以輕鬆設定任務允許的最大嘗試次數,若是執行次數達到該限定,該任務會被插入到 failed_jobs
表,要建立一個 failed_jobs
表的遷移,可以使用 queue:failed-table
命令
php artisan queue:failed-table php artisan migrate
然後,執行佇列程序時,通過 --tries
引數項來設定佇列任務允許的最大嘗試次數,如果沒有指定 --tries
選項的值,任務會被無限期重試:
php artisan queue:work redis --tries=3
清理失敗的任務
你可以在任務類中定義 failed
方法, 從而允許你在失敗發生時執行指定的動作,比如傳送任務失敗的通知,記錄日誌等。導致任務失敗的 Exception
會被傳遞到 failed
方法:
<?php namespace App\Jobs; use Exception; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue { use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * Create a new job instance. * * @paramPodcast$podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * Execute the job. * * @paramAudioProcessor$processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... } /** * The job failed to process. * * @paramException$exception * @return void */ public function failed(Exception $exception) { // 傳送失敗通知, etc... } }
任務失敗事件
如果你期望在任務失敗的時候觸發某個事件,可以使用 Queue::failing
方法。該事件通過郵件或 HipChat 通知團隊。舉個例子,我麼可以在 Laravel 自帶的 AppServiceProvider
中新增一個回撥到該事件:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Queue\Events\JobFailed; use Illuminate\Support\ServiceProvider; class AppServiceProvider extends ServiceProvider { /** * 啟動應用服務. * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); } /** * 註冊服務提供者. * * @return void */ public function register() { // } }
重試失敗的任務
要檢視已插入到 failed_jobs
資料表中的所有失敗任務,可以使用 Artisan 命 queue:failed
:
php artisan queue:failed
該命令將會列出任務 ID、連線、佇列和失敗時間,任務 ID 可用於重試失敗任務,例如,要重試一個 ID 為 5
的失敗任務,可以執行下面的命令:
php artisan queue:retry 5
要重試所有失敗任務,執行如下命令即可:
php artisan queue:retry all
如果你要刪除一個失敗任務,可以使用 queue:forget
命令:
php artisan queue:forget 5
要刪除所有失敗任務,可以使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
當注入一個 Eloquent 模型到佇列任務時,它會在被放到佇列之前自動序列化,然後在任務被處理時恢復。不過,如果該模型例項在任務等待被處理期間被刪除,對應任務在執行時會失敗並丟擲 ModelNotFoundException
異常。
為了方便起見,你可以通過設定佇列任務的 deleteWhenMissingModels
屬性為 true
來選擇自動刪除缺失模型例項的任務:
/** * Delete the job if its models no longer exist. * * @var bool */ public $deleteWhenMissingModels = true;
任務事件
通過 Queue
門面提供的 before
和 after
方法可以在任務被處理之前或之後指定要執行的回撥。這些回撥可用來記錄日誌或者記錄統計資料。通常,你可以在服務提供者中使用這些方法。比如,我們可以在 AppServiceProvider
中這樣用:
<?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider { /** * Bootstrap any application services. * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * Register the service provider. * * @return void */ public function register() { // } }
使用 Queue
門面上的 looping
方法,你可以在程序嘗試從佇列中獲取任務之前指定要執行的回撥。例如,你可以註冊一個閉包來回滾之前失敗任務遺留下來的事務:
Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); } });