Laravel 4でキューを使ってみる

キューエンジンとして、beanstalkdを使うんで、

yum install beanstalkd
service beanstalkd start

とかやってbeanstalkdをデフォルト設定で動かしておく。

Laravel 4のapp/config/queue.phpで、

'default' => 'beanstalkd',

に変更する。beanstalkdを使うには追加でライブラリをインストールしておく必要があるんで、composer.jsonのrequireに、

"pda/pheanstalk": "2.0.*"

を追加して、composer updateしてインストールしておく。

これでbeanstalkdコネクタを使ってキューを使う準備OK。

キューを使うサンプルとしては、単純なDBカウンタを作ってみる。

事前準備として、app/config/database.phpを書き換えて、mysqlデータベースが動くように適当にしておく。続いて、

php artisan migrate:make create_counter_table

で、カウンター用のテーブルを作る。app/database/migrations/*_create_counter_table.phpに、

	public function up()
	{
		Schema::create('counters', function ($table)
		{
		    $table->increments('id');
		    $table->integer('value');
		    $table->integer('total');
		    $table->timestamps();
		});
	}

としておく。要は、

create table counters (
id integer primary key,
value integer,
total integer,
created_at datetime,
updated_at datetime
);

で、valueに加算する整数値、totalにそこまでのvalueの合計が入るイメージ。

これを操作するモデルも作っておく。app/models/Counter.phpとして、

<?php
class Counter extends Eloquent
{
}

を作っておく。

で、まずはcounterにキューを使わずに(同期処理で)カウントするAPIを用意する。app/routes.phpに以下のルートを追加する。

Route::get('count/{value?}', function ($value = null) {
    $value = is_null($value) ? rand(1, 5) : $value;
    $total = Counter::sum('value') + $value;
    sleep(rand(0, 2));
    $counter = Counter::create(array('value' => $value, 'total' => $total));
    $counter->save();
    return $total;
})->where(array('id' => '\d+'));

http://[sample_app_host]/countにアクセスすると、現在DBに登録されているvalueの合計に、1~5までのランダムな整数が加算され、その結果の数値が表示される。

ちなみに、4行目に0~2秒のランダムなsleepが入っているのは、DBから合計値を取得するタイミングと、新しい計算結果をDBに登録するまでの間に強制的にタイムラグを発生させる(処理に時間がかかった場合に、不整合が起こるようなシチュエーションを再現させる)ためだ。

カウントするAPIだけだと状況がわかりにくいんで、現在DBに登録されている内容を表示するAPIも追加しておこう。

Route::get('list', function () {
   $list = Counter::orderBy('id')->get();
   $result = '';
   $total = 0;
   foreach ($list as $row) {
       $result .= $total . '+' . $row->value . '=' . $row->total . "<br />\n";
       $total = $row->total;
   }
   return $result;
});

これでhttp://[sample_app_host]/listにアクセスすると、現在DBに登録されている内容が以下のように表示される。

0+4=4
4+2=6
6+1=7
7+5=12
12+2=14
14+5=19

ちなみに上記の内容は、カウントAPIをブラウザごしに何回かリロードした結果となっている。しかし、実際にはWeb APIは一つのブラウザから順序よくアクセスされるとは限らない。そこで、いったんcountersテーブルをリセットし、apache benchを使って、以下のような複数同時アクセスを発生させてみよう。

ab.exe -c 5 -n 20 http://[sample_app_host]/count

その結果は以下のようになった。

0+1=1
1+1=2
2+3=4
4+2=3
3+5=6
6+1=13
13+5=18
18+3=4
4+2=20
20+3=26
26+5=26
26+4=25
25+3=38
38+3=24
24+2=23
23+3=46
46+2=28
28+3=38
38+2=43
43+2=43

集計処理とその結果をDBに登録する処理の間にランダムなタイムラグ(sleep)があるため、正しい結果になっていない。

そこで、キューを使って、呼び出し&登録処理が非同期でも、実際の計算処理は必ず順番通りに行われるようにしてみる。

Route::get('qcount/{value?}', function ($value = null) {
    Queue::push(function ($job) use ($value) {
        $value = is_null($value) ? rand(1, 5) : $value;
        $total = Counter::sum('value') + $value;
        sleep(rand(0, 2));
        $counter = Counter::create(array('value' => $value, 'total' => $total));
        $counter->save();
        $job->delete();
    });
    return 'Queued.';
})->where(array('id' => '\d+'));

http://[sample_app_host]/qcountにアクセスすると、先ほどcountで実行した内容と同じ処理をクロージャとしてキューに登録するように書き直した。キューに登録した時点では処理は行われないので、この時点ではDBに登録処理は行われない。

php artisan queue:listen

を実行すると、キューに登録された処理を順次実行していくことになる。

はずだったのだが、実際には上記コードはうまく動かなかった。listenでキューの内容を順次実行するのではなく、

php artisan queue:work

を実行して、最初の一件のジョブだけを実行してみると、以下のようなエラーが発生していた。

{"error":{"type":"Symfony\\Component\\Debug\\Exception\\FatalErrorException","message":"syntax error, unexpected ';'","file":"\/path\/to\/app\/vendor\/laravel\/framework\/src\/Illuminate\/Support\/SerializableClosure.php(173) : eval()'d code","line":12}}

クロージャをevalする際に構文エラーが発生しているようだが、深追いする気になれなかったので、クロージャを使う方法はあきらめた。

クロージャを使わない方法としては、キュージョブを実行するクラスを用意し、そのクラス名を登録する方法がある。app/libs/QueueTest.phpとして、

<?php

class QueueTest
{
    public function fire($job, $data)
    {
        $value = is_null($data['value']) ? rand(1, 5) : $data['value'];
        $total = Counter::sum('value') + $value;
        sleep(rand(0, 2));
        $counter = Counter::create(array('value' => $value, 'total' => $total));
        $counter->save();
        $job->delete();
    }
}

を用意し、QueueTestクラスをオートロードするために、app/start/global.phpのClassLoader::addDirectoriesに、「app_path().’/libs’」ディレクトリを追加しておく。

さらにキューを使ったルートの内容を以下のように変更する。

Route::get('qcount/{value?}', function ($value = null) {
    Queue::push('QueueTest', array('value' => $value));
    return 'Queued.';
})->where(array('id' => '\d+'));

これで、キューに登録されたジョブを処理する際には、QueueTest::fire()メソッドが呼ばれるようになる。apache benchを使ってこちらの実行結果も試してみると、

ab.exe -c 5 -n 20 http://[sample_app_host]/qcount
0+1=1
1+4=5
5+5=10
10+2=12
12+5=17
17+3=20
20+2=22
22+2=24
24+3=27
27+2=29
29+2=31
31+1=32
32+4=36
36+2=38
38+4=42
42+5=47
47+3=50
50+4=54
54+2=56
56+2=58

こちらはAPIは同時アクセスで呼び出されても、処理自体は順次(シリアルに)実行されているのがわかる。

というわけで、Laravel 4のキューを実際に使ってみたら、なんかうまく動かない部分もあったけれども、一応回避して使えることはちゃんと使えました、よかったよかった、という話でした。

※すでにクロージャを使ったキュージョブが登録されている場合は、クラスを使ったキュー登録処理を実行する前に、beanstalkdを再起動するなどして、クロージャを使った(=実行できない)ジョブをクリアしておく必要がある。

※キューをリッスンしておくために、あらかじめsupervisordなどを使って、php artisan queue:listenを自動起動&自動再起動するように設定しておくと便利。

追記)クロージャでジョブを登録するとSyntax Errorが出る件は、

    Queue::push(function ($job) use ($value) {

    Queue::push(function($job) use ($value) {

のように、クロージャの「function」と「(」の間の半角スペースを除去すれば動くようになった。see: SerializableClosure::getCodeFromFile()。

現状のLaravel 4の仕様を厳密に捉えると、バグといえばバグなんだろうけど、修正するよりもPSR-2に準拠して書こうってことで、使う側が対処する方が真っ当な気もする。strposの代わりにpreg_matchを使ってクロージャの開始位置を探すようにすれば修正は可能だけど。

関連する投稿: