Может войдёшь?
Черновики Написать статью Профиль

Очереди

перевод документация 5.х

  1. 1. Введение
    1. 1.1. Подключения или очереди
    2. 1.2. Требования для драйверов
  2. 2. Создание задач
    1. 2.1. Генерирование классов задач
    2. 2.2. Структура класса
  3. 3. Отправка задач в очередь
    1. 3.1. Отложенные задачи
    2. 3.2. Настройка очереди и подключения
    3. 3.3. Обработка ошибок
  4. 4. Запуск обработчика очереди
    1. 4.1. Приоритеты очередей
    2. 4.2. Обработчики очереди и развёртывание
    3. 4.3. Окончание срока задачи и таймауты
    4. 4.4. Получение задач из запросов
    5. 4.5. Добавление замыканий в очередь
  5. 5. Настройка Supervisor
  6. 6. Проваленные задачи
    1. 6.1. Уборка после проваленных задач
    2. 6.2. События проваленных задач
    3. 6.3. Повторный запуск проваленных задач
    4. 6.4. События задач
  7. 7. Запуск слушателя очереди
    1. 7.1. Демон-обработчик очереди
    2. 7.2. Развёртывание с помощью демонов-обработчиков очереди
  8. 8. Push-очереди
Этот перевод актуален для англоязычной документации на (ветка 5.3) , (ветка 5.2) , (ветка 5.1) и (ветка 5.0). Опечатка? Выдели и нажми Ctrl+Enter.

Введение

Очереди Laravel предоставляют единое API для различных сервисов очередей, таких как Beanstalk, Amazon SQS, Redis или даже реляционных БД. Очереди позволяют вам отложить выполнение времязатратных задач, таких как отправка e-mail, на более позднее время, таким образом на порядок ускоряя обработку веб-запросов в вашем приложении.

Настройки очередей хранятся в файле config/queue.php. В нём вы найдёте настройки для каждого драйвера очереди, которые поставляются вместе с фреймворком: база данных, Beanstalkd, IronMQ (только для версии 5.1 и ранее), Amazon SQS, Redis, null, а также синхронный драйвер (для локального использования). Драйвер null просто отменяет задачи очереди, поэтому они никогда не выполнятся.

+ 5.3

добавлено в 5.3 ()

Подключения или очереди

Перед изучением очередей Laravel важно понимать различие между «подключениями» и «очередями». В вашем файле настроек config/queue.php есть параметр connections, который определяет конкретное подключение к сервису очередей, такому как Amazon SQS, Beanstalk или Redis. Но у любого подключения может быть несколько «очередей», которые можно представить в виде отдельных стеков или наборов задач в очереди.

Обратите внимание, в каждом примере настройки подключения в файле queue есть атрибут queue. Это та очередь, в которую будут помещаться задачи при отправке по данному подключению. Другими словами, если вы отправляете задачу без явного указания очереди, задача будет помещена в очередь, заданную в атрибуте queue в настройках подключения:

PHP
// Эта задача отправлена в очередь по умолчанию...
dispatch(new Job);

// Эта задача отправлена в очередь "emails"...
dispatch((new Job)->onQueue('emails'));

Для некоторых приложений нет необходимости помещать задачи в несколько очередей, для них достаточно иметь одну простую очередь. Иметь несколько очередей особенно полезно для приложений, в которых необходимо разделение обрабатываемых задач по сегментам или приоритетам. Обработчик очереди Laravel позволяет вам указывать какие задачи имеют больший приоритет. Например, если вы отправите задачу в очередь high, то можете запустить обработчик, который даст им больший приоритет при выполнении:

shphp artisan queue:work --queue=high,default

Требования для драйверов

База данных

Для использования драйвера очереди database вам понадобится таблица в БД для хранения задач. Чтобы генерировать миграцию для создания этой таблицы, выполните Artisan-команду shqueue:table. Когда миграция будет создана, вы сможете мигрировать свою базу данных командой shmigrate:

shphp artisan queue:table

php artisan migrate
+ 5.3

добавлено в 5.3 ()

Redis

Для использования драйвера очереди redis вам надо настроить подключение к базе данных Redis в файле config/database.php.

Если ваше подключение к очереди Redis использует Redis Cluster, имена ваших очередей должны содержать хештег ключа. Это необходимо, чтобы убедиться в том, что все ключи Redis для данной очереди помещены в один хеш-слот:

conf'redis' => [
  'driver' => 'redis',
  'connection' => 'default',
  'queue' => '{default}',
  'retry_after' => 90,
],

Требования других очередей

Упомянутые выше драйвера имеют следующие зависимости:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~3.0
  • IronMQ: iron-io/iron_mq ~2.0|~4.0 (только для версии 5.1 и ранее)
  • Redis: predis/predis ~1.0

Создание задач

Генерирование классов задач

По умолчанию все помещаемые в очередь задачи вашего приложения хранятся в папке app/Jobs (в версии 5.0 App\Commands). Если папки app/Jobs не существует, она будет создана при запуске Artisan-команды shmake:job. Вы можете сгенерировать новую задачу для очереди с помощью Artisan-команды:

+ 5.3 5.2

добавлено в 5.3 () 5.2 ()

shphp artisan make:job SendReminderEmail
+ 5.1

добавлено в 5.1 ()

shphp artisan make:job SendReminderEmail --queued
+ 5.0

добавлено в 5.0 ()

shphp artisan make:command SendEmail --queued

Сгенерированный класс будет реализацией интерфейса Illuminate\Contracts\Queue\ShouldQueue, так Laravel поймёт, что задачу надо поместить в очередь, а не выполнить немедленно.

Структура класса

Классы задач очень просты, обычно они содержат только метод PHPhandle(), который вызывается при обработке задачи в очереди. Для начала давайте посмотрим на пример класса задачи.

+ 5.3

добавлено в 5.3 ()

В этом примере мы представим, что создаём сервис публикации подкастов, и нам надо обрабатывать загружаемые файлы подкастов перед их публикацией:

PHP
<?php

namespace App\Jobs;

use 
App\Podcast;
use 
App\AudioProcessor;
use 
Illuminate\Bus\Queueable;
use 
Illuminate\Queue\SerializesModels;
use 
Illuminate\Queue\InteractsWithQueue;
use 
Illuminate\Contracts\Queue\ShouldQueue;

class 
ProcessPodcast implements ShouldQueue
{
  use 
InteractsWithQueueQueueableSerializesModels;

  protected 
$podcast;

  
/**
   * Создание нового экземпляра задачи.
   *
   * @param  Podcast  $podcast
   * @return void
   */
  
public function __construct(Podcast $podcast)
  {
    
$this->podcast $podcast;
  }

  
/**
   * Выполнение задачи.
   *
   * @param  AudioProcessor  $processor
   * @return void
   */
  
public function handle(AudioProcessor $processor)
  {
    
// Обработка загруженного подкаста...
  
}
}
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

PHP
<?php

namespace App\Jobs;

use 
App\User;
use 
App\Jobs\Job;
use 
Illuminate\Contracts\Mail\Mailer;
use 
Illuminate\Queue\SerializesModels;
use 
Illuminate\Queue\InteractsWithQueue;
use 
Illuminate\Contracts\Queue\ShouldQueue;

//для версии 5.1 и ранее:
//use Illuminate\Contracts\Bus\SelfHandling;
//
//class SendReminderEmail extends Job implements SelfHandling, ShouldQueue

class SendReminderEmail extends Job implements ShouldQueue
{
  use 
InteractsWithQueueSerializesModels;

  protected 
$user;

  
/**
   * Создание нового экземпляра задачи.
   *
   * @param  User  $user
   * @return void
   */
  
public function __construct(User $user)
  {
    
$this->user $user;
  }

  
/**
   * Выполнение задачи.
   *
   * @param  Mailer  $mailer
   * @return void
   */
  
public function handle(Mailer $mailer)
  {
    
$mailer->send('emails.reminder', ['user' => $this->user], function ($m) {
      
//
    
});

    
$this->user->reminders()->create(...);
  }
}

Обратите внимание, в этом примере мы можем передать модель Eloquent напрямую в конструктор задачи. Благодаря используемому в задаче типажу SerializesModels, модели Eloquent будут изящно сериализованы и десериализованы при выполнении задачи. Если ваша задача принимает модель Eloquent в своём конструкторе, в очередь будет сериализован только идентификатор модели. А когда очередь начнёт обработку задачи, система очередей автоматически запросит полный экземпляр модели из БД. Это всё полностью прозрачно для вашего приложения и помогает избежать проблем, связанных с сериализацией полных экземпляров моделей Eloquent.

Метод PHPhandle() вызывается при обработке задачи очередью. Обратите внимание, что мы можем указывать зависимости в методе PHPhandle() задачи. сервис-контейнер Laravel автоматически внедрит эти зависимости.

+ 5.0

добавлено в 5.0 ()

Если вы хотите, чтобы у команды был отдельный класс-обработчик, вам надо добавить ключ sh--handler в команду shmake:command:

shphp artisan make:command SendEmail --queued --handler
+ 5.3

добавлено в 5.3 ()

Бинарные данные, такие как сырое содержимое изображения, должны быть пропущены через функцию PHPbase64_encode() перед отправкой в задачу для очереди. Иначе задача может быть неправильно сериализована в JSON при помещении в очередь.

+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Ручной выпуск задач

Если вы хотите «отпустить» задачу вручную, то типаж InteractsWithQueue, который уже включён в ваш сгенерированный класс задачи, предоставляет доступ к методу задачи PHPrelease(). Этот метод принимает один аргумент — число секунд, через которое задача станет снова доступной:

PHP
public function handle(Mailer $mailer)
{
  if (
condition) {
    
$this->release(10);
  }
}

Проверка числа попыток запуска

Как уже было сказано, при возникновении исключения при выполнении задачи она будет автоматически возвращена в очередь. Вы можете проверить число сделанных попыток выполнения задачи при помощи метода PHPattempts():

PHP
public function handle(Mailer $mailer)
{
  if (
$this->attempts() > 3) {
    
//
  
}
}

Отправка задач в очередь

+ 5.3

добавлено в 5.3 ()

Когда вы напишете класс вашей задачи, вы можете отправить её в очередь с помощью вспомогательного метода PHPdispatch(). Необходимо передать единственный аргумент — экземпляр задачи:

PHP
<?php

namespace App\Http\Controllers;

use 
App\Jobs\ProcessPodcast;
use 
Illuminate\Http\Request;
use 
App\Http\Controllers\Controller;

class 
PodcastController extends Controller
{
  
/**
   * Сохранить новый подкаст.
   *
   * @param  Request  $request
   * @return Response
   */
  
public function store(Request $request)
  {
    
// Создать подкаст...

    
dispatch(new ProcessPodcast($podcast));
  }
}

Вспомогательный метод PHPdispatch() — удобная, лаконичная, доступная глобально функция, а также её чрезвычайно просто тестировать. Узнайте больше их документации по тестированию.

+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Стандартный контроллер Laravel расположен в app/Http/Controllers/Controller.php и использует типаж DispatchesJobs. Этот типаж предоставляет несколько методов, позволяющих вам удобно помещать задачи в очередь. Например, метод PHPdispatch():

PHP
<?php

namespace App\Http\Controllers;

use 
App\User;
use 
Illuminate\Http\Request;
use 
App\Jobs\SendReminderEmail;
use 
App\Http\Controllers\Controller;

class 
UserController extends Controller
{
  
/**
   * Послать уведомление по e-mail данному пользователю.
   *
   * @param  Request  $request
   * @param  int  $id
   * @return Response
   */
  
public function sendReminderEmail(Request $request$id)
  {
    
$user User::findOrFail($id);

    
$this->dispatch(new SendReminderEmail($user));
  }
}
+ 5.2

добавлено в 5.2 ()

Типаж DispatchesJobs

Разумеется, иногда нужно отправить задачу из другого места вашего приложения — не из маршрута или контроллера. Для этого вы можете включить типаж DispatchesJobs в любой класс своего приложения, чтобы получить доступ к его различным методам отправки. Например, вот пример класса, использующего этот типаж:

PHP
<?php

namespace App;

use 
Illuminate\Foundation\Bus\DispatchesJobs;

class 
ExampleClass
{
  use 
DispatchesJobs;
}

Функция PHPdispatch()

Или, вы можете использовать глобальную функцию PHPdispatch():

PHP
Route::get('/job', function () {
  
dispatch(new App\Jobs\PerformTask);

  return 
'Готово!';
});
+ 5.0

добавлено в 5.0 ()

Для добавления новой задачи в очередь используйте метод PHPQueue::push():

PHP
Queue::push(new SendEmail($message));

В этом примере мы используем фасад Queue напрямую. Но обычно вы будете отправлять команды в очередь через командную шину. В этой статье мы продолжим использовать фасад Queue, но также коснёмся и командной шины, поскольку она используется и для отправки синхронных задач, и для отправки задач в очередь.

Сгенерированный обработчик будет помещён в App\Handlers\Commands и будет извлечён из IoC-контейнера.

Отложенные задачи

Если вам надо отложить выполнение задачи в очереди, используйте метод PHPdelay() на экземпляре задачи. Этот метод обеспечивается типажом Illuminate\Bus\Queueable, который включён во все генерируемые классы задач по умолчанию.

+ 5.3

добавлено в 5.3 ()

Например, давайте укажем, что задача станет доступной для обработки через 10 минут после отправки в очередь:

PHP
<?php

namespace App\Http\Controllers;

use 
Carbon\Carbon;
use 
App\Jobs\ProcessPodcast;
use 
Illuminate\Http\Request;
use 
App\Http\Controllers\Controller;

class 
PodcastController extends Controller
{
  
/**
   * Сохранить новый подкаст.
   *
   * @param  Request  $request
   * @return Response
   */
  
public function store(Request $request)
  {
    
// Создать подкаст...

    
$job = (new ProcessPodcast($podcast))
                ->
delay(Carbon::now()->addMinutes(10));

    
dispatch($job);
  }
}
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Например, вы захотите поместить в очередь задачу, которая отправляет пользователю e-mail через 5 минут после регистрации:

PHP
<?php

namespace App\Http\Controllers;

use 
App\User;
use 
Illuminate\Http\Request;
use 
App\Jobs\SendReminderEmail;
use 
App\Http\Controllers\Controller;

class 
UserController extends Controller
{
  
/**
   * Отправить уведомление на e-mail данному пользователю.
   *
   * @param  Request  $request
   * @param  int  $id
   * @return Response
   */
  
public function sendReminderEmail(Request $request$id)
  {
    
$user User::findOrFail($id);

    
$job = (new SendReminderEmail($user))->delay(60 5);

    
$this->dispatch($job);
  }
}

В этом примере мы указали, что задача в очереди должна быть отложена на 5 минут до того, как станет доступной обработчикам.

Сервис Amazon SQS имеет ограничение на задержку не более 15 минут.

+ 5.0

добавлено в 5.0 ()

Вы можете добиться этого, используя метод PHPQueue::later():

PHP
$date Carbon::now()->addMinutes(15);

Queue::later($date, new SendEmail($message));

В этом примере мы используем библиотеку для работы со временем Carbon, чтобы указать необходимое время задержки для задачи. Другой вариант — передать необходимое число секунд для задержки.

Настройка очереди и подключения

Задание очереди для задачи

+ 5.0

добавлено в 5.0 ()

Вы можете задать очередь, в которую следует посылать задачу:

PHP
Queue::pushOn('emails', new SendEmail($message));

Помещая задачи в разные очереди, вы можете разделять их по категориям, а также задавать приоритеты по количеству обработчиков разных очередей. Это не касается различных «подключений» очередей, определённых в файле настроек очереди, а только конкретных очередей в рамках одного подключения. Чтобы указать очередь используйте метод PHPonQueue() на экземпляре задачи:

+ 5.3

добавлено в 5.3 ()

PHP
<?php

namespace App\Http\Controllers;

use 
App\Jobs\ProcessPodcast;
use 
Illuminate\Http\Request;
use 
App\Http\Controllers\Controller;

class 
PodcastController extends Controller
{
  
/**
   * Сохранить новый подкаст.
   *
   * @param  Request  $request
   * @return Response
   */
  
public function store(Request $request)
  {
    
// Создать подкаст...

    
$job = (new ProcessPodcast($podcast))->onQueue('processing');

    
dispatch($job);
  }
}
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

PHP
<?php

namespace App\Http\Controllers;

use 
App\User;
use 
Illuminate\Http\Request;
use 
App\Jobs\SendReminderEmail;
use 
App\Http\Controllers\Controller;

class 
UserController extends Controller
{
  
/**
   * Отправить уведомление на e-mail данному пользователю.
   *
   * @param  Request  $request
   * @param  int  $id
   * @return Response
   */
  
public function sendReminderEmail(Request $request$id)
  {
    
$user User::findOrFail($id);

    
$job = (new SendReminderEmail($user))->onQueue('emails');

    
$this->dispatch($job);
  }
}
+ 5.0

добавлено в 5.0 ()

Передача одинаковых данных в несколько задач

Если вам надо передать одинаковые данные в несколько задач в очереди, вы можете использовать метод PHPQueue::bulk():

PHP
Queue::bulk([new SendEmail($message), new AnotherCommand]);
+ 5.3 5.2

добавлено в 5.3 () 5.2 ()

Указание подключения к очереди для задачи

Если вы работаете с несколькими подключениями к очередям, то можете указать, в какое из них надо поместить задачу. Для этого служит метод PHPonConnection() на экземпляре задачи:

+ 5.3

добавлено в 5.3 ()

PHP
<?php

namespace App\Http\Controllers;

use 
App\Jobs\ProcessPodcast;
use 
Illuminate\Http\Request;
use 
App\Http\Controllers\Controller;

class 
PodcastController extends Controller
{
  
/**
   * Сохранить новый подкаст.
   *
   * @param  Request  $request
   * @return Response
   */
  
public function store(Request $request)
  {
    
// Создать подкаст...

    
$job = (new ProcessPodcast($podcast))->onConnection('sqs');

    
dispatch($job);
  }
}
+ 5.2

добавлено в 5.2 ()

PHP
<?php

namespace App\Http\Controllers;

use 
App\User;
use 
Illuminate\Http\Request;
use 
App\Jobs\SendReminderEmail;
use 
App\Http\Controllers\Controller;

class 
UserController extends Controller
{
  
/**
   * Послать данному пользователю напоминание на e-mail.
   *
   * @param  Request  $request
   * @param  int  $id
   * @return Response
   */
  
public function sendReminderEmail(Request $request$id)
  {
    
$user User::findOrFail($id);

    
$job = (new SendReminderEmail($user))->onConnection('alternate');

    
$this->dispatch($job);
  }
}
+ 5.3 5.2

добавлено в 5.3 () 5.2 ()

Само собой, вы можете сцепить методы PHPonConnection() и PHPonQueue(), чтобы указать подключение и очередь для задачи:

PHP
$job = (new ProcessPodcast($podcast))
                ->
onConnection('sqs')
                ->
onQueue('processing');

Обработка ошибок

Если во время выполнения задачи возникло исключение, она будет автоматически возвращена в очередь, и можно будет снова попробовать выполнить её. Попытки будут повторяться до тех пор, пока не будет достигнуто заданное максимальное число для вашего приложения. Это число определяется параметром sh--tries, используемом в Artisan-команде shqueue:work. Подробнее о запуске обработчика очереди читайте ниже.

+ 5.3

добавлено в 5.3 ()

Запуск обработчика очереди

Laravel содержит обработчик очереди, который обрабатывает новые задачи при поступлении в очередь. Вы можете запустить его Artisan-командой shqueue:work. Запомните, команда shqueue:work будет выполняться, пока вы не остановите её вручную или закроете свой терминал:

shphp artisan queue:work

Чтобы процесс shqueue:work выполнялся в фоне постоянно, используйте монитор процессов, такой как Supervisor, для проверки, что обработчик очереди не остановился.

Запомните, обработчики очереди — долгоживущие процессы, и они хранят в памяти состояние загруженного приложения. Поэтому они не заметят изменения в вашем коде, сделанные после их запуска. Поэтому в процессе развёртывания не забудьте перезапустить обработчиков очереди.

Указание подключения и очереди

Также вы можете указать, какое подключение должен использовать обработчик очереди. Имя подключения передаётся команде shwork и должно соответствовать одному из подключений, определённых в файле config/queue.php%:

shphp artisan queue:work redis

Можно ещё точнее настроить обработчика очереди, указав для него конкретные очереди в данном подключении. Например, если все ваши email-сообщения обрабатываются в очереди emails в подключении redis, вы можете выполнить такую команду для запуска обработчика только для этой очереди:

shphp artisan queue:work redis --queue=emails

Освобождение ресурсов

Демоны-обработчики очереди не «перезагружают» фреймворк перед обработкой каждой задачи. Поэтому вам надо освобождать все тяжёлые ресурсы после завершения каждой задачи. Например, если вы выполняете обработку изображения с помощью библиотеки GD, то после завершения обработки вам надо освободить память с помощью PHPimagedestroy.

Приоритеты очередей

Иногда бывает необходимо организовать обработку очередей по их приоритетам. Например, в файле config/queue.php вы можете задать очередь low как очередь по умолчанию для подключения redis. При этом при необходимости вы можете помещать задачу в очередь high вот так:

PHP
dispatch((new Job)->onQueue('high'));

Чтобы запустить обработчика, который сначала проверит, что обработаны все задачи из очереди high, и только потом продолжит обрабатывать задачи из очереди low, передайте в команду sh список очередей через запятую:

shphp artisan queue:work --queue=high,low

Обработчики очереди и развёртывание

Поскольку обработчики очереди — долгоживущие процессы, они не подхватят изменения в вашем коде, пока не будут перезапущены. Поэтому простейший способ развернуть приложение с обработчиками очереди — перезапустить обработчиков во время процесса развёртывания. Вы можете мягко перезапустить всех своих обработчиков командой shqueue:restart:

shphp artisan queue:restart

Эта команда заставит обработчиков очереди мягко «умереть» после завершения ими текущей задачи, поэтому ни одна существующая задаче не будет потеряна. Поскольку обработчики очереди умрут при выполнении команды shqueue:restart, вам надо запустить менеджер процессов, такой как Supervisor, для автоматического перезапуска обработчиков очереди.

Окончание срока задачи и таймауты

Окончание срока задачи

В файле config/queue.php для каждого подключения определён параметр retry_after. Этот параметр указывает, сколько секунд подключение должно ждать, прежде чем заново приступить к обрабатываемой задаче. Например, если значение retry_after равно 90, то задача будет возвращена назад в очередь, если пробудет в обработке 90 секунд и не будет удалена. Обычно для retry_after лучше задать значение равное максимальному разумному количеству секунд, которое потребуется задачам для выполнения.

Единственное подключение, у которого нет значения retry_after, — Amazon SQS. SQS пробует выполнить задачу заново исходя из стандартного таймаута видимости, который настраивается в консоли AWS.

Таймауты обработчиков

Artisan-команда shqueue:work имеет параметр sh--timeout. Этот параметр указывает, сколько должен ждать главный процесс обработки очередей Laravel, прежде чем уничтожить дочернего обработчика очереди, выполняющего задачу. Дочерний процесс обработки очереди может «зависнуть» по разным причинам, таким как внешний HTTP-вызов, который не отвечает. Параметр sh--timeout удаляет зависнувшие процессы, превысившие указанный лимит времени:

shphp artisan queue:work --timeout=60

Параметр retry_after в настройках и параметр командной строки sh--timeout разные, но работают совместно для проверки, что задачи не потерялись, и что задачи выполняются успешно только один раз.

Значение sh--timeout должно быть всегда по крайней мере на несколько секунд меньше значения retry_after в настройках. Таким образом обработчик, выполняющий данную задачу, будет всегда убиваться до повторной попытки выполнения задачи. Если ваш параметр sh--timeout имеет значение больше значения retry_after, ваши задачи могут выполниться дважды.

Длительность паузы обработчика

Пока в очереди доступны задачи, обработчик будет выполнять их без задержки между ними. С помощью параметра shsleep можно задать время, на которое будет «засыпать» обработчик, если нет новых задач:

shphp artisan queue:work --sleep=3
+ 5.1 5.0

добавлено в 5.1 () 5.0 ()

Получение задач из запросов

Очень часто переменные HTTP-запросов переносятся в задачи. Поэтому вместо того, чтобы заставлять вас делать это вручную для каждого запроса, Laravel предоставляет несколько вспомогательных методов. Давайте посмотрим на метод PHPdispatchFrom() типажа DispatchesJobs. По умолчанию этот типаж включён в базовый класс контроллера Laravel:

PHP
<?php

namespace App\Http\Controllers;

use 
Illuminate\Http\Request;
use 
App\Http\Controllers\Controller;

class 
CommerceController extends Controller
{
  
/**
   * Обработка данного заказа.
   *
   * @param  Request  $request
   * @param  int  $id
   * @return Response
   */
  
public function processOrder(Request $request$id)
  {
    
// Обработка запроса...

    
$this->dispatchFrom('App\Jobs\ProcessOrder'$request);
  }
}

Этот метод проверит конструктор класса данной задачи и извлечёт переменные из HTTP-запроса (или любого другого объекта ArrayAccess) для заполнения необходимых параметров конструктора задачи. Поэтому, если класс нашей задачи принимает в конструкторе переменную productId, то шина задач попытается получить параметр productId из HTTP-запроса.

Вы также можете передать массив третьим аргументом метода PHPdispatchFrom(). Этот массив будет использован для заполнения тех параметров, которых нет в запросе:

PHP
$this->dispatchFrom('App\Jobs\ProcessOrder'$request, [
    
'taxPercentage' => 20,
]);
+ 5.0

добавлено в 5.0 ()

Добавление замыканий в очередь

Вы можете помещать в очередь и функции-замыкания. Это очень удобно для простых, быстрых задач, выполняющихся в очереди.

Добавление замыкания в очередь

PHP
Queue::push(function($job) use ($id)
{
  
Account::delete($id);

  
$job->delete();
});

Вместо того, чтобы делать объекты доступными для замыканий в очереди через директиву PHPuse, передавайте первичные ключи и повторно извлекайте связанные модели из вашей задачи в очереди. Это часто позволяет избежать неожиданного поведения при сериализации.

При использовании push-очередей Iron.io будьте особенно внимательны при добавлении замыканий. Конечная точка выполнения, получающая ваше сообщение, должна проверить входящую последовательность-ключ, чтобы удостовериться, что запрос действительно исходит от Iron.io. Например, ваша конечная push-точка может иметь адрес вида https://yourapp.com/queue/receive?token=SecretToken — где значение token можно проверять перед собственно обработкой задачи.

Настройка Supervisor

Установка Supervisor

Supervisor — монитор процессов для ОС Linux, он автоматически перезапустит ваш процесс shqueue:work, если он остановится. Для установки Supervisor в Ubuntu используйте такую команду:

shsudo apt-get install supervisor

Если самостоятельная настройка Supervisor кажется вам слишком сложной, вы можете использовать Laravel Forge, который автоматически установит и настроит Supervisor для ваших проектов Laravel.

Настройка Supervisor

Файлы настроек Supervisor обычно находятся в папке /etc/supervisor/conf.d. Там вы можете создать любое количество файлов с настройками, по которым Supervisor поймёт, как отслеживать ваши процессы. Например, давайте создадим файл laravel-worker.conf, который запускает и наблюдает за процессом queue:work:

conf[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

В этом примере numprocs указывает, что Supervisor должен запустить 8 процессов queue:work и наблюдать за ними, автоматически перезапуская их при их остановках. Само собой, вам надо изменить часть queue:work sqs директивы command в соответствии с вашим подключением очереди.

Подробнее о Supervisor читайте в его документации.

+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Запуск Supervisor

После создания файла настроек вы можете обновить конфигурацию Supervisor и запустить процесс при помощи следующих команд:

shsudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

Проваленные задачи

Не всегда всё идёт по плану, иногда ваши задачи в очереди будут заканчиваться ошибкой. Не волнуйтесь, такое с каждым случается! В Laravel есть удобный способ указать максимальное количество попыток выполнения задачи. После превышения этого количества попыток задача будет добавлена в таблицу failed_jobs. Для создания миграции для таблицы failed_jobs можно использовать команду shqueue:failed-table:

shphp artisan queue:failed-table

php artisan migrate

При запуске вашего обработчика очереди нужно указать максимальное количество попыток выполнения задачи с помощью параметра sh--tries команды shqueue:work. Если не задать этот параметр, то попытки выполнения задачи будут бесконечны:

shphp artisan queue:work redis --tries=3
+ 5.3

добавлено в 5.3 ()

Уборка после проваленных задач

Вы можете определить метод PHPfailed() прямо в классе задачи, это позволит вам выполнять уборку после данной конкретной задачи при возникновении ошибки. Это идеальное место для отправки предупреждения вашим пользователям или для отмены всех действий, выполненных задачей. Исключение PHPException, приведшее к провалу задачи, будет передано в метод PHPfailed():

PHP
<?php

namespace App\Jobs;

use 
Exception;
use 
App\Podcast;
use 
App\AudioProcessor;
use 
Illuminate\Bus\Queueable;
use 
Illuminate\Queue\SerializesModels;
use 
Illuminate\Queue\InteractsWithQueue;
use 
Illuminate\Contracts\Queue\ShouldQueue;

class 
ProcessPodcast implements ShouldQueue
{
  use 
InteractsWithQueueQueueableSerializesModels;

  protected 
$podcast;

  
/**
   * Создать новый экземпляр задачи.
   *
   * @param  Podcast  $podcast
   * @return void
   */
  
public function __construct(Podcast $podcast)
  {
    
$this->podcast $podcast;
  }

  
/**
   * Выполнить задачу.
   *
   * @param  AudioProcessor  $processor
   * @return void
   */
  
public function handle(AudioProcessor $processor)
  {
    
// Обработка загруженного подкаста...
  
}

  
/**
   * Ошибка выполнения задачи.
   *
   * @param  Exception  $exception
   * @return void
   */
  
public function failed(Exception $exception)
  {
    
// Отправить пользователю уведомление об ошибке, и т.п. ...
  
}
}

События проваленных задач

Если вы хотите зарегистрировать событие, которое будет вызываться при ошибке выполнения задачи, используйте метод PHPQueue::failing(). Это событие — отличная возможность оповестить вашу команду через email или HipChat. Например, мы можем прикрепить обратный вызов к данному событию из AppServiceProvider, который включён в Laravel:

PHP
<?php

namespace App\Providers;

use 
Illuminate\Support\Facades\Queue;
use 
Illuminate\Support\ServiceProvider;
//для версии 5.2 и выше:
use Illuminate\Queue\Events\JobFailed;

class 
AppServiceProvider extends ServiceProvider
{
  
/**
   * Начальная загрузка всех сервисов приложения.
   *
   * @return void
   */
  
public function boot()
  {
    
//для версии 5.2 и выше:
    
Queue::failing(function (JobFailed $event) {
      
// $event->connectionName
      // $event->job
      // $event->exception

    //для версии 5.1 и ранее:
    //Queue::failing(function ($connection, $job, $data) {
    //  // Оповещение команды о проваленной задаче...
    
});
  }

  
/**
   * Регистрация сервис-провайдера.
   *
   * @return void
   */
  
public function register()
  {
    
//
  
}
}
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Метод PHPfailed() класса задачи

Для более точного контроля, вы можете определить метод PHPfailed() непосредственно в классе задачи, это позволит выполнять специфичное для задачи действие при возникновении ошибки:

PHP
<?php

namespace App\Jobs;

use 
App\Jobs\Job;
use 
Illuminate\Contracts\Mail\Mailer;
use 
Illuminate\Queue\SerializesModels;
use 
Illuminate\Queue\InteractsWithQueue;
use 
Illuminate\Contracts\Queue\ShouldQueue;

//для версии 5.1 и ранее:
//use Illuminate\Contracts\Bus\SelfHandling;
//
//class SendReminderEmail extends Job implements SelfHandling, ShouldQueue

class SendReminderEmail extends Job implements ShouldQueue
{
  use 
InteractsWithQueueSerializesModels;

  
/**
   * Выполнение задачи.
   *
   * @param  Mailer  $mailer
   * @return void
   */
  
public function handle(Mailer $mailer)
  {
    
//
  
}

  
/**
   * Обработка ошибки задачи.
   *
   * @return void
   */
  
public function failed()
  {
    
// Вызывается при ошибке в задаче...
  
}
}
+ 5.0

добавлено в 5.0 ()

Если задача не само-обрабатываемая, а имеет отдельный класс-обработчик, то метод PHPfailed() должен быть определён именно там.

Повторный запуск проваленных задач

Чтобы просмотреть все проваленные задачи, которые были помещены в вашу таблицу failed_jobs, можно использовать Artisan-команду shqueue:failed:

shphp artisan queue:failed

Эта команда выведет список задач с их ID, подключением, очередью и временем ошибки. ID задачи можно использовать для повторной попытки её выполнения. Например, для повторной попытки выполнения задачи с ID = 5 выполните такую команду:

shphp artisan queue:retry 5

Чтобы повторить все проваленные задачи, выполните команду shqueue:retry указав shall в качестве ID:

shphp artisan queue:retry all

Если вы хотите удалить проваленную задачу, используйте команду shqueue:forget:

shphp artisan queue:forget 5

Для удаления всех проваленных задач используйте команду shqueue:flush:

shphp artisan queue:flush

События задач

+ 5.3

добавлено в 5.3 ()

Методы PHPbefore() и PHPafter() фасада Queue позволяют указать обратные вызовы, которые будут выполнены до выполнения задачи или после успешного выполнения задачи. Эти обратные вызовы — отличная возможность для выполнения дополнительного журналирования или увеличения статистики для панели управления. Обычно эти методы вызываются из сервис-провайдера. Например, мы можем использовать AppServiceProvider, который включён в Laravel:

PHP
<?php

namespace App\Providers;

use 
Illuminate\Support\Facades\Queue;
use 
Illuminate\Support\ServiceProvider;
use 
Illuminate\Queue\Events\JobProcessed;
use 
Illuminate\Queue\Events\JobProcessing;

class 
AppServiceProvider extends ServiceProvider
{
  
/**
   * Начальная загрузка всех сервисов приложения.
   *
   * @return void
   */
  
public function boot()
  {
    
Queue::before(function (JobProcessing $event) {
      
// $event->connectionName
      // $event->job
      // $event->job->payload()
    
});

    
Queue::after(function (JobProcessed $event) {
      
// $event->connectionName
      // $event->job
      // $event->job->payload()
    
});
  }

  
/**
   * Регистрация сервис-провайдера.
   *
   * @return void
   */
  
public function register()
  {
    
//
  
}
    }

С помощью метода PHPlooping() фасада Queue вы можете указать обратные вызовы, которые будут выполнены перед тем, как обработчик попытается получить задачу из очереди. Например, вы можете зарегистрировать замыкание для отката всех транзакций, которые были оставлены открытыми предыдущей проваленной задачей:

PHP
Queue::looping(function () {
  while (
DB::transactionLevel() > 0) {
    
DB::rollBack();
  }
});
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

События выполнения задачи

Методы PHPQueue::before() и PHPQueue::after() позволяют зарегистрировать обратный вызов, который будет выполнен до выполнения задачи или после успешного выполнения задачи. Обратные вызовы — отличная возможность для выполнения дополнительного журналирования, помещения в очередь последующей задачи, или увеличения статистики для панели управления. Например, мы можем прикрепить обратный вызов к этому событию из AppServiceProvider, который включён в Laravel:

PHP
<?php

namespace App\Providers;

use 
Queue;
use 
Illuminate\Support\ServiceProvider;
//для версии 5.2 и выше:
use Illuminate\Queue\Events\JobProcessed;

class 
AppServiceProvider extends ServiceProvider
{
  
/**
   * Начальная загрузка всех сервисов приложения.
   *
   * @return void
   */
  
public function boot()
  {
    
//для версии 5.2 и выше:
    
Queue::after(function (JobProcessed $event) {
      
// $event->connectionName
      // $event->job
      // $event->data

    //для версии 5.1 и ранее:
    //Queue::after(function ($connection, $job, $data) {
    //  //
    
});
  }

  
/**
   * Регистрация сервис-провайдера.
   *
   * @return void
   */
  
public function register()
  {
    
//
  
}
}

Запуск слушателя очереди

Запуск слушателя очереди

Laravel включает в себя Artisan-команду, которая будет выполнять новые задачи по мере их поступления. Вы можете запустить слушателя командой shqueue:listen:
%%(sh)
php artisan queue:listen
%%

+ 5.2

добавлено в 5.2 ()

Вы также можете указать, какое именно соединение должно прослушиваться:

shphp artisan queue:listen connection-name
+ 5.1 5.0

добавлено в 5.1 () 5.0 ()

shphp artisan queue:listen connection
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Заметьте, что когда это задание запущено, оно будет продолжать работать, пока вы не остановите его вручную. Вы можете использовать монитор процессов, такой как Supervisor, чтобы удостовериться, что задание продолжает работать.

Приоритеты очереди

Вы можете передать команде shlisten список подключений к очереди через запятую, чтобы задать приоритеты для очереди:

shphp artisan queue:listen --queue=high,low

В этом примере задачи из подключения high-connection всегда будут обрабатывать перед задачами из low-connection.

Указание времени на выполнение задачи

Кроме этого вы можете указать число секунд, в течение которых будет выполняться каждая задача:

shphp artisan queue:listen --timeout=60

Указание перерыва между задачами

Также вы можете задать число секунд ожидания перед проверкой наличия новых задач в очереди:

shphp artisan queue:listen --sleep=5

Обратите внимание, что очередь «засыпает» только тогда, когда в ней нет задач. Если задачи есть, очередь будет продолжать обрабатывать их без перерыва.

+ 5.2 5.0

добавлено в 5.2 () 5.0 ()

Обработка только первой задачи в очереди

Для обработки только первой задачи можно использовать команду shqueue:work:

shphp artisan queue:work
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Демон-обработчик очереди

Команда shqueue:work также имеет опцию sh--daemon для того, чтобы обработчик принудительно продолжал обрабатывать задачи без необходимости повторной загрузки фреймворка. Это приводит к значительному снижению использования CPU по сравнению с командой shqueue:listen.

Для запуска обработчика очереди в режиме демона используйте флаг sh--daemon:

+ 5.2

добавлено в 5.2 ()

shphp artisan queue:work connection-name --daemon

php artisan queue:work connection-name --daemon --sleep=3

php artisan queue:work connection-name --daemon --sleep=3 --tries=3
+ 5.1 5.0

добавлено в 5.1 () 5.0 ()

shphp artisan queue:work connection --daemon

php artisan queue:work connection --daemon --sleep=3

php artisan queue:work connection --daemon --sleep=3 --tries=3
+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Как видите, команда shqueue:work поддерживает те же самые опции, что и команда shqueue:listen. Для просмотра всех доступных опций используйте команду shphp artisan help queue:work.

Написание кода для демонов-обработчиков очереди

Демоны-обработчики очереди не перезапускают фреймворк перед обработкой каждой задачи. Поэтому нужно быть аккуратным при освобождении любых «тяжёлых» ресурсов до того, как закончится выполнение вашей задачи. Например, если вы обрабатываете изображение с помощью библиотеки GD, то после завершения обработки вам необходимо освобождать память с помощью PHPimagedestroy().

+ 5.1 5.0

добавлено в 5.1 () 5.0 ()

Также ваше соединение с БД может быть потеряно, когда оно используется демоном долгое время. Вы можете использовать метод PHPDB::reconnect(), для обеспечения «свежего» соединения.

+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Развёртывание с помощью демонов-обработчиков очереди

Поскольку демоны-обработчики очереди — длительные процессы, они не подхватывают изменения в коде без перезапуска. Поэтому простейший способ развернуть приложение, используя обработчики очереди, — перезапустить их во время выполнения скрипта развёртывания. Вы можете изящно перезапустить все обработчики, включив следующую команду в ваш скрипт для развёртывания:

shphp artisan queue:restart
+ 5.2

добавлено в 5.2 ()

Эта команда сообщит всем обработчикам очереди о необходимости «умереть» после завершения обработки их текущих задач, поэтому текущие задачи не пропадут. Не забывайте, что обработчики очереди умрут при выполнении команды shqueue:restart, поэтому необходимо запустить менеджер процессов, такой как Supervisor, который автоматически перезапустит обработчики задач.

+ 5.1 5.0

добавлено в 5.1 () 5.0 ()

Эта команда сообщит всем обработчикам очереди о необходимости перезапуска после завершения обработки их текущих задач, поэтому текущие задачи не пропадут.

+ 5.2 5.1 5.0

добавлено в 5.2 () 5.1 () 5.0 ()

Эта команда полагается на систему кэша для планирования перезапуска. По умолчанию APCu не работает для терминальных задач. Если вы используете APCu, добавьте confapc.enable_cli=1 в свою конфигурацию для APCu.

+ 5.0

добавлено в 5.0 ()

Push-очереди

Push-очереди дают вам доступ ко всем мощным возможностям, предоставляемым подсистемой очередей Laravel 5, без запуска демонов и фоновых слушателей. На текущий момент push-очереди поддерживает только драйвер Iron.io. Перед тем, как начать, создайте аккаунт и впишите его данные в config/queue.php.

Регистрация подписчика push-очереди

После этого вы можете использовать Artisan-команду shqueue:subscribe для регистрации URL конечной точки, которая будет получать добавляемые в очередь задачи:

shphp artisan queue:subscribe queue_name queue/receive

php artisan queue:subscribe queue_name http://foo.com/queue/receive

Теперь, когда вы войдёте в ваш профиль Iron.io, то увидите новую push-очередь и URL её подписки. Вы можете подписать любое число URL на одну очередь. Дальше создайте маршрут для вашей конечной точки queue/receive, и пусть он возвращает результат вызова метода PHPQueue::marshal():

PHP
Route::post('queue/receive', function()
{
  return 
Queue::marshal();
});

Этот метод позаботится о вызове нужного класса-обработчика задачи. Для помещения задач в push-очередь просто используйте всё тот же метод PHPQueue::push(), который работает и для обычных очередей.

Комментарии (3)

Sawa4

Событие завершения задачи

PHP
Queue::failing(function ($connection$job$data) {
      
// Оповещение команды о проваленной задаче...
    
});

Ребята подскажите что и откуда приходит в переменных $connection, $job, $data?

WarLight

При переводе, видимо, потеряли sync:
а также синхронный драйвер (для локального использования)
Перевели — хорошо. Но название драйвера тоже наверное хорошо бы указать)

mike4921

Что значит само-обрабатываемая задача?

Написать комментарий

Разметка: ? ?

Авторизуйся, чтобы прокомментировать.