class Queue8

  1. 8.0.x lib/Drush/Queue/Queue8.php Queue8
  2. 7.x lib/Drush/Queue/Queue8.php Queue8
  3. master lib/Drush/Queue/Queue8.php Queue8

Namespace

Drush\Queue

Hierarchy

Expanded class hierarchy of Queue8

Members

Contains filters are case sensitive
Name Modifiers Type Description
Queue8::$workerManager protected property
Queue8::getQueue public function Overrides QueueInterface::getQueue
Queue8::getQueues public function Returns all queues. Overrides QueueInterface::getQueues
Queue8::run public function Runs a given queue. Overrides QueueInterface::run
Queue8::__construct public function Set the queue worker manager.
QueueBase::$queues protected static property Keep track of queue definitions.
QueueBase::getInfo public function Returns a given queue definition. Overrides QueueInterface::getInfo
QueueBase::listQueues public function Lists all available queues.

File

lib/Drush/Queue/Queue8.php, line 10

View source
/**
 * Queue handling that uses a queue worker plugin manager.
 *
 * Queue definitions are read from the worker manager's plugin definitions,
 * and queue items are processed by worker instances created by that manager.
 */
class Queue8 extends QueueBase {

  /**
   * The plugin manager used to discover and instantiate queue workers.
   *
   * @var \Drupal\Core\Queue\QueueWorkerManager
   */
  protected $workerManager;

  /**
   * Set the queue worker manager.
   *
   * @param \Drupal\Core\Queue\QueueWorkerManager $manager
   *   (optional) The queue worker manager. When omitted, the
   *   'plugin.manager.queue_worker' service is used.
   */
  public function __construct(QueueWorkerManager $manager = NULL) {
    $this->workerManager = $manager ? : \Drupal::service('plugin.manager.queue_worker');
  }

  /**
   * {@inheritdoc}
   *
   * The definitions are collected once and then kept in the static $queues
   * property, so later calls return the cached list.
   */
  public function getQueues() {
    if (!isset(static::$queues)) {
      static::$queues = array();
      foreach ($this->workerManager->getDefinitions() as $name => $info) {
        static::$queues[$name] = $info;
      }
    }
    return static::$queues;
  }

  /**
   * {@inheritdoc}
   *
   * @return \Drupal\Core\Queue\QueueInterface
   */
  public function getQueue($name) {
    return \Drupal::queue($name);
  }

  /**
   * {@inheritdoc}
   *
   * Claims items one at a time and hands each to the queue's worker until no
   * item can be claimed, the time limit is reached, or the worker asks for the
   * queue to be suspended.
   *
   * @param string $name
   *   The name of the queue to run.
   * @param int $time_limit
   *   (optional) Maximum number of seconds to keep claiming items. 0 means no
   *   time limit.
   *
   * @return int
   *   The number of items that were processed and deleted from the queue.
   */
  public function run($name, $time_limit = 0) {
    $worker = $this->workerManager->createInstance($name);
    $end = time() + $time_limit;
    $queue = $this->getQueue($name);
    $count = 0;

    while ((!$time_limit || time() < $end) && ($item = $queue->claimItem())) {
      try {
        // The argument keys must match the placeholders used in the message,
        // otherwise the item ID is not substituted.
        drush_log(dt('Processing item @id from @name queue.', array('@name' => $name, '@id' => $item->item_id)), LogLevel::INFO);
        $worker->processItem($item->data);
        $queue->deleteItem($item);
        $count++;
      }
      catch (RequeueException $e) {
        // The worker requested the task to be immediately requeued.
        $queue->releaseItem($item);
      }
      catch (SuspendQueueException $e) {
        // If the worker indicates there is a problem with the whole queue,
        // release the item and stop processing this queue. Without the break
        // the released item could be claimed again right away, which can loop
        // indefinitely when no time limit is set.
        $queue->releaseItem($item);
        drush_set_error('DRUSH_SUSPEND_QUEUE_EXCEPTION', $e->getMessage());
        break;
      }
      catch (\Exception $e) {
        // In case of any other kind of exception, log it and leave the item
        // in the queue to be processed again later. The item is not released
        // here.
        drush_set_error('DRUSH_QUEUE_EXCEPTION', $e->getMessage());
      }
    }

    return $count;
  }

}