このリポジトリはPHP Webアプリフレームワークである、LaravelのLTSバージョンである5.5の公式英文ドキュメントを日本語へ翻訳しています。
This project is maintained by okinaka
{tip} 現在、LaravelはRedisで動作するキューのための美しいダッシュボードと設定システムを備えたHorizonを提供しています。詳細は、Horizonのドキュメントで確認してください。
Laravelのキューサービスは、Beanstalk、Amazon SQS、Redis、さらにはリレーショナル・データベースなど様々なキューバックエンドに対し共通のAPIを提供しています。キューによりメール送信のような時間を費やす処理を遅らせることが可能です。時間のかかるタスクを遅らせることで、よりアプリケーションのリクエストをドラマチックにスピードアップできます。
キューの設定ファイルはconfig/queue.phpです。このファイルにはフレームワークに含まれているそれぞれのドライバーへの接続設定が含まれています。それにはデータベース、Beanstalkd、Amazon SQS、Redis、ジョブが即時に実行される同期(ローカル用途)ドライバーが含まれています。 nullキュードライバはキューされたジョブが実行されないように、破棄するだけです。
Laravelのキューに取り掛かる前に、「接続」と「キュー」の区別を理解しておくことが重要です。config/queue.php設定ファイルの中には、connections設定オプションがあります。このオプションはAmazon SQS、Beanstalk、Redisなどのバックエンドサービスへの個々の接続を定義します。しかし、どんな指定されたキュー接続も、複数の「キュー」を持つことができます。「キュー」とはキュー済みのジョブのスタック、もしくは積み重ねのことです。
queue接続ファイルのqueue属性を含んでいる、各接続設定例に注目してください。ジョブがディスパッチされ、指定された接続へ送られた時にのデフォルトキューです。言い換えれば、どのキューへディスパッチするのか明確に定義していないジョブをディスパッチすると、そのジョブは接続設定のqueue属性で定義したキューへ送られます。
// このジョブはデフォルトキューへ送られる
Job::dispatch();
// このジョブは"emails"キューへ送られる
Job::dispatch()->onQueue('emails');
あるアプリケーションでは複数のキューへジョブを送る必要はなく、代わりに1つのシンプルなキューが適しているでしょう。しかし、複数のキューへジョブを送ることは、優先順位づけしたい、もしくはジョブの処理を分割したいアプリケーションでは、特に便利です。Laravelのキューワーカはプライオリティによりどのキューで処理するかを指定できるからです。たとえば、ジョブをhighキューへ送れば、より高い処理プライオリティのワーカを実行できます。
php artisan queue:work --queue=high,default
databaseキュードライバを使用するには、ジョブを記録するためのデータベーステーブルが必要です。このテーブルを作成するマイグレーションはqueue:table Artisanコマンドにより生成できます。マイグレーションが生成されたら、migrateコマンドでデータベースをマイグレートしてください。
php artisan queue:table
php artisan migrate
redisキュードライバーを使用するには、config/database.php設定ファイルでRedisのデータベースを設定する必要があります。
Redisキュー接続でRedisクラスタを使用している場合は、キュー名にキーハッシュタグを含める必要があります。これはキューに指定した全Redisキーが同じハッシュスロットに確実に置かれるようにするためです。
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
以下の依存パッケージがリストしたキュードライバを使用するために必要です。
aws/aws-sdk-php ~3.0pda/pheanstalk ~3.0predis/predis ~1.0キュー投入可能なアプリケーションの全ジョブは、デフォルトでapp/Jobsディレクトリへ保存されます。app/Jobsディレクトリが存在しなくても、make:job Artisanコマンドの実行時に生成されます。新しいキュージョブをArtisan CLIで生成できます。
php artisan make:job ProcessPodcast
非同期で実行するため、ジョブをキューへ投入することをLaravelへ知らせる、Illuminate\Contracts\Queue\ShouldQueueインターフェイスが生成されたクラスには実装されます。
ジョブクラスは通常とてもシンプルで、キューによりジョブが処理される時に呼び出される、handleメソッドのみで構成されています。手始めに、ジョブクラスのサンプルを見てみましょう。この例は、ポッドキャストの公開サービスを管理し、公開前にアップロードしたポッドキャストファイルを処理する必要があるという仮定です。
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 新しいジョブインスタンスの生成
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* ジョブの実行
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// アップロード済みポッドキャストの処理…
}
}
この例中、キュージョブのコンテナーに直接Eloquentモデルが渡せることに注目してください。ジョブが使用しているSerializesModelsトレイトによりEloquentモデルは優雅にシリアライズされ、ジョブが処理される時にアンシリアライズされます。キュー投入されたジョブがコンテナでEloquentモデルを受け取ると、モデルの識別子のみシリアライズされています。ジョブが実際に処理される時、キューシステムは自動的にデータベースから完全なモデルインスタンスを再取得します。これらは全てアプリケーションの完全な透過性のためであり、Eloquentモデルインスタンスをシリアライズするときに発生する問題を防ぐことができます。
handleメソッドはキューによりジョブが処理されるときに呼びだされます。ジョブのhandleメソッドにタイプヒントにより依存を指定できることに注目してください。Laravelのサービスコンテナが自動的に依存を注入します。
{note} Rawイメージコンテンツのようなバイナリデータは、キュージョブへ渡す前に、
base64_encode関数を通してください。そうしないと、そのジョブはキューへ設置する前にJSONへ正しくシリアライズされません。
ジョブクラスを書き上げたら、ジョブクラス自身のdispatchメソッドを使い、ディスパッチできます。dispatchメソッドへ渡す引数は、ジョブのコンストラクタへ渡されます。
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
ProcessPodcast::dispatch($podcast);
}
}
キュー投入されたジョブの実行を遅らせたい場合は、ジョブのディスパッチ時にdelayメソッドを使います。例として、ディスパッチ後10分経つまでは、処理が行われないジョブを指定してみましょう。
<?php
namespace App\Http\Controllers;
use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
ProcessPodcast::dispatch($podcast)
->delay(Carbon::now()->addMinutes(10));
}
}
{note} Amazon SQSキューサービスは、最大15分の遅延時間です。
連続して実行する必要がある、キュー投入ジョブのリストをジョブチェーンで指定できます。一連のジョブの内、あるジョブが失敗すると、残りのジョブは実行されません。キュー投入ジョブチェーンを実行するには、dispatchableジョブどれかに対し、withChainメソッドを使用します。
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();
ジョブを異なるキューへ投入することで「カテゴライズ」できますし、様々なキューにいくつのワーカを割り当てるかと言うプライオリティ付けもできます。これはキー設定ファイルで定義した、別々のキュー「接続」へのジョブ投入を意味してはいないことに気をつけてください。一つの接続内の複数のキューを指定する方法です。
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
複数のキュー接続を利用するなら、ジョブを投入するキューを指定できます。ジョブをディスパッチする時に、onConnectionメソッドで接続を指定します。
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
もちろん、ジョブを投入する接続とキューを指定するために、onConnectionとonQueueメソッドをチェーンすることもできます。
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
ジョブが試行する最大回数を指定するアプローチの一つは、Artisanコマンドラインへ--triesスイッチ使う方法です。
php artisan queue:work --tries=3
しかし、より粒度の高いアプローチは、ジョブクラス自身に最大試行回数を定義する方法です。これはコマンドラインで指定された値より、優先度が高くなっています。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 最大試行回数
*
* @var int
*/
public $tries = 5;
}
失敗するまでジョブの試行を何度認めるかを定義する代わりに、ジョブのタイムアウト時間を定義することもできます。これにより、指定した時間内で複数回ジョブを試行します。タイムアウト時間を定義するには、ジョブクラスにretryUntilメソッドを追加します。
/**
* タイムアウトになる時間を決定
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
{tip} キューイベントリスナでも、
retryUntilメソッドを定義できます。
{note} タイムアウト機能はバージョン7.1以上のPHPと、
pcntlPHP拡張に対し、オプティマイズされています。
同様に、ジョブの最大実行秒数を指定するために、Artisanコマンドラインに--timeoutスイッチを指定することができます。
php artisan queue:work --timeout=30
しかしながら、最大実行秒数をジョブクラス自身に定義することもできます。ジョブにタイムアウト時間を指定すると、コマンドラインに指定されたタイムアウトよりも優先されます。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* ジョブがタイムアウトになるまでの秒数
*
* @var int
*/
public $timeout = 120;
}
{note} この機能が動作するには、アプリケーションでRedisサーバが利用できる必要があります。
アプリケーションでRedisを利用しているなら、時間と回数により、キュージョブを制限できます。この機能は、キュージョブがレート制限のあるAPIに関連している場合に役立ちます。throttleメソッドの使用例として、指定したジョブタイプを60秒毎に10回だけ実行できるように制限しましょう。ロックできなかった場合、後で再試行できるように、通常はジョブをキューへ戻す必要があります。
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// ジョブのロジック処理…
}, function () {
// ロックできなかった場合の処理…
return $this->release(10);
});
{tip} 上記の例で
keyは、レート制限したいジョブのタイプを表す、一意の認識文字列です。たとえば、ジョブのクラス名と、(そのジョブに含まれているならば)EloquentモデルのIDを元に、制限できます。
もしくは、ジョブを同時に処理するワーカの最大数を指定することができます。これは、一度に一つのジョブが更新すべきリソースを変更するキュージョブを使用する場合に、役立ちます。funnelメソッドの使用例として、一度に1ワーカのみにより処理される、特定のタイプのジョブを制限してみましょう。
Redis::funnel('key')->limit(1)->then(function () {
// ジョブのロジック処理…
}, function () {
// ロックできなかった場合の処理…
return $this->release(10);
});
{tip} レート制限を使用する場合、実行を成功するまでに必要な試行回数を決めるのは、難しくなります。そのため、レート制限は時間ベースの試行と組み合わせるのが便利です。
ジョブの処理中に例外が投げられると、ジョブは自動的にキューへ戻され、再試行されます。ジョブはアプリケーションが許している最大試行回数に達するまで、連続して実行されます。最大試行回数はqueue:work Artisanコマンドへ--triesスイッチを使い定義されます。もしくは、ジョブクラス自身に最大試行回数を定義することもできます。キューワーカの実行についての情報は、以降で説明します。
Laravelには、キューに投入された新しいジョブを処理する、キューワーカも含まれています。queue:work Artisanコマンドを使いワーカを実行できます。queue:workコマンドが起動したら、皆さんが停止するか、ターミナルを閉じるまで実行指示付けることに注意してください。
php artisan queue:work
{tip} バックグランドで
queue:workプロセスを永続的に実行し続けるには、キューワーカが止まらずに実行し続けていることを確実にするため、Supervisorのようなプロセスモニタを利用する必要があります。
キューワーカは長時間起動するプロセスで、起動した状態のままメモリに保存されることを覚えておいてください。その結果、一度起動したら、コードベースの変更は反映されません。そのため、開発期間中はキューワーカを再起動することを忘れないでください。
--onceオプションは、ワーカにキュー中のジョブをひとつだけ処理するように指示します。
php artisan queue:work --once
どのキュー接続をワーカが使用するのかを指定できます。workコマンドで指定する接続名は、config/queue.php設定ファイルで定義されている接続と対応します。
php artisan queue:work redis
指定した接続の特定のキューだけを処理するように、さらにキューワーカをカスタマイズすることもできます。たとえば、メールの処理をすべて、redisキュー接続のemailsキューで処理する場合、以下のコマンドで単一のキューの処理だけを行うワーカを起動できます。
php artisan queue:work redis --queue=emails
デーモンキューワーカは各ジョブを処理する前に、フレームワークを「再起動」しません。そのため、各ジョブが終了したら、大きなリソースを開放してください。たとえば、GDライブラリでイメージ処理を行ったら、終了前にimagedestroyにより、メモリを開放してください。
時々、キューをどのように処理するかをプライオリティ付けしたいことも起きます。たとえば、config/queue.phpでredis接続のデフォルトqueueをlowに設定したとしましょう。しかし、あるジョブをhighプライオリティでキューへ投入したい場合です。
dispatch((new Job)->onQueue('high'));
lowキュー上のジョブの処理が継続される前に、全highキュージョブが処理されることを確実にするには、workコマンドのキュー名にコンマ区切りのリストで指定してください。
php artisan queue:work --queue=high,low
キューワーカは長時間起動プロセスであるため、リスタートしない限りコードの変更を反映しません。ですから、キューワーカを使用しているアプリケーションをデプロイする一番シンプルな方法は、デプロイ処理の間、ワーカをリスタートすることです。queue:restartコマンドを実行することで、全ワーカを穏やかに再起動できます。
php artisan queue:restart
このコマンドは存在しているジョブが失われないように、現在のジョブの処理が終了した後に、全キューワーカーへ穏やかに「終了する(die)」よう指示します。キューワーカはqueue:restartコマンドが実行されると、終了するわけですから、キュージョブを自動的に再起動する、Supervisorのようなプロセスマネージャーを実行すべきでしょう。
{tip} このコマンドはリスタートシグナルを保存するために、キャッシュを使用します。そのため、この機能を使用する前に、アプリケーションのキャッシュドライバーが、正しく設定されていることを確認してください。
config/queue.php設定ファイルの中で、各キュ接続はretry_afterオプションを定義しています。このオプションはジョブの処理を再試行するまで、キュー接続を何秒待つかを指定します。たとえば、retry_afterの値が90であれば、そのジョブは処理が終わってから90秒の間に削除されなければ、キューへ再投入されます。通常、retry_after値はジョブが処理を妥当に完了するまでの秒数の最大値を指定します。
{note}
retry_afterを含まない唯一の接続は、Amazon SQSです。SQSはAWSコンソールで管理する、Default Visibility Timeoutを元にリトライを行います。
queue:work Artisanコマンドは--timeoutオプションも提供しています。--timeoutオプションはLaravelキューマスタプロセスが、ジョブを処理する子のキューワーカをKillするまでどのくらい待つかを指定します。外部のHTTP呼び出しの反応が無いなど様々な理由で、時より子のキュープロセスは「フリーズ」します。--timeoutオプションは指定した実行時間を過ぎた、フリーズプロセスを取り除きます。
php artisan queue:work --timeout=60
retry_after設定オプションと--timeout CLIオプションは異なります。しかし、確実にジョブを失わずに、一度だけ処理を完了できるよう共に働きます。
{note}
--timeout値は、最低でも数秒retry_after設定値よりも短くしてください。これにより、与えられたジョブを処理するワーカが、ジョブのリトライ前に確実にkillされます。--timeoutオプションをretry_after設定値よりも長くすると、ジョブが2度実行されるでしょう。
ジョブがキュー上に存在しているとき、ワーカは各ジョブ間にディレイを取らずに実行し続けます。sleepオプションは新しく処理するジョブが存在しない時に、どの程度「スリープ」するかを決めます。スリープ中、ワーカは新しいジョブを処理しません。ジョブはワーカが目を様した後に処理されます。
php artisan queue:work --sleep=3
SupervisorはLinuxオペレーティングシステムのプロセスモニタで、queue:workプロセスが落ちると自動的に起動します。UbuntuにSupervisorをインストールするには、次のコマンドを使ってください。
sudo apt-get install supervisor
{tip} Supervisoの設定に圧倒されそうならば、Laravelプロジェクトのために、Supervisorを自動的にインストールし、設定するLaravel Forgeの使用を考慮してください。
Supervisorの設定ファイルは、通常/etc/supervisor/conf.dディレクトリに保存します。このディレクトリの中には、Supervisorにどのようにプロセスを監視するのか指示する設定ファイルを好きなだけ設置できます。たとえば、laravel-worker.confファイルを作成し、queue:workプロセスを起動、監視させてみましょう。
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
この例のnumprocsディレクティブは、Supervisorに全部で8つのqueue:workプロセスを実行・監視し、落ちている時は自動的に再起動するように指示しています。もちろんcommandディレクティブのqueue:work sqsの部分を変更し、希望のキュー接続に合わせてください。
設定ファイルができたら、Supervisorの設定を更新し起動するために以下のコマンドを実行してください。
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
Supervisorの詳細情報は、Supervisorドキュメントで確認してください。
時より、キューされたジョブは失敗します。心配ありません。物事は計画通りに進まないものです。Laravelではジョブを再試行する最大回数を指定できます。この回数試行すると、そのジョブはfailed_jobsデータベーステーブルに挿入されます。failed_jobsテーブルのマイグレーションを生成するにはqueue:failed-tableコマンドを実行して下さい。
php artisan queue:failed-table
php artisan migrate
次にキューワーカの実行時、queue:workコマンドに--triesスイッチを付け、最大試行回数を指定します。--triesオプションに値を指定しないと、ジョブは無限に試行します。
php artisan queue:work redis --tries=3
失敗時にジョブ特定のクリーンアップを実行するため、ジョブクラスでfailedメソッドを直接定義できます。これはユーザーに警告を送ったり、ジョブの実行アクションを巻き戻すために最適な場所です。failedメソッドには、そのジョブを落とすことになった例外(Exception)が渡されます。
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 新しいジョブインスタンスの生成
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* ジョブの実行
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// アップロード済みポッドキャストの処理…
}
/**
* 失敗したジョブの処理
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 失敗の通知をユーザーへ送るなど…
}
}
ジョブが失敗した時に呼び出されるイベントを登録したい場合、Queue::failingメソッドが使えます。このイベントはメールやHipChatにより、チームへ通知する良い機会になります。例として、Laravelに含まれているAppServiceProviderで、このイベントのコールバックを付け加えてみましょう。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 全アプリケーションサービスの初期処理
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* サービスプロバイダの登録
*
* @return void
*/
public function register()
{
//
}
}
failed_jobsデータベーステーブルに挿入された、失敗したジョブを全部確認したい場合はqueue:failed Artisanコマンドを利用します。
php artisan queue:failed
queue:failedコマンドはジョブID、接続、キュー、失敗した時間をリスト表示します。失敗したジョブをジョブIDで指定することでリトライできます。たとえば、IDが5の失敗したジョブを再試行するため、以下のコマンドを実行します。
php artisan queue:retry 5
失敗したジョブをすべて再試行するには、IDとしてallをqueue:retryコマンドへ指定し、実行してください。
php artisan queue:retry all
失敗したジョブを削除する場合は、queue:forgetコマンドを使います。
php artisan queue:forget 5
失敗したジョブを全部削除するには、queue:flushコマンドを使います。
php artisan queue:flush
Queueファサードにbeforeとafterメソッドを使い、キューされたジョブの実行前後に実行する、コールバックを指定できます。これらのコールバックはログを追加したり、ダッシュボードの状態を増加させたりするための機会を与えます。通常、これらのメソッドはサービスプロバイダから呼び出します。たとえば、Laravelに含まれるAppServiceProviderを使っていましょう。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 全アプリケーションサービスの初期処理
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* サービスプロバイダの登録
*
* @return void
*/
public function register()
{
//
}
}
Queue ファサードのloopingメソッドを使用し、ワーカがキューからジョブをフェッチする前に、指定したコールバックを実行できます。たとえば、直前の失敗したジョブの未処理のままのトランザクションをロールバックするクロージャを登録できます。
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});