web/enmi/Zend/Service/Amazon/Sqs.php
changeset 19 1c2f13fd785c
parent 0 4eba9c11703f
equal deleted inserted replaced
18:bd595ad770fc 19:1c2f13fd785c
       
     1 <?php
       
     2 /**
       
     3  * Zend Framework
       
     4  *
       
     5  * LICENSE
       
     6  *
       
     7  * This source file is subject to the new BSD license that is bundled
       
     8  * with this package in the file LICENSE.txt.
       
     9  * It is also available through the world-wide-web at this URL:
       
    10  * http://framework.zend.com/license/new-bsd
       
    11  * If you did not receive a copy of the license and are unable to
       
    12  * obtain it through the world-wide-web, please send an email
       
    13  * to license@zend.com so we can send you a copy immediately.
       
    14  *
       
    15  * @category   Zend
       
    16  * @package    Zend_Service
       
    17  * @subpackage Amazon_Sqs
       
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    20  * @version    $Id: Sqs.php 22984 2010-09-21 02:52:48Z matthew $
       
    21  */
       
    22 
       
    23 /**
       
    24  * @see Zend_Service_Amazon_Abstract
       
    25  */
       
    26 require_once 'Zend/Service/Amazon/Abstract.php';
       
    27 
       
    28 /**
       
    29  * @see Zend_Crypt_Hmac
       
    30  */
       
    31 require_once 'Zend/Crypt/Hmac.php';
       
    32 
       
    33 /**
       
    34  * Class for connecting to the Amazon Simple Queue Service (SQS)
       
    35  *
       
    36  * @category   Zend
       
    37  * @package    Zend_Service
       
    38  * @subpackage Amazon_Sqs
       
    39  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    40  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    41  * @see        http://aws.amazon.com/sqs/ Amazon Simple Queue Service
       
    42  */
       
    43 class Zend_Service_Amazon_Sqs extends Zend_Service_Amazon_Abstract
       
    44 {
       
    45     /**
       
    46      * Default timeout for createQueue() function
       
    47      */
       
    48     const CREATE_TIMEOUT_DEFAULT = 30;
       
    49 
       
    50     /**
       
    51      * HTTP end point for the Amazon SQS service
       
    52      */
       
    53     protected $_sqsEndpoint = 'queue.amazonaws.com';
       
    54 
       
    55     /**
       
    56      * The API version to use
       
    57      */
       
    58     protected $_sqsApiVersion = '2009-02-01';
       
    59 
       
    60     /**
       
    61      * Signature Version
       
    62      */
       
    63     protected $_sqsSignatureVersion = '2';
       
    64 
       
    65     /**
       
    66      * Signature Encoding Method
       
    67      */
       
    68     protected $_sqsSignatureMethod = 'HmacSHA256';
       
    69 
       
    70     /**
       
    71      * Constructor
       
    72      *
       
    73      * @param string $accessKey
       
    74      * @param string $secretKey
       
    75      * @param string $region
       
    76      */
       
    77     public function __construct($accessKey = null, $secretKey = null, $region = null)
       
    78     {
       
    79         parent::__construct($accessKey, $secretKey, $region);
       
    80     }
       
    81 
       
    82     /**
       
    83      * Create a new queue
       
    84      *
       
    85      * Visibility timeout is how long a message is left in the queue "invisible"
       
    86      * to other readers.  If the message is acknowleged (deleted) before the
       
    87      * timeout, then the message is deleted.  However, if the timeout expires
       
    88      * then the message will be made available to other queue readers.
       
    89      *
       
    90      * @param  string  $queue_name queue name
       
    91      * @param  integer $timeout    default visibility timeout
       
    92      * @return string|boolean
       
    93      * @throws Zend_Service_Amazon_Sqs_Exception
       
    94      */
       
    95     public function create($queue_name, $timeout = null)
       
    96     {
       
    97         $params = array();
       
    98         $params['QueueName'] = $queue_name;
       
    99         $timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
       
   100         $params['DefaultVisibilityTimeout'] = $timeout;
       
   101 
       
   102         $retry_count = 0;
       
   103 
       
   104         do {
       
   105             $retry  = false;
       
   106             $result = $this->_makeRequest(null, 'CreateQueue', $params);
       
   107 
       
   108             if (!isset($result->CreateQueueResult->QueueUrl)
       
   109                 || empty($result->CreateQueueResult->QueueUrl)
       
   110             ) {
       
   111                 if ($result->Error->Code == 'AWS.SimpleQueueService.QueueNameExists') {
       
   112                     return false;
       
   113                 } elseif ($result->Error->Code == 'AWS.SimpleQueueService.QueueDeletedRecently') {
       
   114                     // Must sleep for 60 seconds, then try re-creating the queue
       
   115                     sleep(60);
       
   116                     $retry = true;
       
   117                     $retry_count++;
       
   118                 } else {
       
   119                     require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   120                     throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
       
   121                 }
       
   122             } else {
       
   123                 return (string) $result->CreateQueueResult->QueueUrl;
       
   124             }
       
   125 
       
   126         } while ($retry);
       
   127 
       
   128         return false;
       
   129     }
       
   130 
       
   131     /**
       
   132      * Delete a queue and all of it's messages
       
   133      *
       
   134      * Returns false if the queue is not found, true if the queue exists
       
   135      *
       
   136      * @param  string  $queue_url queue URL
       
   137      * @return boolean
       
   138      * @throws Zend_Service_Amazon_Sqs_Exception
       
   139      */
       
   140     public function delete($queue_url)
       
   141     {
       
   142         $result = $this->_makeRequest($queue_url, 'DeleteQueue');
       
   143 
       
   144         if ($result->Error->Code !== null) {
       
   145             require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   146             throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
       
   147         }
       
   148 
       
   149         return true;
       
   150     }
       
   151 
       
   152     /**
       
   153      * Get an array of all available queues
       
   154      *
       
   155      * @return array
       
   156      * @throws Zend_Service_Amazon_Sqs_Exception
       
   157      */
       
   158     public function getQueues()
       
   159     {
       
   160         $result = $this->_makeRequest(null, 'ListQueues');
       
   161 
       
   162         if (isset($result->Error)) {
       
   163             require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   164             throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
       
   165         }
       
   166 
       
   167         if (!isset($result->ListQueuesResult->QueueUrl)
       
   168             || empty($result->ListQueuesResult->QueueUrl)
       
   169         ) {
       
   170             return array();
       
   171         }
       
   172 
       
   173         $queues = array();
       
   174         foreach ($result->ListQueuesResult->QueueUrl as $queue_url) {
       
   175             $queues[] = (string)$queue_url;
       
   176         }
       
   177 
       
   178         return $queues;
       
   179     }
       
   180 
       
   181     /**
       
   182      * Return the approximate number of messages in the queue
       
   183      *
       
   184      * @param  string  $queue_url Queue URL
       
   185      * @return integer
       
   186      * @throws Zend_Service_Amazon_Sqs_Exception
       
   187      */
       
   188     public function count($queue_url)
       
   189     {
       
   190         return (int)$this->getAttribute($queue_url, 'ApproximateNumberOfMessages');
       
   191     }
       
   192 
       
   193     /**
       
   194      * Send a message to the queue
       
   195      *
       
   196      * @param  string $queue_url Queue URL
       
   197      * @param  string $message   Message to send to the queue
       
   198      * @return string            Message ID
       
   199      * @throws Zend_Service_Amazon_Sqs_Exception
       
   200      */
       
   201     public function send($queue_url, $message)
       
   202     {
       
   203         $params = array();
       
   204         $params['MessageBody'] = urlencode($message);
       
   205 
       
   206         $checksum = md5($params['MessageBody']);
       
   207 
       
   208         $result = $this->_makeRequest($queue_url, 'SendMessage', $params);
       
   209 
       
   210         if (!isset($result->SendMessageResult->MessageId)
       
   211             || empty($result->SendMessageResult->MessageId)
       
   212         ) {
       
   213             require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   214             throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
       
   215         } else if ((string) $result->SendMessageResult->MD5OfMessageBody != $checksum) {
       
   216             require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   217             throw new Zend_Service_Amazon_Sqs_Exception('MD5 of body does not match message sent');
       
   218         }
       
   219 
       
   220         return (string) $result->SendMessageResult->MessageId;
       
   221     }
       
   222 
       
   223     /**
       
   224      * Get messages in the queue
       
   225      *
       
   226      * @param  string  $queue_url    Queue name
       
   227      * @param  integer $max_messages Maximum number of messages to return
       
   228      * @param  integer $timeout      Visibility timeout for these messages
       
   229      * @return array
       
   230      * @throws Zend_Service_Amazon_Sqs_Exception
       
   231      */
       
   232     public function receive($queue_url, $max_messages = null, $timeout = null)
       
   233     {
       
   234         $params = array();
       
   235 
       
   236         // If not set, the visibility timeout on the queue is used
       
   237         if ($timeout !== null) {
       
   238             $params['VisibilityTimeout'] = (int)$timeout;
       
   239         }
       
   240 
       
   241         // SQS will default to only returning one message
       
   242         if ($max_messages !== null) {
       
   243             $params['MaxNumberOfMessages'] = (int)$max_messages;
       
   244         }
       
   245 
       
   246         $result = $this->_makeRequest($queue_url, 'ReceiveMessage', $params);
       
   247 
       
   248         if (isset($result->Error)) {
       
   249             require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   250             throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
       
   251         }
       
   252 
       
   253         if (!isset($result->ReceiveMessageResult->Message)
       
   254             || empty($result->ReceiveMessageResult->Message)
       
   255         ) {
       
   256             // no messages found
       
   257             return array();
       
   258         }
       
   259 
       
   260         $data = array();
       
   261         foreach ($result->ReceiveMessageResult->Message as $message) {
       
   262             $data[] = array(
       
   263                 'message_id' => (string)$message->MessageId,
       
   264                 'handle'     => (string)$message->ReceiptHandle,
       
   265                 'md5'        => (string)$message->MD5OfBody,
       
   266                 'body'       => urldecode((string)$message->Body),
       
   267             );
       
   268         }
       
   269 
       
   270         return $data;
       
   271     }
       
   272 
       
   273     /**
       
   274      * Delete a message from the queue
       
   275      *
       
   276      * Returns true if the message is deleted, false if the deletion is
       
   277      * unsuccessful.
       
   278      *
       
   279      * @param  string $queue_url  Queue URL
       
   280      * @param  string $handle     Message handle as returned by SQS
       
   281      * @return boolean
       
   282      * @throws Zend_Service_Amazon_Sqs_Exception
       
   283      */
       
   284     public function deleteMessage($queue_url, $handle)
       
   285     {
       
   286         $params = array();
       
   287         $params['ReceiptHandle'] = (string)$handle;
       
   288 
       
   289         $result = $this->_makeRequest($queue_url, 'DeleteMessage', $params);
       
   290 
       
   291         if (isset($result->Error->Code) 
       
   292             && !empty($result->Error->Code)
       
   293         ) {
       
   294             return false;
       
   295         }
       
   296 
       
   297         // Will always return true unless ReceiptHandle is malformed
       
   298         return true;
       
   299     }
       
   300 
       
   301     /**
       
   302      * Get the attributes for the queue
       
   303      *
       
   304      * @param  string $queue_url  Queue URL
       
   305      * @param  string $attribute
       
   306      * @return string
       
   307      * @throws Zend_Service_Amazon_Sqs_Exception
       
   308      */
       
   309     public function getAttribute($queue_url, $attribute = 'All')
       
   310     {
       
   311         $params = array();
       
   312         $params['AttributeName'] = $attribute;
       
   313 
       
   314         $result = $this->_makeRequest($queue_url, 'GetQueueAttributes', $params);
       
   315 
       
   316         if (!isset($result->GetQueueAttributesResult->Attribute)
       
   317             || empty($result->GetQueueAttributesResult->Attribute)
       
   318         ) {
       
   319             require_once 'Zend/Service/Amazon/Sqs/Exception.php';
       
   320             throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
       
   321         }
       
   322         
       
   323         if(count($result->GetQueueAttributesResult->Attribute) > 1) {
       
   324             $attr_result = array();
       
   325             foreach($result->GetQueueAttributesResult->Attribute as $attribute) {
       
   326                 $attr_result[(string)$attribute->Name] = (string)$attribute->Value;
       
   327             }
       
   328             return $attr_result;
       
   329         } else {
       
   330             return (string) $result->GetQueueAttributesResult->Attribute->Value;
       
   331         }
       
   332     }
       
   333 
       
   334     /**
       
   335      * Make a request to Amazon SQS
       
   336      *
       
   337      * @param  string           $queue  Queue Name
       
   338      * @param  string           $action SQS action
       
   339      * @param  array            $params
       
   340      * @return SimpleXMLElement
       
   341      */
       
   342     private function _makeRequest($queue_url, $action, $params = array())
       
   343     {
       
   344         $params['Action'] = $action;
       
   345         $params = $this->addRequiredParameters($queue_url, $params);
       
   346 
       
   347         if ($queue_url === null) {
       
   348             $queue_url = '/';
       
   349         }
       
   350 
       
   351         $client = self::getHttpClient();
       
   352 
       
   353         switch ($action) {
       
   354             case 'ListQueues':
       
   355             case 'CreateQueue':
       
   356                 $client->setUri('http://'.$this->_sqsEndpoint);
       
   357                 break;
       
   358             default:
       
   359                 $client->setUri($queue_url);
       
   360                 break;
       
   361         }
       
   362 
       
   363         $retry_count = 0;
       
   364 
       
   365         do {
       
   366             $retry = false;
       
   367 
       
   368             $client->resetParameters();
       
   369             $client->setParameterGet($params);
       
   370 
       
   371             $response = $client->request('GET');
       
   372 
       
   373             $response_code = $response->getStatus();
       
   374 
       
   375             // Some 5xx errors are expected, so retry automatically
       
   376             if ($response_code >= 500 && $response_code < 600 && $retry_count <= 5) {
       
   377                 $retry = true;
       
   378                 $retry_count++;
       
   379                 sleep($retry_count / 4 * $retry_count);
       
   380             }
       
   381         } while ($retry);
       
   382 
       
   383         unset($client);
       
   384 
       
   385         return new SimpleXMLElement($response->getBody());
       
   386     }
       
   387 
       
   388     /**
       
   389      * Adds required authentication and version parameters to an array of
       
   390      * parameters
       
   391      *
       
   392      * The required parameters are:
       
   393      * - AWSAccessKey
       
   394      * - SignatureVersion
       
   395      * - Timestamp
       
   396      * - Version and
       
   397      * - Signature
       
   398      *
       
   399      * If a required parameter is already set in the <tt>$parameters</tt> array,
       
   400      * it is overwritten.
       
   401      *
       
   402      * @param  string $queue_url  Queue URL
       
   403      * @param  array  $parameters the array to which to add the required
       
   404      *                            parameters.
       
   405      * @return array
       
   406      */
       
   407     protected function addRequiredParameters($queue_url, array $parameters)
       
   408     {
       
   409         $parameters['AWSAccessKeyId']   = $this->_getAccessKey();
       
   410         $parameters['SignatureVersion'] = $this->_sqsSignatureVersion;
       
   411         $parameters['Timestamp']        = gmdate('Y-m-d\TH:i:s\Z', time()+10);
       
   412         $parameters['Version']          = $this->_sqsApiVersion;
       
   413         $parameters['SignatureMethod']  = $this->_sqsSignatureMethod;
       
   414         $parameters['Signature']        = $this->_signParameters($queue_url, $parameters);
       
   415 
       
   416         return $parameters;
       
   417     }
       
   418 
       
   419     /**
       
   420      * Computes the RFC 2104-compliant HMAC signature for request parameters
       
   421      *
       
   422      * This implements the Amazon Web Services signature, as per the following
       
   423      * specification:
       
   424      *
       
   425      * 1. Sort all request parameters (including <tt>SignatureVersion</tt> and
       
   426      *    excluding <tt>Signature</tt>, the value of which is being created),
       
   427      *    ignoring case.
       
   428      *
       
   429      * 2. Iterate over the sorted list and append the parameter name (in its
       
   430      *    original case) and then its value. Do not URL-encode the parameter
       
   431      *    values before constructing this string. Do not use any separator
       
   432      *    characters when appending strings.
       
   433      *
       
   434      * @param  string $queue_url  Queue URL
       
   435      * @param  array  $parameters the parameters for which to get the signature.
       
   436      *
       
   437      * @return string the signed data.
       
   438      */
       
   439     protected function _signParameters($queue_url, array $paramaters)
       
   440     {
       
   441         $data = "GET\n";
       
   442         $data .= $this->_sqsEndpoint . "\n";
       
   443         if ($queue_url !== null) {
       
   444             $data .= parse_url($queue_url, PHP_URL_PATH);
       
   445         }
       
   446         else {
       
   447             $data .= '/';
       
   448         }
       
   449         $data .= "\n";
       
   450 
       
   451         uksort($paramaters, 'strcmp');
       
   452         unset($paramaters['Signature']);
       
   453 
       
   454         $arrData = array();
       
   455         foreach($paramaters as $key => $value) {
       
   456             $arrData[] = $key . '=' . str_replace('%7E', '~', urlencode($value));
       
   457         }
       
   458 
       
   459         $data .= implode('&', $arrData);
       
   460 
       
   461         $hmac = Zend_Crypt_Hmac::compute($this->_getSecretKey(), 'SHA256', $data, Zend_Crypt_Hmac::BINARY);
       
   462 
       
   463         return base64_encode($hmac);
       
   464     }
       
   465 }