cms/drupal/modules/system/system.queue.inc
changeset 541 e756a8c72c3d
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cms/drupal/modules/system/system.queue.inc	Fri Sep 08 12:04:06 2017 +0200
@@ -0,0 +1,371 @@
+<?php
+
+/**
+ * @file
+ * Queue functionality.
+ */
+
+/**
+ * @defgroup queue Queue operations
+ * @{
+ * Queue items to allow later processing.
+ *
+ * The queue system allows placing items in a queue and processing them later.
+ * The system tries to ensure that only one consumer can process an item.
+ *
+ * Before a queue can be used it needs to be created by
+ * DrupalQueueInterface::createQueue().
+ *
+ * Items can be added to the queue by passing an arbitrary data object to
+ * DrupalQueueInterface::createItem().
+ *
+ * To process an item, call DrupalQueueInterface::claimItem() and specify how
+ * long you want to have a lease for working on that item. When finished
+ * processing, the item needs to be deleted by calling
+ * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
+ * made available again by the DrupalQueueInterface implementation once the
+ * lease expires. Another consumer will then be able to receive it when calling
+ * DrupalQueueInterface::claimItem(). Due to this, the processing code should
+ * be aware that an item might be handed over for processing more than once.
+ *
+ * The $item object used by the DrupalQueueInterface can contain arbitrary
+ * metadata depending on the implementation. Systems using the interface should
+ * only rely on the data property which will contain the information passed to
+ * DrupalQueueInterface::createItem(). The full queue item returned by
+ * DrupalQueueInterface::claimItem() needs to be passed to
+ * DrupalQueueInterface::deleteItem() once processing is completed.
+ *
+ * There are two kinds of queue backends available: reliable, which preserves
+ * the order of messages and guarantees that every item will be executed at
+ * least once. The non-reliable kind only does a best effort to preserve order
+ * in messages and to execute them at least once but there is a small chance
+ * that some items get lost. For example, some distributed back-ends like
+ * Amazon SQS will be managing jobs for a large set of producers and consumers
+ * where a strict FIFO ordering will likely not be preserved. Another example
+ * would be an in-memory queue backend which might lose items if it crashes.
+ * However, such a backend would be able to deal with significantly more writes
+ * than a reliable queue and for many tasks this is more important. See
+ * aggregator_cron() for an example of how to effectively utilize a
+ * non-reliable queue. Another example is doing Twitter statistics -- the small
+ * possibility of losing a few items is insignificant next to power of the
+ * queue being able to keep up with writes. As described in the processing
+ * section, regardless of the queue being reliable or not, the processing code
+ * should be aware that an item might be handed over for processing more than
+ * once (because the processing code might time out before it finishes).
+ */
+
+/**
+ * Factory class for interacting with queues.
+ */
+class DrupalQueue {
+  /**
+   * Returns the queue object for a given name.
+   *
+   * The following variables can be set by variable_set or $conf overrides:
+   * - queue_class_$name: the class to be used for the queue $name.
+   * - queue_default_class: the class to use when queue_class_$name is not
+   *   defined. Defaults to SystemQueue, a reliable backend using SQL.
+   * - queue_default_reliable_class: the class to use when queue_class_$name is
+   *   not defined and the queue_default_class is not reliable. Defaults to
+   *   SystemQueue.
+   *
+   * @param $name
+   *   Arbitrary string. The name of the queue to work with.
+   * @param $reliable
+   *   TRUE if the ordering of items and guaranteeing every item executes at
+   *   least once is important, FALSE if scalability is the main concern.
+   *
+   * @return
+   *   The queue object for a given name.
+   */
+  public static function get($name, $reliable = FALSE) {
+    static $queues;
+    if (!isset($queues[$name])) {
+      $class = variable_get('queue_class_' . $name, NULL);
+      if (!$class) {
+        $class = variable_get('queue_default_class', 'SystemQueue');
+      }
+      $object = new $class($name);
+      if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
+        $class = variable_get('queue_default_reliable_class', 'SystemQueue');
+        $object = new $class($name);
+      }
+      $queues[$name] = $object;
+    }
+    return $queues[$name];
+  }
+}
+
+interface DrupalQueueInterface {
+
+  /**
+   * Add a queue item and store it directly to the queue.
+   *
+   * @param $data
+   *   Arbitrary data to be associated with the new task in the queue.
+   * @return
+   *   TRUE if the item was successfully created and was (best effort) added
+   *   to the queue, otherwise FALSE. We don't guarantee the item was
+   *   committed to disk etc, but as far as we know, the item is now in the
+   *   queue.
+   */
+  public function createItem($data);
+
+  /**
+   * Retrieve the number of items in the queue.
+   *
+   * This is intended to provide a "best guess" count of the number of items in
+   * the queue. Depending on the implementation and the setup, the accuracy of
+   * the results of this function may vary.
+   *
+   * e.g. On a busy system with a large number of consumers and items, the
+   * result might only be valid for a fraction of a second and not provide an
+   * accurate representation.
+   *
+   * @return
+   *   An integer estimate of the number of items in the queue.
+   */
+  public function numberOfItems();
+
+  /**
+   * Claim an item in the queue for processing.
+   *
+   * @param $lease_time
+   *   How long the processing is expected to take in seconds, defaults to an
+   *   hour. After this lease expires, the item will be reset and another
+   *   consumer can claim the item. For idempotent tasks (which can be run
+   *   multiple times without side effects), shorter lease times would result
+   *   in lower latency in case a consumer fails. For tasks that should not be
+   *   run more than once (non-idempotent), a larger lease time will make it
+   *   more rare for a given task to run multiple times in cases of failure,
+   *   at the cost of higher latency.
+   * @return
+   *   On success we return an item object. If the queue is unable to claim an
+   *   item it returns false. This implies a best effort to retrieve an item
+   *   and either the queue is empty or there is some other non-recoverable
+   *   problem.
+   */
+  public function claimItem($lease_time = 3600);
+
+  /**
+   * Delete a finished item from the queue.
+   *
+   * @param $item
+   *   The item returned by DrupalQueueInterface::claimItem().
+   */
+  public function deleteItem($item);
+
+  /**
+   * Release an item that the worker could not process, so another
+   * worker can come in and process it before the timeout expires.
+   *
+   * @param $item
+   * @return boolean
+   */
+  public function releaseItem($item);
+
+  /**
+   * Create a queue.
+   *
+   * Called during installation and should be used to perform any necessary
+   * initialization operations. This should not be confused with the
+   * constructor for these objects, which is called every time an object is
+   * instantiated to operate on a queue. This operation is only needed the
+   * first time a given queue is going to be initialized (for example, to make
+   * a new database table or directory to hold tasks for the queue -- it
+   * depends on the queue implementation if this is necessary at all).
+   */
+  public function createQueue();
+
+  /**
+   * Delete a queue and every item in the queue.
+   */
+  public function deleteQueue();
+}
+
+/**
+ * Reliable queue interface.
+ *
+ * Classes implementing this interface preserve the order of messages and
+ * guarantee that every item will be executed at least once.
+ */
+interface DrupalReliableQueueInterface extends DrupalQueueInterface {
+}
+
+/**
+ * Default queue implementation.
+ */
+class SystemQueue implements DrupalReliableQueueInterface {
+  /**
+   * The name of the queue this instance is working with.
+   *
+   * @var string
+   */
+  protected $name;
+
+  public function __construct($name) {
+    $this->name = $name;
+  }
+
+  public function createItem($data) {
+    // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
+    // the queue table yet, so we cannot rely on drupal_write_record().
+    $query = db_insert('queue')
+      ->fields(array(
+        'name' => $this->name,
+        'data' => serialize($data),
+        // We cannot rely on REQUEST_TIME because many items might be created
+        // by a single request which takes longer than 1 second.
+        'created' => time(),
+      ));
+    return (bool) $query->execute();
+  }
+
+  public function numberOfItems() {
+    return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
+  }
+
+  public function claimItem($lease_time = 30) {
+    // Claim an item by updating its expire fields. If claim is not successful
+    // another thread may have claimed the item in the meantime. Therefore loop
+    // until an item is successfully claimed or we are reasonably sure there
+    // are no unclaimed items left.
+    while (TRUE) {
+      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
+      if ($item) {
+        // Try to update the item. Only one thread can succeed in UPDATEing the
+        // same row. We cannot rely on REQUEST_TIME because items might be
+        // claimed by a single consumer which runs longer than 1 second. If we
+        // continue to use REQUEST_TIME instead of the current time(), we steal
+        // time from the lease, and will tend to reset items before the lease
+        // should really expire.
+        $update = db_update('queue')
+          ->fields(array(
+            'expire' => time() + $lease_time,
+          ))
+          ->condition('item_id', $item->item_id)
+          ->condition('expire', 0);
+        // If there are affected rows, this update succeeded.
+        if ($update->execute()) {
+          $item->data = unserialize($item->data);
+          return $item;
+        }
+      }
+      else {
+        // No items currently available to claim.
+        return FALSE;
+      }
+    }
+  }
+
+  public function releaseItem($item) {
+    $update = db_update('queue')
+      ->fields(array(
+        'expire' => 0,
+      ))
+      ->condition('item_id', $item->item_id);
+      return $update->execute();
+  }
+
+  public function deleteItem($item) {
+    db_delete('queue')
+      ->condition('item_id', $item->item_id)
+      ->execute();
+  }
+
+  public function createQueue() {
+    // All tasks are stored in a single database table (which is created when
+    // Drupal is first installed) so there is nothing we need to do to create
+    // a new queue.
+  }
+
+  public function deleteQueue() {
+    db_delete('queue')
+      ->condition('name', $this->name)
+      ->execute();
+  }
+}
+
+/**
+ * Static queue implementation.
+ *
+ * This allows "undelayed" variants of processes relying on the Queue
+ * interface. The queue data resides in memory. It should only be used for
+ * items that will be queued and dequeued within a given page request.
+ */
+class MemoryQueue implements DrupalQueueInterface {
+  /**
+   * The queue data.
+   *
+   * @var array
+   */
+  protected $queue;
+
+  /**
+   * Counter for item ids.
+   *
+   * @var int
+   */
+  protected $id_sequence;
+
+  /**
+   * Start working with a queue.
+   *
+   * @param $name
+   *   Arbitrary string. The name of the queue to work with.
+   */
+  public function __construct($name) {
+    $this->queue = array();
+    $this->id_sequence = 0;
+  }
+
+  public function createItem($data) {
+    $item = new stdClass();
+    $item->item_id = $this->id_sequence++;
+    $item->data = $data;
+    $item->created = time();
+    $item->expire = 0;
+    $this->queue[$item->item_id] = $item;
+    return TRUE;
+  }
+
+  public function numberOfItems() {
+    return count($this->queue);
+  }
+
+  public function claimItem($lease_time = 30) {
+    foreach ($this->queue as $key => $item) {
+      if ($item->expire == 0) {
+        $item->expire = time() + $lease_time;
+        $this->queue[$key] = $item;
+        return $item;
+      }
+    }
+    return FALSE;
+  }
+
+  public function deleteItem($item) {
+    unset($this->queue[$item->item_id]);
+  }
+
+  public function releaseItem($item) {
+    if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
+      $this->queue[$item->item_id]->expire = 0;
+      return TRUE;
+    }
+    return FALSE;
+  }
+
+  public function createQueue() {
+    // Nothing needed here.
+  }
+
+  public function deleteQueue() {
+    $this->queue = array();
+    $this->id_sequence = 0;
+  }
+}
+
+/**
+ * @} End of "defgroup queue".
+ */