<?php
/**
 * Zend Framework
 *
 * LICENSE
 *
 * This source file is subject to the new BSD license that is bundled
 * with this package in the file LICENSE.txt.
 * It is also available through the world-wide-web at this URL:
 * http://framework.zend.com/license/new-bsd
 * If you did not receive a copy of the license and are unable to
 * obtain it through the world-wide-web, please send an email
 * to license@zend.com so we can send you a copy immediately.
 *
 * @category   Zend
 * @package    Zend_Service
 * @subpackage Amazon_Sqs
 * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 * @version    $Id: Sqs.php 22984 2010-09-21 02:52:48Z matthew $
 */

/**
 * @see Zend_Service_Amazon_Abstract
 */
require_once 'Zend/Service/Amazon/Abstract.php';

/**
 * @see Zend_Crypt_Hmac
 */
require_once 'Zend/Crypt/Hmac.php';

/**
 * Class for connecting to the Amazon Simple Queue Service (SQS)
 *
 * Every operation is sent as a signed HTTP GET request and the XML response
 * body is parsed into a SimpleXMLElement, which the public methods reduce to
 * plain PHP strings, integers and arrays.
 *
 * @category   Zend
 * @package    Zend_Service
 * @subpackage Amazon_Sqs
 * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 * @see        http://aws.amazon.com/sqs/ Amazon Simple Queue Service
 */
class Zend_Service_Amazon_Sqs extends Zend_Service_Amazon_Abstract
{
    /**
     * Default visibility timeout (in seconds) used by create() when the
     * caller does not supply one
     */
    const CREATE_TIMEOUT_DEFAULT = 30;

    /**
     * Maximum number of times create() waits and retries after SQS reports
     * that a queue with the same name was deleted recently
     */
    const CREATE_RETRY_MAX = 5;

    /**
     * HTTP end point for the Amazon SQS service
     *
     * @var string
     */
    protected $_sqsEndpoint = 'queue.amazonaws.com';

    /**
     * The API version to use
     *
     * @var string
     */
    protected $_sqsApiVersion = '2009-02-01';

    /**
     * Signature Version
     *
     * @var string
     */
    protected $_sqsSignatureVersion = '2';

    /**
     * Signature Encoding Method
     *
     * @var string
     */
    protected $_sqsSignatureMethod = 'HmacSHA256';

    /**
     * Constructor
     *
     * Forwards all arguments unchanged to the parent constructor.
     *
     * @param string $accessKey
     * @param string $secretKey
     * @param string $region
     */
    public function __construct($accessKey = null, $secretKey = null, $region = null)
    {
        parent::__construct($accessKey, $secretKey, $region);
    }

    /**
     * Create a new queue
     *
     * Visibility timeout is how long a message is left in the queue "invisible"
     * to other readers. If the message is acknowledged (deleted) before the
     * timeout, then the message is deleted. However, if the timeout expires
     * then the message will be made available to other queue readers.
     *
     * When SQS answers with "QueueDeletedRecently" this method blocks for 60
     * seconds and tries again, at most CREATE_RETRY_MAX times; after that the
     * error is reported as an exception instead of retrying forever.
     *
     * @param  string  $queue_name queue name
     * @param  integer $timeout    default visibility timeout in seconds;
     *                             CREATE_TIMEOUT_DEFAULT is used when null
     * @return string|boolean the queue URL, or false if SQS reports that the
     *                        queue name already exists
     * @throws Zend_Service_Amazon_Sqs_Exception on any other error response
     */
    public function create($queue_name, $timeout = null)
    {
        $params = array(
            'QueueName'                => $queue_name,
            'DefaultVisibilityTimeout' => ($timeout === null)
                ? self::CREATE_TIMEOUT_DEFAULT
                : (int) $timeout,
        );

        $retry_count = 0;

        while (true) {
            $result = $this->_makeRequest(null, 'CreateQueue', $params);

            if (isset($result->CreateQueueResult->QueueUrl)
                && !empty($result->CreateQueueResult->QueueUrl)
            ) {
                return (string) $result->CreateQueueResult->QueueUrl;
            }

            // Cast once so the comparisons below are plain strict string
            // comparisons; a missing <Error> element yields ''.
            $code = (string) $result->Error->Code;

            if ($code === 'AWS.SimpleQueueService.QueueNameExists') {
                return false;
            }

            if ($code !== 'AWS.SimpleQueueService.QueueDeletedRecently'
                || $retry_count >= self::CREATE_RETRY_MAX
            ) {
                $this->_throwError($result);
            }

            // Must sleep for 60 seconds, then try re-creating the queue
            sleep(60);
            $retry_count++;
        }
    }

    /**
     * Delete a queue and all of its messages
     *
     * @param  string $queue_url queue URL
     * @return boolean always true; failures are reported via exception
     * @throws Zend_Service_Amazon_Sqs_Exception if the response contains an
     *                                           error element
     */
    public function delete($queue_url)
    {
        $result = $this->_makeRequest($queue_url, 'DeleteQueue');

        // Same error test as getQueues() and receive().
        if (isset($result->Error)) {
            $this->_throwError($result);
        }

        return true;
    }

    /**
     * Get an array of all available queues
     *
     * @return array list of queue URLs (strings); empty if there are none
     * @throws Zend_Service_Amazon_Sqs_Exception if the response contains an
     *                                           error element
     */
    public function getQueues()
    {
        $result = $this->_makeRequest(null, 'ListQueues');

        if (isset($result->Error)) {
            $this->_throwError($result);
        }

        if (!isset($result->ListQueuesResult->QueueUrl)
            || empty($result->ListQueuesResult->QueueUrl)
        ) {
            return array();
        }

        $queues = array();
        foreach ($result->ListQueuesResult->QueueUrl as $queue_url) {
            $queues[] = (string) $queue_url;
        }

        return $queues;
    }

    /**
     * Return the approximate number of messages in the queue
     *
     * Reads the "ApproximateNumberOfMessages" queue attribute through
     * getAttribute() and casts it to an integer.
     *
     * @param  string $queue_url Queue URL
     * @return integer
     * @throws Zend_Service_Amazon_Sqs_Exception
     */
    public function count($queue_url)
    {
        return (int) $this->getAttribute($queue_url, 'ApproximateNumberOfMessages');
    }

    /**
     * Send a message to the queue
     *
     * The message is URL-encoded before it is handed to the request, so the
     * body stored in the queue is the encoded form; receive() applies the
     * matching urldecode(). The MD5 digest returned by SQS is compared with
     * the digest of that encoded body.
     *
     * @param  string $queue_url Queue URL
     * @param  string $message   Message to send to the queue
     * @return string Message ID
     * @throws Zend_Service_Amazon_Sqs_Exception if no message id is returned
     *                                           or the MD5 digests differ
     */
    public function send($queue_url, $message)
    {
        $params = array(
            'MessageBody' => urlencode($message),
        );

        $checksum = md5($params['MessageBody']);

        $result = $this->_makeRequest($queue_url, 'SendMessage', $params);

        if (!isset($result->SendMessageResult->MessageId)
            || empty($result->SendMessageResult->MessageId)
        ) {
            $this->_throwError($result);
        }

        // Strict comparison: with "!=" two different digests that both look
        // like "0e<digits>" would be compared numerically and treated as equal.
        if ((string) $result->SendMessageResult->MD5OfMessageBody !== $checksum) {
            require_once 'Zend/Service/Amazon/Sqs/Exception.php';
            throw new Zend_Service_Amazon_Sqs_Exception('MD5 of body does not match message sent');
        }

        return (string) $result->SendMessageResult->MessageId;
    }

    /**
     * Get messages in the queue
     *
     * Each returned entry is an array with the keys "message_id", "handle",
     * "md5" and "body"; "body" is URL-decoded to undo the encoding applied by
     * send().
     *
     * @param  string  $queue_url    Queue URL
     * @param  integer $max_messages Maximum number of messages to return
     * @param  integer $timeout      Visibility timeout for these messages
     * @return array list of message arrays; empty if no messages were returned
     * @throws Zend_Service_Amazon_Sqs_Exception if the response contains an
     *                                           error element
     */
    public function receive($queue_url, $max_messages = null, $timeout = null)
    {
        $params = array();

        // If not set, the visibility timeout on the queue is used
        if ($timeout !== null) {
            $params['VisibilityTimeout'] = (int) $timeout;
        }

        // SQS will default to only returning one message
        if ($max_messages !== null) {
            $params['MaxNumberOfMessages'] = (int) $max_messages;
        }

        $result = $this->_makeRequest($queue_url, 'ReceiveMessage', $params);

        if (isset($result->Error)) {
            $this->_throwError($result);
        }

        if (!isset($result->ReceiveMessageResult->Message)
            || empty($result->ReceiveMessageResult->Message)
        ) {
            // no messages found
            return array();
        }

        $data = array();
        foreach ($result->ReceiveMessageResult->Message as $message) {
            $data[] = array(
                'message_id' => (string) $message->MessageId,
                'handle'     => (string) $message->ReceiptHandle,
                'md5'        => (string) $message->MD5OfBody,
                'body'       => urldecode((string) $message->Body),
            );
        }

        return $data;
    }

    /**
     * Delete a message from the queue
     *
     * Returns true if the message is deleted, false if the response carries a
     * non-empty error code. Unlike the other operations, an error response is
     * reported through the return value, not through an exception.
     *
     * @param  string $queue_url Queue URL
     * @param  string $handle    Message handle as returned by SQS
     * @return boolean
     */
    public function deleteMessage($queue_url, $handle)
    {
        $params = array(
            'ReceiptHandle' => (string) $handle,
        );

        $result = $this->_makeRequest($queue_url, 'DeleteMessage', $params);

        if (isset($result->Error->Code)
            && !empty($result->Error->Code)
        ) {
            return false;
        }

        // Will always return true unless ReceiptHandle is malformed
        return true;
    }

    /**
     * Get the attributes for the queue
     *
     * When the response contains more than one attribute, an array mapping
     * attribute name to value is returned; when it contains exactly one, only
     * that attribute's value is returned as a string.
     *
     * @param  string $queue_url Queue URL
     * @param  string $attribute attribute name to request ('All' by default)
     * @return string|array
     * @throws Zend_Service_Amazon_Sqs_Exception if the response contains no
     *                                           attributes
     */
    public function getAttribute($queue_url, $attribute = 'All')
    {
        $params = array(
            'AttributeName' => $attribute,
        );

        $result = $this->_makeRequest($queue_url, 'GetQueueAttributes', $params);

        if (!isset($result->GetQueueAttributesResult->Attribute)
            || empty($result->GetQueueAttributesResult->Attribute)
        ) {
            $this->_throwError($result);
        }

        $attributes = $result->GetQueueAttributesResult->Attribute;

        if (count($attributes) > 1) {
            $attr_result = array();
            foreach ($attributes as $attr) {
                $attr_result[(string) $attr->Name] = (string) $attr->Value;
            }
            return $attr_result;
        }

        return (string) $attributes->Value;
    }

    /**
     * Throw an exception carrying the error code of an SQS response
     *
     * The code is cast to a string so the exception message is always a
     * string, even when the response has no error element (empty message).
     *
     * @param  SimpleXMLElement $result parsed SQS response
     * @return void
     * @throws Zend_Service_Amazon_Sqs_Exception always
     */
    private function _throwError(SimpleXMLElement $result)
    {
        require_once 'Zend/Service/Amazon/Sqs/Exception.php';
        throw new Zend_Service_Amazon_Sqs_Exception((string) $result->Error->Code);
    }

    /**
     * Make a request to Amazon SQS
     *
     * "ListQueues" and "CreateQueue" are sent to the service end point; every
     * other action is sent to the queue URL itself. Responses with a 5xx
     * status are retried with a growing pause between attempts.
     *
     * @param  string $queue_url Queue URL, or null for service-level actions
     * @param  string $action    SQS action
     * @param  array  $params    action-specific request parameters
     * @return SimpleXMLElement parsed response body
     */
    private function _makeRequest($queue_url, $action, $params = array())
    {
        $params['Action'] = $action;
        // Sign with the queue URL exactly as passed in (null means "/").
        $params = $this->addRequiredParameters($queue_url, $params);

        if ($queue_url === null) {
            $queue_url = '/';
        }

        $client = self::getHttpClient();

        if (in_array($action, array('ListQueues', 'CreateQueue'), true)) {
            $client->setUri('http://' . $this->_sqsEndpoint);
        } else {
            $client->setUri($queue_url);
        }

        $retry_count = 0;

        do {
            $retry = false;

            $client->resetParameters();
            $client->setParameterGet($params);

            $response      = $client->request('GET');
            $response_code = $response->getStatus();

            // Some 5xx errors are expected, so retry automatically
            if ($response_code >= 500 && $response_code < 600 && $retry_count <= 5) {
                $retry = true;
                $retry_count++;
                // Pause of 0, 1, 2, 4, 6, 9 seconds for successive retries.
                // sleep() takes whole seconds, so round down explicitly
                // instead of passing a fractional float.
                sleep((int) floor($retry_count * $retry_count / 4));
            }
        } while ($retry);

        return new SimpleXMLElement($response->getBody());
    }

    /**
     * Adds required authentication and version parameters to an array of
     * parameters
     *
     * The required parameters are:
     * - AWSAccessKeyId
     * - SignatureVersion
     * - Timestamp
     * - Version
     * - SignatureMethod and
     * - Signature
     *
     * If a required parameter is already set in the <tt>$parameters</tt> array,
     * it is overwritten.
     *
     * @param  string $queue_url  Queue URL
     * @param  array  $parameters the array to which to add the required
     *                            parameters.
     * @return array
     */
    protected function addRequiredParameters($queue_url, array $parameters)
    {
        $parameters['AWSAccessKeyId']   = $this->_getAccessKey();
        $parameters['SignatureVersion'] = $this->_sqsSignatureVersion;
        // The timestamp is set 10 seconds ahead of the local clock.
        // NOTE(review): presumably to tolerate clock skew -- confirm.
        $parameters['Timestamp']        = gmdate('Y-m-d\TH:i:s\Z', time() + 10);
        $parameters['Version']          = $this->_sqsApiVersion;
        $parameters['SignatureMethod']  = $this->_sqsSignatureMethod;
        // Must be last: the signature covers every parameter set above.
        $parameters['Signature']        = $this->_signParameters($queue_url, $parameters);

        return $parameters;
    }

    /**
     * Computes the RFC 2104-compliant HMAC signature for request parameters
     *
     * The string to sign is built as follows:
     *
     * 1. The literal "GET", the SQS end point host name and the path of the
     *    queue URL ("/" when there is no queue URL or it has no path), each
     *    followed by a newline.
     *
     * 2. All request parameters except <tt>Signature</tt>, sorted by name in
     *    byte order, written as <tt>name=value</tt> pairs joined with "&",
     *    with each value percent-encoded per RFC 3986 (space as %20, "~"
     *    left unencoded).
     *
     * The string is signed with HMAC-SHA256 using the secret key and the
     * binary digest is returned base64-encoded.
     *
     * @param  string $queue_url  Queue URL
     * @param  array  $paramaters the parameters for which to get the signature.
     *
     * @return string the signed data.
     */
    protected function _signParameters($queue_url, array $paramaters)
    {
        $path = ($queue_url !== null) ? parse_url($queue_url, PHP_URL_PATH) : '/';
        if (!is_string($path) || $path === '') {
            // parse_url() yields null/false for a URL without a usable path;
            // an empty path is signed as "/".
            $path = '/';
        }

        $data = "GET\n"
              . $this->_sqsEndpoint . "\n"
              . $path . "\n";

        unset($paramaters['Signature']);
        uksort($paramaters, 'strcmp');

        $arrData = array();
        foreach ($paramaters as $key => $value) {
            // rawurlencode() encodes a space as %20 where urlencode() emits "+".
            // The str_replace() keeps "~" literal on PHP versions older than
            // 5.3, where rawurlencode() still encoded it.
            $arrData[] = $key . '=' . str_replace('%7E', '~', rawurlencode($value));
        }

        $data .= implode('&', $arrData);

        $hmac = Zend_Crypt_Hmac::compute($this->_getSecretKey(), 'SHA256', $data, Zend_Crypt_Hmac::BINARY);

        return base64_encode($hmac);
    }
}