cms/drupal/modules/system/system.queue.inc
changeset 541 e756a8c72c3d
equal deleted inserted replaced
540:07239de796bb 541:e756a8c72c3d
       
     1 <?php
       
     2 
       
     3 /**
       
     4  * @file
       
     5  * Queue functionality.
       
     6  */
       
     7 
       
     8 /**
       
     9  * @defgroup queue Queue operations
       
    10  * @{
       
    11  * Queue items to allow later processing.
       
    12  *
       
    13  * The queue system allows placing items in a queue and processing them later.
       
    14  * The system tries to ensure that only one consumer can process an item.
       
    15  *
       
    16  * Before a queue can be used it needs to be created by
       
    17  * DrupalQueueInterface::createQueue().
       
    18  *
       
    19  * Items can be added to the queue by passing an arbitrary data object to
       
    20  * DrupalQueueInterface::createItem().
       
    21  *
       
    22  * To process an item, call DrupalQueueInterface::claimItem() and specify how
       
    23  * long you want to have a lease for working on that item. When finished
       
    24  * processing, the item needs to be deleted by calling
       
    25  * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
       
    26  * made available again by the DrupalQueueInterface implementation once the
       
    27  * lease expires. Another consumer will then be able to receive it when calling
       
    28  * DrupalQueueInterface::claimItem(). Due to this, the processing code should
       
    29  * be aware that an item might be handed over for processing more than once.
       
    30  *
       
    31  * The $item object used by the DrupalQueueInterface can contain arbitrary
       
    32  * metadata depending on the implementation. Systems using the interface should
       
    33  * only rely on the data property which will contain the information passed to
       
    34  * DrupalQueueInterface::createItem(). The full queue item returned by
       
    35  * DrupalQueueInterface::claimItem() needs to be passed to
       
    36  * DrupalQueueInterface::deleteItem() once processing is completed.
       
    37  *
       
    38  * There are two kinds of queue backends available: reliable, which preserves
       
    39  * the order of messages and guarantees that every item will be executed at
       
    40  * least once. The non-reliable kind only does a best effort to preserve order
       
    41  * in messages and to execute them at least once but there is a small chance
       
    42  * that some items get lost. For example, some distributed back-ends like
       
    43  * Amazon SQS will be managing jobs for a large set of producers and consumers
       
    44  * where a strict FIFO ordering will likely not be preserved. Another example
       
    45  * would be an in-memory queue backend which might lose items if it crashes.
       
    46  * However, such a backend would be able to deal with significantly more writes
       
    47  * than a reliable queue and for many tasks this is more important. See
       
    48  * aggregator_cron() for an example of how to effectively utilize a
       
    49  * non-reliable queue. Another example is doing Twitter statistics -- the small
       
    50  * possibility of losing a few items is insignificant next to power of the
       
    51  * queue being able to keep up with writes. As described in the processing
       
    52  * section, regardless of the queue being reliable or not, the processing code
       
    53  * should be aware that an item might be handed over for processing more than
       
    54  * once (because the processing code might time out before it finishes).
       
    55  */
       
    56 
       
    57 /**
       
    58  * Factory class for interacting with queues.
       
    59  */
       
    60 class DrupalQueue {
       
    61   /**
       
    62    * Returns the queue object for a given name.
       
    63    *
       
    64    * The following variables can be set by variable_set or $conf overrides:
       
    65    * - queue_class_$name: the class to be used for the queue $name.
       
    66    * - queue_default_class: the class to use when queue_class_$name is not
       
    67    *   defined. Defaults to SystemQueue, a reliable backend using SQL.
       
    68    * - queue_default_reliable_class: the class to use when queue_class_$name is
       
    69    *   not defined and the queue_default_class is not reliable. Defaults to
       
    70    *   SystemQueue.
       
    71    *
       
    72    * @param $name
       
    73    *   Arbitrary string. The name of the queue to work with.
       
    74    * @param $reliable
       
    75    *   TRUE if the ordering of items and guaranteeing every item executes at
       
    76    *   least once is important, FALSE if scalability is the main concern.
       
    77    *
       
    78    * @return
       
    79    *   The queue object for a given name.
       
    80    */
       
    81   public static function get($name, $reliable = FALSE) {
       
    82     static $queues;
       
    83     if (!isset($queues[$name])) {
       
    84       $class = variable_get('queue_class_' . $name, NULL);
       
    85       if (!$class) {
       
    86         $class = variable_get('queue_default_class', 'SystemQueue');
       
    87       }
       
    88       $object = new $class($name);
       
    89       if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
       
    90         $class = variable_get('queue_default_reliable_class', 'SystemQueue');
       
    91         $object = new $class($name);
       
    92       }
       
    93       $queues[$name] = $object;
       
    94     }
       
    95     return $queues[$name];
       
    96   }
       
    97 }
       
    98 
       
    99 interface DrupalQueueInterface {
       
   100 
       
   101   /**
       
   102    * Add a queue item and store it directly to the queue.
       
   103    *
       
   104    * @param $data
       
   105    *   Arbitrary data to be associated with the new task in the queue.
       
   106    * @return
       
   107    *   TRUE if the item was successfully created and was (best effort) added
       
   108    *   to the queue, otherwise FALSE. We don't guarantee the item was
       
   109    *   committed to disk etc, but as far as we know, the item is now in the
       
   110    *   queue.
       
   111    */
       
   112   public function createItem($data);
       
   113 
       
   114   /**
       
   115    * Retrieve the number of items in the queue.
       
   116    *
       
   117    * This is intended to provide a "best guess" count of the number of items in
       
   118    * the queue. Depending on the implementation and the setup, the accuracy of
       
   119    * the results of this function may vary.
       
   120    *
       
   121    * e.g. On a busy system with a large number of consumers and items, the
       
   122    * result might only be valid for a fraction of a second and not provide an
       
   123    * accurate representation.
       
   124    *
       
   125    * @return
       
   126    *   An integer estimate of the number of items in the queue.
       
   127    */
       
   128   public function numberOfItems();
       
   129 
       
   130   /**
       
   131    * Claim an item in the queue for processing.
       
   132    *
       
   133    * @param $lease_time
       
   134    *   How long the processing is expected to take in seconds, defaults to an
       
   135    *   hour. After this lease expires, the item will be reset and another
       
   136    *   consumer can claim the item. For idempotent tasks (which can be run
       
   137    *   multiple times without side effects), shorter lease times would result
       
   138    *   in lower latency in case a consumer fails. For tasks that should not be
       
   139    *   run more than once (non-idempotent), a larger lease time will make it
       
   140    *   more rare for a given task to run multiple times in cases of failure,
       
   141    *   at the cost of higher latency.
       
   142    * @return
       
   143    *   On success we return an item object. If the queue is unable to claim an
       
   144    *   item it returns false. This implies a best effort to retrieve an item
       
   145    *   and either the queue is empty or there is some other non-recoverable
       
   146    *   problem.
       
   147    */
       
   148   public function claimItem($lease_time = 3600);
       
   149 
       
   150   /**
       
   151    * Delete a finished item from the queue.
       
   152    *
       
   153    * @param $item
       
   154    *   The item returned by DrupalQueueInterface::claimItem().
       
   155    */
       
   156   public function deleteItem($item);
       
   157 
       
   158   /**
       
   159    * Release an item that the worker could not process, so another
       
   160    * worker can come in and process it before the timeout expires.
       
   161    *
       
   162    * @param $item
       
   163    * @return boolean
       
   164    */
       
   165   public function releaseItem($item);
       
   166 
       
   167   /**
       
   168    * Create a queue.
       
   169    *
       
   170    * Called during installation and should be used to perform any necessary
       
   171    * initialization operations. This should not be confused with the
       
   172    * constructor for these objects, which is called every time an object is
       
   173    * instantiated to operate on a queue. This operation is only needed the
       
   174    * first time a given queue is going to be initialized (for example, to make
       
   175    * a new database table or directory to hold tasks for the queue -- it
       
   176    * depends on the queue implementation if this is necessary at all).
       
   177    */
       
   178   public function createQueue();
       
   179 
       
   180   /**
       
   181    * Delete a queue and every item in the queue.
       
   182    */
       
   183   public function deleteQueue();
       
   184 }
       
   185 
       
   186 /**
       
   187  * Reliable queue interface.
       
   188  *
       
   189  * Classes implementing this interface preserve the order of messages and
       
   190  * guarantee that every item will be executed at least once.
       
   191  */
       
   192 interface DrupalReliableQueueInterface extends DrupalQueueInterface {
       
   193 }
       
   194 
       
   195 /**
       
   196  * Default queue implementation.
       
   197  */
       
   198 class SystemQueue implements DrupalReliableQueueInterface {
       
   199   /**
       
   200    * The name of the queue this instance is working with.
       
   201    *
       
   202    * @var string
       
   203    */
       
   204   protected $name;
       
   205 
       
   206   public function __construct($name) {
       
   207     $this->name = $name;
       
   208   }
       
   209 
       
   210   public function createItem($data) {
       
   211     // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
       
   212     // the queue table yet, so we cannot rely on drupal_write_record().
       
   213     $query = db_insert('queue')
       
   214       ->fields(array(
       
   215         'name' => $this->name,
       
   216         'data' => serialize($data),
       
   217         // We cannot rely on REQUEST_TIME because many items might be created
       
   218         // by a single request which takes longer than 1 second.
       
   219         'created' => time(),
       
   220       ));
       
   221     return (bool) $query->execute();
       
   222   }
       
   223 
       
   224   public function numberOfItems() {
       
   225     return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
       
   226   }
       
   227 
       
   228   public function claimItem($lease_time = 30) {
       
   229     // Claim an item by updating its expire fields. If claim is not successful
       
   230     // another thread may have claimed the item in the meantime. Therefore loop
       
   231     // until an item is successfully claimed or we are reasonably sure there
       
   232     // are no unclaimed items left.
       
   233     while (TRUE) {
       
   234       $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
       
   235       if ($item) {
       
   236         // Try to update the item. Only one thread can succeed in UPDATEing the
       
   237         // same row. We cannot rely on REQUEST_TIME because items might be
       
   238         // claimed by a single consumer which runs longer than 1 second. If we
       
   239         // continue to use REQUEST_TIME instead of the current time(), we steal
       
   240         // time from the lease, and will tend to reset items before the lease
       
   241         // should really expire.
       
   242         $update = db_update('queue')
       
   243           ->fields(array(
       
   244             'expire' => time() + $lease_time,
       
   245           ))
       
   246           ->condition('item_id', $item->item_id)
       
   247           ->condition('expire', 0);
       
   248         // If there are affected rows, this update succeeded.
       
   249         if ($update->execute()) {
       
   250           $item->data = unserialize($item->data);
       
   251           return $item;
       
   252         }
       
   253       }
       
   254       else {
       
   255         // No items currently available to claim.
       
   256         return FALSE;
       
   257       }
       
   258     }
       
   259   }
       
   260 
       
   261   public function releaseItem($item) {
       
   262     $update = db_update('queue')
       
   263       ->fields(array(
       
   264         'expire' => 0,
       
   265       ))
       
   266       ->condition('item_id', $item->item_id);
       
   267       return $update->execute();
       
   268   }
       
   269 
       
   270   public function deleteItem($item) {
       
   271     db_delete('queue')
       
   272       ->condition('item_id', $item->item_id)
       
   273       ->execute();
       
   274   }
       
   275 
       
   276   public function createQueue() {
       
   277     // All tasks are stored in a single database table (which is created when
       
   278     // Drupal is first installed) so there is nothing we need to do to create
       
   279     // a new queue.
       
   280   }
       
   281 
       
   282   public function deleteQueue() {
       
   283     db_delete('queue')
       
   284       ->condition('name', $this->name)
       
   285       ->execute();
       
   286   }
       
   287 }
       
   288 
       
   289 /**
       
   290  * Static queue implementation.
       
   291  *
       
   292  * This allows "undelayed" variants of processes relying on the Queue
       
   293  * interface. The queue data resides in memory. It should only be used for
       
   294  * items that will be queued and dequeued within a given page request.
       
   295  */
       
   296 class MemoryQueue implements DrupalQueueInterface {
       
   297   /**
       
   298    * The queue data.
       
   299    *
       
   300    * @var array
       
   301    */
       
   302   protected $queue;
       
   303 
       
   304   /**
       
   305    * Counter for item ids.
       
   306    *
       
   307    * @var int
       
   308    */
       
   309   protected $id_sequence;
       
   310 
       
   311   /**
       
   312    * Start working with a queue.
       
   313    *
       
   314    * @param $name
       
   315    *   Arbitrary string. The name of the queue to work with.
       
   316    */
       
   317   public function __construct($name) {
       
   318     $this->queue = array();
       
   319     $this->id_sequence = 0;
       
   320   }
       
   321 
       
   322   public function createItem($data) {
       
   323     $item = new stdClass();
       
   324     $item->item_id = $this->id_sequence++;
       
   325     $item->data = $data;
       
   326     $item->created = time();
       
   327     $item->expire = 0;
       
   328     $this->queue[$item->item_id] = $item;
       
   329     return TRUE;
       
   330   }
       
   331 
       
   332   public function numberOfItems() {
       
   333     return count($this->queue);
       
   334   }
       
   335 
       
   336   public function claimItem($lease_time = 30) {
       
   337     foreach ($this->queue as $key => $item) {
       
   338       if ($item->expire == 0) {
       
   339         $item->expire = time() + $lease_time;
       
   340         $this->queue[$key] = $item;
       
   341         return $item;
       
   342       }
       
   343     }
       
   344     return FALSE;
       
   345   }
       
   346 
       
   347   public function deleteItem($item) {
       
   348     unset($this->queue[$item->item_id]);
       
   349   }
       
   350 
       
   351   public function releaseItem($item) {
       
   352     if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
       
   353       $this->queue[$item->item_id]->expire = 0;
       
   354       return TRUE;
       
   355     }
       
   356     return FALSE;
       
   357   }
       
   358 
       
   359   public function createQueue() {
       
   360     // Nothing needed here.
       
   361   }
       
   362 
       
   363   public function deleteQueue() {
       
   364     $this->queue = array();
       
   365     $this->id_sequence = 0;
       
   366   }
       
   367 }
       
   368 
       
   369 /**
       
   370  * @} End of "defgroup queue".
       
   371  */