|
1 <?php |
|
2 /** |
|
3 * Zend Framework |
|
4 * |
|
5 * LICENSE |
|
6 * |
|
7 * This source file is subject to the new BSD license that is bundled |
|
8 * with this package in the file LICENSE.txt. |
|
9 * It is also available through the world-wide-web at this URL: |
|
10 * http://framework.zend.com/license/new-bsd |
|
11 * If you did not receive a copy of the license and are unable to |
|
12 * obtain it through the world-wide-web, please send an email |
|
13 * to license@zend.com so we can send you a copy immediately. |
|
14 * |
|
15 * @category Zend |
|
16 * @package Zend_Queue |
|
17 * @subpackage Adapter |
|
18 * @copyright Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com) |
|
19 * @license http://framework.zend.com/license/new-bsd New BSD License |
|
20 * @version $Id: Memcacheq.php 20096 2010-01-06 02:05:09Z bkarwin $ |
|
21 */ |
|
22 |
|
23 /** |
|
24 * @see Zend_Queue_Adapter_AdapterAbstract |
|
25 */ |
|
26 require_once 'Zend/Queue/Adapter/AdapterAbstract.php'; |
|
27 |
|
28 /** |
|
29 * Class for using connecting to a Zend_Cache-based queuing system |
|
30 * |
|
31 * @category Zend |
|
32 * @package Zend_Queue |
|
33 * @subpackage Adapter |
|
34 * @copyright Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com) |
|
35 * @license http://framework.zend.com/license/new-bsd New BSD License |
|
36 */ |
|
37 class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract |
|
38 { |
|
39 const DEFAULT_HOST = '127.0.0.1'; |
|
40 const DEFAULT_PORT = 22201; |
|
41 const EOL = "\r\n"; |
|
42 |
|
43 /** |
|
44 * @var Memcache |
|
45 */ |
|
46 protected $_cache = null; |
|
47 |
|
48 /** |
|
49 * @var string |
|
50 */ |
|
51 protected $_host = null; |
|
52 |
|
53 /** |
|
54 * @var integer |
|
55 */ |
|
56 protected $_port = null; |
|
57 |
|
58 /** |
|
59 * @var resource |
|
60 */ |
|
61 protected $_socket = null; |
|
62 |
|
63 /******************************************************************** |
|
64 * Constructor / Destructor |
|
65 *********************************************************************/ |
|
66 |
|
67 /** |
|
68 * Constructor |
|
69 * |
|
70 * @param array|Zend_Config $options |
|
71 * @param null|Zend_Queue $queue |
|
72 * @return void |
|
73 */ |
|
74 public function __construct($options, Zend_Queue $queue = null) |
|
75 { |
|
76 if (!extension_loaded('memcache')) { |
|
77 require_once 'Zend/Queue/Exception.php'; |
|
78 throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded'); |
|
79 } |
|
80 |
|
81 parent::__construct($options, $queue); |
|
82 |
|
83 $options = &$this->_options['driverOptions']; |
|
84 |
|
85 if (!array_key_exists('host', $options)) { |
|
86 $options['host'] = self::DEFAULT_HOST; |
|
87 } |
|
88 if (!array_key_exists('port', $options)) { |
|
89 $options['port'] = self::DEFAULT_PORT; |
|
90 } |
|
91 |
|
92 $this->_cache = new Memcache(); |
|
93 |
|
94 $result = $this->_cache->connect($options['host'], $options['port']); |
|
95 |
|
96 if ($result === false) { |
|
97 require_once 'Zend/Queue/Exception.php'; |
|
98 throw new Zend_Queue_Exception('Could not connect to MemcacheQ'); |
|
99 } |
|
100 |
|
101 $this->_host = $options['host']; |
|
102 $this->_port = (int)$options['port']; |
|
103 } |
|
104 |
|
105 /** |
|
106 * Destructor |
|
107 * |
|
108 * @return void |
|
109 */ |
|
110 public function __destruct() |
|
111 { |
|
112 if ($this->_cache instanceof Memcache) { |
|
113 $this->_cache->close(); |
|
114 } |
|
115 if (is_resource($this->_socket)) { |
|
116 $cmd = 'quit' . self::EOL; |
|
117 fwrite($this->_socket, $cmd); |
|
118 fclose($this->_socket); |
|
119 } |
|
120 } |
|
121 |
|
122 /******************************************************************** |
|
123 * Queue management functions |
|
124 *********************************************************************/ |
|
125 |
|
126 /** |
|
127 * Does a queue already exist? |
|
128 * |
|
129 * Throws an exception if the adapter cannot determine if a queue exists. |
|
130 * use isSupported('isExists') to determine if an adapter can test for |
|
131 * queue existance. |
|
132 * |
|
133 * @param string $name |
|
134 * @return boolean |
|
135 * @throws Zend_Queue_Exception |
|
136 */ |
|
137 public function isExists($name) |
|
138 { |
|
139 if (empty($this->_queues)) { |
|
140 $this->getQueues(); |
|
141 } |
|
142 |
|
143 return in_array($name, $this->_queues); |
|
144 } |
|
145 |
|
146 /** |
|
147 * Create a new queue |
|
148 * |
|
149 * Visibility timeout is how long a message is left in the queue "invisible" |
|
150 * to other readers. If the message is acknowleged (deleted) before the |
|
151 * timeout, then the message is deleted. However, if the timeout expires |
|
152 * then the message will be made available to other queue readers. |
|
153 * |
|
154 * @param string $name queue name |
|
155 * @param integer $timeout default visibility timeout |
|
156 * @return boolean |
|
157 * @throws Zend_Queue_Exception |
|
158 */ |
|
159 public function create($name, $timeout=null) |
|
160 { |
|
161 if ($this->isExists($name)) { |
|
162 return false; |
|
163 } |
|
164 if ($timeout === null) { |
|
165 $timeout = self::CREATE_TIMEOUT_DEFAULT; |
|
166 } |
|
167 |
|
168 // MemcacheQ does not have a method to "create" a queue |
|
169 // queues are created upon sending a packet. |
|
170 // We cannot use the send() and receive() functions because those |
|
171 // depend on the current name. |
|
172 $result = $this->_cache->set($name, 'creating queue', 0, 15); |
|
173 $result = $this->_cache->get($name); |
|
174 |
|
175 $this->_queues[] = $name; |
|
176 |
|
177 return true; |
|
178 } |
|
179 |
|
180 /** |
|
181 * Delete a queue and all of it's messages |
|
182 * |
|
183 * Returns false if the queue is not found, true if the queue exists |
|
184 * |
|
185 * @param string $name queue name |
|
186 * @return boolean |
|
187 * @throws Zend_Queue_Exception |
|
188 */ |
|
189 public function delete($name) |
|
190 { |
|
191 $response = $this->_sendCommand('delete ' . $name, array('DELETED', 'NOT_FOUND'), true); |
|
192 |
|
193 if (in_array('DELETED', $response)) { |
|
194 $key = array_search($name, $this->_queues); |
|
195 |
|
196 if ($key !== false) { |
|
197 unset($this->_queues[$key]); |
|
198 } |
|
199 return true; |
|
200 } |
|
201 |
|
202 return false; |
|
203 } |
|
204 |
|
205 /** |
|
206 * Get an array of all available queues |
|
207 * |
|
208 * Not all adapters support getQueues(), use isSupported('getQueues') |
|
209 * to determine if the adapter supports this feature. |
|
210 * |
|
211 * @return array |
|
212 * @throws Zend_Queue_Exception |
|
213 */ |
|
214 public function getQueues() |
|
215 { |
|
216 $this->_queues = array(); |
|
217 |
|
218 $response = $this->_sendCommand('stats queue', array('END')); |
|
219 |
|
220 foreach ($response as $i => $line) { |
|
221 $this->_queues[] = str_replace('STAT ', '', $line); |
|
222 } |
|
223 |
|
224 return $this->_queues; |
|
225 } |
|
226 |
|
227 /** |
|
228 * Return the approximate number of messages in the queue |
|
229 * |
|
230 * @param Zend_Queue $queue |
|
231 * @return integer |
|
232 * @throws Zend_Queue_Exception (not supported) |
|
233 */ |
|
234 public function count(Zend_Queue $queue=null) |
|
235 { |
|
236 require_once 'Zend/Queue/Exception.php'; |
|
237 throw new Zend_Queue_Exception('count() is not supported in this adapter'); |
|
238 } |
|
239 |
|
240 /******************************************************************** |
|
241 * Messsage management functions |
|
242 *********************************************************************/ |
|
243 |
|
244 /** |
|
245 * Send a message to the queue |
|
246 * |
|
247 * @param string $message Message to send to the active queue |
|
248 * @param Zend_Queue $queue |
|
249 * @return Zend_Queue_Message |
|
250 * @throws Zend_Queue_Exception |
|
251 */ |
|
252 public function send($message, Zend_Queue $queue=null) |
|
253 { |
|
254 if ($queue === null) { |
|
255 $queue = $this->_queue; |
|
256 } |
|
257 |
|
258 if (!$this->isExists($queue->getName())) { |
|
259 require_once 'Zend/Queue/Exception.php'; |
|
260 throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName()); |
|
261 } |
|
262 |
|
263 $message = (string) $message; |
|
264 $data = array( |
|
265 'message_id' => md5(uniqid(rand(), true)), |
|
266 'handle' => null, |
|
267 'body' => $message, |
|
268 'md5' => md5($message), |
|
269 ); |
|
270 |
|
271 $result = $this->_cache->set($queue->getName(), $message, 0, 0); |
|
272 if ($result === false) { |
|
273 require_once 'Zend/Queue/Exception.php'; |
|
274 throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName()); |
|
275 } |
|
276 |
|
277 $options = array( |
|
278 'queue' => $queue, |
|
279 'data' => $data, |
|
280 ); |
|
281 |
|
282 $classname = $queue->getMessageClass(); |
|
283 if (!class_exists($classname)) { |
|
284 require_once 'Zend/Loader.php'; |
|
285 Zend_Loader::loadClass($classname); |
|
286 } |
|
287 return new $classname($options); |
|
288 } |
|
289 |
|
290 /** |
|
291 * Get messages in the queue |
|
292 * |
|
293 * @param integer $maxMessages Maximum number of messages to return |
|
294 * @param integer $timeout Visibility timeout for these messages |
|
295 * @param Zend_Queue $queue |
|
296 * @return Zend_Queue_Message_Iterator |
|
297 * @throws Zend_Queue_Exception |
|
298 */ |
|
299 public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null) |
|
300 { |
|
301 if ($maxMessages === null) { |
|
302 $maxMessages = 1; |
|
303 } |
|
304 |
|
305 if ($timeout === null) { |
|
306 $timeout = self::RECEIVE_TIMEOUT_DEFAULT; |
|
307 } |
|
308 if ($queue === null) { |
|
309 $queue = $this->_queue; |
|
310 } |
|
311 |
|
312 $msgs = array(); |
|
313 if ($maxMessages > 0 ) { |
|
314 for ($i = 0; $i < $maxMessages; $i++) { |
|
315 $data = array( |
|
316 'handle' => md5(uniqid(rand(), true)), |
|
317 'body' => $this->_cache->get($queue->getName()), |
|
318 ); |
|
319 |
|
320 $msgs[] = $data; |
|
321 } |
|
322 } |
|
323 |
|
324 $options = array( |
|
325 'queue' => $queue, |
|
326 'data' => $msgs, |
|
327 'messageClass' => $queue->getMessageClass(), |
|
328 ); |
|
329 |
|
330 $classname = $queue->getMessageSetClass(); |
|
331 if (!class_exists($classname)) { |
|
332 require_once 'Zend/Loader.php'; |
|
333 Zend_Loader::loadClass($classname); |
|
334 } |
|
335 return new $classname($options); |
|
336 } |
|
337 |
|
338 /** |
|
339 * Delete a message from the queue |
|
340 * |
|
341 * Returns true if the message is deleted, false if the deletion is |
|
342 * unsuccessful. |
|
343 * |
|
344 * @param Zend_Queue_Message $message |
|
345 * @return boolean |
|
346 * @throws Zend_Queue_Exception (unsupported) |
|
347 */ |
|
348 public function deleteMessage(Zend_Queue_Message $message) |
|
349 { |
|
350 require_once 'Zend/Queue/Exception.php'; |
|
351 throw new Zend_Queue_Exception('deleteMessage() is not supported in ' . get_class($this)); |
|
352 } |
|
353 |
|
354 /******************************************************************** |
|
355 * Supporting functions |
|
356 *********************************************************************/ |
|
357 |
|
358 /** |
|
359 * Return a list of queue capabilities functions |
|
360 * |
|
361 * $array['function name'] = true or false |
|
362 * true is supported, false is not supported. |
|
363 * |
|
364 * @param string $name |
|
365 * @return array |
|
366 */ |
|
367 public function getCapabilities() |
|
368 { |
|
369 return array( |
|
370 'create' => true, |
|
371 'delete' => true, |
|
372 'send' => true, |
|
373 'receive' => true, |
|
374 'deleteMessage' => false, |
|
375 'getQueues' => true, |
|
376 'count' => false, |
|
377 'isExists' => true, |
|
378 ); |
|
379 } |
|
380 |
|
381 /******************************************************************** |
|
382 * Functions that are not part of the Zend_Queue_Adapter_Abstract |
|
383 *********************************************************************/ |
|
384 |
|
385 /** |
|
386 * sends a command to MemcacheQ |
|
387 * |
|
388 * The memcache functions by php cannot handle all types of requests |
|
389 * supported by MemcacheQ |
|
390 * Non-standard requests are handled by this function. |
|
391 * |
|
392 * @param string $command - command to send to memcacheQ |
|
393 * @param array $terminator - strings to indicate end of memcacheQ response |
|
394 * @param boolean $include_term - include terminator in response |
|
395 * @return array |
|
396 * @throws Zend_Queue_Exception if connection cannot be opened |
|
397 */ |
|
398 protected function _sendCommand($command, array $terminator, $include_term=false) |
|
399 { |
|
400 if (!is_resource($this->_socket)) { |
|
401 $this->_socket = fsockopen($this->_host, $this->_port, $errno, $errstr, 10); |
|
402 } |
|
403 if ($this->_socket === false) { |
|
404 require_once 'Zend/Queue/Exception.php'; |
|
405 throw new Zend_Queue_Exception("Could not open a connection to $this->_host:$this->_port errno=$errno : $errstr"); |
|
406 } |
|
407 |
|
408 $response = array(); |
|
409 |
|
410 $cmd = $command . self::EOL; |
|
411 fwrite($this->_socket, $cmd); |
|
412 |
|
413 $continue_reading = true; |
|
414 while (!feof($this->_socket) && $continue_reading) { |
|
415 $resp = trim(fgets($this->_socket, 1024)); |
|
416 if (in_array($resp, $terminator)) { |
|
417 if ($include_term) { |
|
418 $response[] = $resp; |
|
419 } |
|
420 $continue_reading = false; |
|
421 } else { |
|
422 $response[] = $resp; |
|
423 } |
|
424 } |
|
425 |
|
426 return $response; |
|
427 } |
|
428 } |