|
1 <?php |
|
/**
 * LICENSE
 *
 * This source file is subject to the new BSD license that is bundled
 * with this package in the file LICENSE.txt.
 * It is also available through the world-wide-web at this URL:
 * http://framework.zend.com/license/new-bsd
 * If you did not receive a copy of the license and are unable to
 * obtain it through the world-wide-web, please send an email
 * to license@zend.com so we can send you a copy immediately.
 *
 * @category   Zend
 * @package    Zend_Cloud
 * @subpackage QueueService
 * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */
|
19 |
|
20 require_once 'Zend/Service/Amazon/Sqs.php'; |
|
21 require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php'; |
|
22 require_once 'Zend/Cloud/QueueService/Exception.php'; |
|
23 require_once 'Zend/Cloud/QueueService/Message.php'; |
|
24 |
|
/**
 * SQS adapter for simple queue service.
 *
 * @category   Zend
 * @package    Zend_Cloud
 * @subpackage QueueService
 * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */
|
class Zend_Cloud_QueueService_Adapter_Sqs
    extends Zend_Cloud_QueueService_Adapter_AbstractAdapter
{
    /*
     * Options array keys for the SQS adapter.
     */
    const AWS_ACCESS_KEY = 'aws_accesskey';
    const AWS_SECRET_KEY = 'aws_secretkey';

    /**
     * Default timeout handed to the SQS client when a queue is created
     * without an explicit timeout.
     *
     * For backward compatibility this value (the integer 30) is also the
     * key under which createQueue() looks for a caller-supplied timeout in
     * its $options array.
     */
    const CREATE_TIMEOUT = 30;

    /**
     * SQS service instance.
     * @var Zend_Service_Amazon_Sqs
     */
    protected $_sqs;

    /**
     * Constructor.
     *
     * Recognised option keys: self::AWS_ACCESS_KEY, self::AWS_SECRET_KEY,
     * and the inherited self::MESSAGE_CLASS, self::MESSAGESET_CLASS and
     * self::HTTP_ADAPTER. Every key is optional; absent credentials are
     * passed to the SQS client as null.
     *
     * @param  array|Zend_Config $options
     * @throws Zend_Cloud_QueueService_Exception If $options is neither an
     *         array nor a Zend_Config, or if the SQS client cannot be created
     */
    public function __construct($options = array())
    {
        if ($options instanceof Zend_Config) {
            $options = $options->toArray();
        }

        if (!is_array($options)) {
            throw new Zend_Cloud_QueueService_Exception('Invalid options provided');
        }

        if (isset($options[self::MESSAGE_CLASS])) {
            $this->setMessageClass($options[self::MESSAGE_CLASS]);
        }

        if (isset($options[self::MESSAGESET_CLASS])) {
            $this->setMessageSetClass($options[self::MESSAGESET_CLASS]);
        }

        // Read the credentials defensively so that a missing key yields null
        // instead of an "undefined index" notice.
        $accessKey = isset($options[self::AWS_ACCESS_KEY])
                   ? $options[self::AWS_ACCESS_KEY]
                   : null;
        $secretKey = isset($options[self::AWS_SECRET_KEY])
                   ? $options[self::AWS_SECRET_KEY]
                   : null;

        try {
            $this->_sqs = new Zend_Service_Amazon_Sqs($accessKey, $secretKey);
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e);
        }

        if (isset($options[self::HTTP_ADAPTER])) {
            $this->_sqs->getHttpClient()->setAdapter($options[self::HTTP_ADAPTER]);
        }
    }

    /**
     * Create a queue. Returns the ID of the created queue (typically the URL).
     * It may take some time to create the queue. Check your vendor's
     * documentation for details.
     *
     * A timeout may be supplied in $options under the key
     * self::CREATE_TIMEOUT; when it is absent self::CREATE_TIMEOUT itself is
     * used as the value.
     *
     * @param  string $name
     * @param  array  $options
     * @return string Queue ID (typically URL)
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function createQueue($name, $options = null)
    {
        // $options defaults to null, so the key must be checked before it is
        // read; isset() is safe on null as well as on arrays.
        $timeout = isset($options[self::CREATE_TIMEOUT])
                 ? $options[self::CREATE_TIMEOUT]
                 : self::CREATE_TIMEOUT;

        try {
            return $this->_sqs->create($name, $timeout);
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Delete a queue. All messages in the queue will also be deleted.
     *
     * @param  string $queueId
     * @param  array  $options Unused by this adapter
     * @return boolean true if successful, false otherwise
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function deleteQueue($queueId, $options = null)
    {
        try {
            return $this->_sqs->delete($queueId);
        } catch (Zend_Service_Amazon_Exception $e) {
            // The "new" keyword is required here: without it PHP attempts to
            // call a function named Zend_Cloud_QueueService_Exception and
            // aborts with a fatal error instead of throwing.
            throw new Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * List all queues.
     *
     * @param  array $options Unused by this adapter
     * @return array Queue IDs
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function listQueues($options = null)
    {
        try {
            return $this->_sqs->getQueues();
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Get a key/value array of metadata for the given queue.
     *
     * The client is asked for the 'All' attribute. An array result is
     * returned as-is; any other result is wrapped as array('All' => $value).
     *
     * @param  string $queueId
     * @param  array  $options Unused by this adapter
     * @return array
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function fetchQueueMetadata($queueId, $options = null)
    {
        try {
            // TODO: ZF-9050 Fix the SQS client library in trunk to return all attribute values
            $attributes = $this->_sqs->getAttribute($queueId, 'All');
            if (is_array($attributes)) {
                return $attributes;
            }

            // Reuse the value already fetched rather than issuing a second,
            // identical request to the service.
            return array('All' => $attributes);
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Store a key/value array of metadata for the specified queue.
     *
     * This adapter does not implement the operation and always throws.
     *
     * @param  string $queueId
     * @param  array  $metadata
     * @param  array  $options
     * @return void
     * @throws Zend_Cloud_OperationNotAvailableException Always
     */
    public function storeQueueMetadata($queueId, $metadata, $options = null)
    {
        // TODO Add support for SetQueueAttributes to client library
        require_once 'Zend/Cloud/OperationNotAvailableException.php';
        throw new Zend_Cloud_OperationNotAvailableException('Amazon SQS doesn\'t currently support storing metadata');
    }

    /**
     * Send a message to the specified queue.
     *
     * @param  string $queueId
     * @param  string $message
     * @param  array  $options Unused by this adapter
     * @return string Message ID
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function sendMessage($queueId, $message, $options = null)
    {
        try {
            return $this->_sqs->send($queueId, $message);
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Receive at most $max messages from the specified queue.
     *
     * A visibility timeout may be supplied in $options under the key
     * self::VISIBILITY_TIMEOUT; when it is absent null is passed to the
     * client.
     *
     * @param  string $queueId
     * @param  int    $max
     * @param  array  $options
     * @return object Instance of the configured message set class wrapping
     *                the received messages
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function receiveMessages($queueId, $max = 1, $options = null)
    {
        // Guard the lookup: $options is null by default and the key is
        // optional, so reading it directly would raise a notice.
        $timeout = isset($options[self::VISIBILITY_TIMEOUT])
                 ? $options[self::VISIBILITY_TIMEOUT]
                 : null;

        try {
            return $this->_makeMessages($this->_sqs->receive($queueId, $max, $timeout));
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Wrap raw SQS messages in the configured message and message set classes.
     *
     * Each raw message is expected to be an array with a 'body' entry; the
     * body and the complete raw array are both handed to the message class.
     *
     * @param  array $messages Raw messages as returned by the SQS client
     * @return object Instance of the configured message set class
     */
    protected function _makeMessages($messages)
    {
        $messageClass = $this->getMessageClass();
        $setClass     = $this->getMessageSetClass();
        $result       = array();
        foreach ($messages as $message) {
            $result[] = new $messageClass($message['body'], $message);
        }
        return new $setClass($result);
    }

    /**
     * Delete the specified message from the specified queue.
     *
     * The message is identified by the 'handle' entry of its raw data. A
     * Zend_Cloud_QueueService_Message is unwrapped via getMessage() first;
     * anything else is treated as the raw message array itself.
     *
     * @param  string $queueId
     * @param  Zend_Cloud_QueueService_Message|array $message
     * @param  array  $options Unused by this adapter
     * @return mixed Whatever the SQS client's deleteMessage() returns
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function deleteMessage($queueId, $message, $options = null)
    {
        try {
            if ($message instanceof Zend_Cloud_QueueService_Message) {
                $message = $message->getMessage();
            }
            $messageId = $message['handle'];
            return $this->_sqs->deleteMessage($queueId, $messageId);
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Peek at the messages from the specified queue without removing them.
     *
     * Implemented as a receive call with a timeout argument of 0.
     *
     * @param  string $queueId
     * @param  int    $num How many messages
     * @param  array  $options Unused by this adapter
     * @return object Instance of the configured message set class wrapping
     *                the peeked messages
     * @throws Zend_Cloud_QueueService_Exception If the SQS client fails
     */
    public function peekMessages($queueId, $num = 1, $options = null)
    {
        try {
            return $this->_makeMessages($this->_sqs->receive($queueId, $num, 0));
        } catch (Zend_Service_Amazon_Exception $e) {
            throw new Zend_Cloud_QueueService_Exception('Error on peeking messages: '.$e->getMessage(), $e->getCode(), $e);
        }
    }

    /**
     * Get SQS implementation
     * @return Zend_Service_Amazon_Sqs
     */
    public function getClient()
    {
        return $this->_sqs;
    }
}