|
1 <?php |
|
2 |
|
3 /** |
|
4 * @file |
|
5 * Queue functionality. |
|
6 */ |
|
7 |
|
8 /** |
|
9 * @defgroup queue Queue operations |
|
10 * @{ |
|
11 * Queue items to allow later processing. |
|
12 * |
|
13 * The queue system allows placing items in a queue and processing them later. |
|
14 * The system tries to ensure that only one consumer can process an item. |
|
15 * |
|
16 * Before a queue can be used it needs to be created by |
|
17 * DrupalQueueInterface::createQueue(). |
|
18 * |
|
19 * Items can be added to the queue by passing an arbitrary data object to |
|
20 * DrupalQueueInterface::createItem(). |
|
21 * |
|
22 * To process an item, call DrupalQueueInterface::claimItem() and specify how |
|
23 * long you want to have a lease for working on that item. When finished |
|
24 * processing, the item needs to be deleted by calling |
|
25 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be |
|
26 * made available again by the DrupalQueueInterface implementation once the |
|
27 * lease expires. Another consumer will then be able to receive it when calling |
|
28 * DrupalQueueInterface::claimItem(). Due to this, the processing code should |
|
29 * be aware that an item might be handed over for processing more than once. |
|
30 * |
|
31 * The $item object used by the DrupalQueueInterface can contain arbitrary |
|
32 * metadata depending on the implementation. Systems using the interface should |
|
33 * only rely on the data property which will contain the information passed to |
|
34 * DrupalQueueInterface::createItem(). The full queue item returned by |
|
35 * DrupalQueueInterface::claimItem() needs to be passed to |
|
36 * DrupalQueueInterface::deleteItem() once processing is completed. |
|
37 * |
|
 * There are two kinds of queue backends available: reliable and non-reliable.
 * A reliable backend preserves the order of messages and guarantees that every
 * item will be executed at least once. A non-reliable backend only makes a
 * best effort to preserve the order of messages and to execute them at least
 * once, but there is a small chance that some items get lost. For example,
 * some distributed backends like Amazon SQS will be managing jobs for a large
 * set of producers and consumers where a strict FIFO ordering will likely not
 * be preserved. Another example would be an in-memory queue backend which
 * might lose items if it crashes. However, such a backend would be able to
 * deal with significantly more writes than a reliable queue and for many tasks
 * this is more important. See aggregator_cron() for an example of how to
 * effectively utilize a non-reliable queue. Another example is doing Twitter
 * statistics -- the small possibility of losing a few items is insignificant
 * next to the power of the queue being able to keep up with writes. As
 * described in the processing section, regardless of the queue being reliable
 * or not, the processing code should be aware that an item might be handed
 * over for processing more than once (because the processing code might time
 * out before it finishes).
|
55 */ |
|
56 |
|
/**
 * Factory class for interacting with queues.
 *
 * Queue objects are created on demand and kept in a function-static cache
 * keyed by queue name, so repeated lookups of the same name share one object.
 */
class DrupalQueue {
  /**
   * Returns the queue object for a given name.
   *
   * The following variables can be set by variable_set or $conf overrides:
   * - queue_class_$name: the class to be used for the queue $name.
   * - queue_default_class: the class to use when queue_class_$name is not
   *   defined. Defaults to SystemQueue, a reliable backend using SQL.
   * - queue_default_reliable_class: the class to use when a reliable queue is
   *   requested but the class selected by the two variables above does not
   *   implement DrupalReliableQueueInterface. Defaults to SystemQueue.
   *
   * The reliability requirement is checked on every call, not only when the
   * queue object is first created. If a non-reliable object was cached by an
   * earlier call and a later call passes $reliable = TRUE, the cached object
   * is replaced by an instance of the reliable class, so the result no longer
   * depends on which caller happened to ask first.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   * @param $reliable
   *   TRUE if the ordering of items and guaranteeing every item executes at
   *   least once is important, FALSE if scalability is the main concern.
   *
   * @return
   *   The queue object for a given name.
   */
  public static function get($name, $reliable = FALSE) {
    static $queues = array();

    if (!isset($queues[$name])) {
      // A queue-specific class wins; otherwise use the site-wide default.
      $class = variable_get('queue_class_' . $name, NULL);
      if (!$class) {
        $class = variable_get('queue_default_class', 'SystemQueue');
      }
      $queues[$name] = new $class($name);
    }

    // Deliberately outside the cache-miss branch: a cached non-reliable queue
    // must not be handed to a caller that explicitly asked for reliability.
    if ($reliable && !($queues[$name] instanceof DrupalReliableQueueInterface)) {
      $class = variable_get('queue_default_reliable_class', 'SystemQueue');
      $queues[$name] = new $class($name);
    }

    return $queues[$name];
  }
}
|
98 |
|
/**
 * Contract implemented by every queue backend.
 *
 * A backend stores items handed to createItem(), leases them to consumers
 * through claimItem(), and forgets them once deleteItem() is called.
 */
interface DrupalQueueInterface {

  /**
   * Adds an item to the queue and stores it right away.
   *
   * @param $data
   *   Arbitrary data to be associated with the new task in the queue.
   *
   * @return
   *   TRUE if the item was created and, on a best-effort basis, added to the
   *   queue; FALSE otherwise. There is no guarantee that the item has been
   *   committed to disk etc., but as far as the backend knows the item is now
   *   in the queue.
   */
  public function createItem($data);

  /**
   * Retrieves the number of items in the queue.
   *
   * The value is a "best guess". Depending on the implementation and the
   * setup, its accuracy may vary: on a busy system with a large number of
   * consumers and items, the result might only be valid for a fraction of a
   * second and not provide an accurate representation.
   *
   * @return
   *   An integer estimate of the number of items in the queue.
   */
  public function numberOfItems();

  /**
   * Claims an item in the queue for processing.
   *
   * @param $lease_time
   *   How long the processing is expected to take in seconds, defaults to an
   *   hour. After this lease expires, the item will be reset and another
   *   consumer can claim the item. For idempotent tasks (which can be run
   *   multiple times without side effects), shorter lease times would result
   *   in lower latency in case a consumer fails. For tasks that should not be
   *   run more than once (non-idempotent), a larger lease time will make it
   *   more rare for a given task to run multiple times in cases of failure,
   *   at the cost of higher latency.
   *
   * @return
   *   An item object on success. FALSE if the queue is unable to claim an
   *   item, which implies that a best effort was made and that either the
   *   queue is empty or there is some other non-recoverable problem.
   */
  public function claimItem($lease_time = 3600);

  /**
   * Deletes a finished item from the queue.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   */
  public function deleteItem($item);

  /**
   * Releases an item that the worker could not process.
   *
   * Releasing lets another worker come in and process the item before the
   * lease timeout expires.
   *
   * @param $item
   *   The item returned by DrupalQueueInterface::claimItem().
   *
   * @return boolean
   *   Whether the item was released.
   */
  public function releaseItem($item);

  /**
   * Creates a queue.
   *
   * Called during installation and should be used to perform any necessary
   * initialization operations. Do not confuse this with the constructor of
   * these objects, which runs every time an object is instantiated to operate
   * on a queue. This operation is only needed the first time a given queue is
   * going to be initialized (for example, to make a new database table or
   * directory to hold tasks for the queue -- it depends on the queue
   * implementation if this is necessary at all).
   */
  public function createQueue();

  /**
   * Deletes a queue and every item in the queue.
   */
  public function deleteQueue();
}
|
185 |
|
/**
 * Marker interface for reliable queue backends.
 *
 * It declares no methods of its own. A class that implements it promises to
 * preserve the order of messages and to guarantee that every item will be
 * executed at least once.
 */
interface DrupalReliableQueueInterface extends DrupalQueueInterface {}
|
194 |
|
/**
 * Default queue implementation.
 *
 * Items of all queues live in the {queue} database table and are told apart
 * by the queue name. An 'expire' value of 0 marks an item as unclaimed; any
 * other value is the timestamp at which the current lease ends.
 */
class SystemQueue implements DrupalReliableQueueInterface {
  /**
   * The name of the queue this instance is working with.
   *
   * @var string
   */
  protected $name;

  /**
   * Starts working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with.
   */
  public function __construct($name) {
    $this->name = $name;
  }

  /**
   * Adds an item to this queue by inserting a row into {queue}.
   *
   * @param $data
   *   Arbitrary data to store with the item; it is serialized before storage.
   *
   * @return
   *   The result of the insert query, cast to a boolean.
   */
  public function createItem($data) {
    // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
    // the queue table yet, so we cannot rely on drupal_write_record().
    $query = db_insert('queue')
      ->fields(array(
        'name' => $this->name,
        'data' => serialize($data),
        // We cannot rely on REQUEST_TIME because many items might be created
        // by a single request which takes longer than 1 second.
        'created' => time(),
      ));
    return (bool) $query->execute();
  }

  /**
   * Counts the rows stored for this queue, claimed or not.
   *
   * @return
   *   The number of items as an integer.
   */
  public function numberOfItems() {
    // The fetched field is cast so that callers receive the integer the
    // interface documents rather than the raw database value.
    return (int) db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
  }

  /**
   * Claims the oldest unclaimed item of this queue.
   *
   * Note that the default lease here is 30 seconds, not the 3600 seconds
   * declared by DrupalQueueInterface::claimItem().
   *
   * @param $lease_time
   *   Length of the lease in seconds.
   *
   * @return
   *   An object with the properties 'item_id' and 'data' (already
   *   unserialized), or FALSE when no unclaimed item is available.
   */
  public function claimItem($lease_time = 30) {
    // Claim an item by updating its expire fields. If claim is not successful
    // another thread may have claimed the item in the meantime. Therefore loop
    // until an item is successfully claimed or we are reasonably sure there
    // are no unclaimed items left.
    while (TRUE) {
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
      if (!$item) {
        // No items currently available to claim.
        return FALSE;
      }
      // Try to update the item. Only one thread can succeed in UPDATEing the
      // same row. We cannot rely on REQUEST_TIME because items might be
      // claimed by a single consumer which runs longer than 1 second. If we
      // continue to use REQUEST_TIME instead of the current time(), we steal
      // time from the lease, and will tend to reset items before the lease
      // should really expire.
      $update = db_update('queue')
        ->fields(array(
          'expire' => time() + $lease_time,
        ))
        ->condition('item_id', $item->item_id)
        ->condition('expire', 0);
      // If there are affected rows, this update succeeded.
      if ($update->execute()) {
        $item->data = unserialize($item->data);
        return $item;
      }
      // Otherwise somebody else got the row first; try the next candidate.
    }
  }

  /**
   * Marks an item as unclaimed again by resetting its 'expire' value to 0.
   *
   * @param $item
   *   The item returned by claimItem(); only its 'item_id' is used.
   *
   * @return boolean
   *   TRUE if the update reported at least one affected row, FALSE otherwise.
   */
  public function releaseItem($item) {
    $update = db_update('queue')
      ->fields(array(
        'expire' => 0,
      ))
      ->condition('item_id', $item->item_id);
    // The interface documents a boolean, so do not leak the row count.
    return (bool) $update->execute();
  }

  /**
   * Deletes the row of a finished item.
   *
   * @param $item
   *   The item returned by claimItem(); only its 'item_id' is used.
   */
  public function deleteItem($item) {
    db_delete('queue')
      ->condition('item_id', $item->item_id)
      ->execute();
  }

  /**
   * Creates the queue; intentionally a no-op for this backend.
   */
  public function createQueue() {
    // All tasks are stored in a single database table (which is created when
    // Drupal is first installed) so there is nothing we need to do to create
    // a new queue.
  }

  /**
   * Deletes every item that belongs to this queue.
   */
  public function deleteQueue() {
    db_delete('queue')
      ->condition('name', $this->name)
      ->execute();
  }
}
|
288 |
|
/**
 * Static queue implementation.
 *
 * This allows "undelayed" variants of processes relying on the Queue
 * interface. The queue data resides in memory. It should only be used for
 * items that will be queued and dequeued within a given page request.
 */
class MemoryQueue implements DrupalQueueInterface {
  /**
   * The queue data: item objects keyed by their item_id.
   *
   * @var array
   */
  protected $queue;

  /**
   * Counter for item ids.
   *
   * @var int
   */
  protected $id_sequence;

  /**
   * Start working with a queue.
   *
   * @param $name
   *   Arbitrary string. The name of the queue to work with. It is accepted so
   *   the class can be instantiated like any other queue class, but this
   *   implementation does not store or use it.
   */
  public function __construct($name) {
    $this->queue = array();
    $this->id_sequence = 0;
  }

  /**
   * Appends a new, unclaimed item to the in-memory list.
   *
   * @param $data
   *   Arbitrary data to be associated with the new item.
   *
   * @return
   *   Always TRUE.
   */
  public function createItem($data) {
    $item = new stdClass();
    $item->item_id = $this->id_sequence++;
    $item->data = $data;
    $item->created = time();
    // 0 means "not claimed"; claimItem() replaces it with a lease timestamp.
    $item->expire = 0;
    $this->queue[$item->item_id] = $item;
    return TRUE;
  }

  /**
   * Counts all items currently held, whether claimed or not.
   *
   * @return
   *   The number of items as an integer.
   */
  public function numberOfItems() {
    return count($this->queue);
  }

  /**
   * Claims the first available item, in insertion order.
   *
   * An item is available when it is unclaimed (expire is 0) or when the lease
   * of a previous claim has run out without the item having been deleted or
   * released. The second case honors the lease contract: an item abandoned by
   * its consumer becomes claimable again instead of being stuck forever.
   *
   * Note that the default lease here is 30 seconds, not the 3600 seconds
   * declared by DrupalQueueInterface::claimItem().
   *
   * @param $lease_time
   *   Length of the lease in seconds.
   *
   * @return
   *   The claimed item object (the same object the queue holds), or FALSE if
   *   no item is available.
   */
  public function claimItem($lease_time = 30) {
    $now = time();
    foreach ($this->queue as $item) {
      // Strict "<": a lease ending in the current second is still running.
      if ($item->expire == 0 || $item->expire < $now) {
        // Items are objects, so this updates the queued entry in place.
        $item->expire = $now + $lease_time;
        return $item;
      }
    }
    return FALSE;
  }

  /**
   * Removes an item from the queue.
   *
   * @param $item
   *   The item returned by claimItem(); only its 'item_id' is used.
   */
  public function deleteItem($item) {
    unset($this->queue[$item->item_id]);
  }

  /**
   * Makes a claimed item available again.
   *
   * @param $item
   *   The item returned by claimItem(); only its 'item_id' is used.
   *
   * @return boolean
   *   TRUE if the item exists and was claimed, FALSE if it is unknown or was
   *   not claimed in the first place.
   */
  public function releaseItem($item) {
    if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
      $this->queue[$item->item_id]->expire = 0;
      return TRUE;
    }
    return FALSE;
  }

  /**
   * Creates the queue; intentionally a no-op for this backend.
   */
  public function createQueue() {
    // Nothing needed here.
  }

  /**
   * Drops every item and restarts the item id counter at 0.
   */
  public function deleteQueue() {
    $this->queue = array();
    $this->id_sequence = 0;
  }
}
|
368 |
|
369 /** |
|
370 * @} End of "defgroup queue". |
|
371 */ |