web/lib/Zend/Cloud/QueueService/Adapter/Sqs.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * LICENSE
       
     4  *
       
     5  * This source file is subject to the new BSD license that is bundled
       
     6  * with this package in the file LICENSE.txt.
       
     7  * It is also available through the world-wide-web at this URL:
       
     8  * http://framework.zend.com/license/new-bsd
       
     9  * If you did not receive a copy of the license and are unable to
       
    10  * obtain it through the world-wide-web, please send an email
       
    11  * to license@zend.com so we can send you a copy immediately.
       
    12  *
       
    13  * @category   Zend
       
    14  * @package    Zend_Cloud
       
    15  * @subpackage QueueService
       
    16  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    17  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    18  */
       
    19 
       
    20 require_once 'Zend/Service/Amazon/Sqs.php';
       
    21 require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php';
       
    22 require_once 'Zend/Cloud/QueueService/Exception.php';
       
    23 require_once 'Zend/Cloud/QueueService/Message.php';
       
    24 
       
    25 /**
       
    26  * SQS adapter for simple queue service.
       
    27  *
       
    28  * @category   Zend
       
    29  * @package    Zend_Cloud
       
    30  * @subpackage QueueService
       
    31  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    32  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    33  */
       
    34 class Zend_Cloud_QueueService_Adapter_Sqs 
       
    35     extends Zend_Cloud_QueueService_Adapter_AbstractAdapter
       
    36 {
       
    37     /*
       
    38      * Options array keys for the SQS adapter.
       
    39      */
       
    40     const AWS_ACCESS_KEY = 'aws_accesskey';
       
    41     const AWS_SECRET_KEY = 'aws_secretkey';
       
    42 
       
    43     /**
       
    44      * Defaults
       
    45      */
       
    46     const CREATE_TIMEOUT = 30;
       
    47 
       
    48     /**
       
    49      * SQS service instance.
       
    50      * @var Zend_Service_Amazon_Sqs
       
    51      */
       
    52     protected $_sqs;
       
    53 
       
    54     /**
       
    55      * Constructor
       
    56      * 
       
    57      * @param  array|Zend_Config $options 
       
    58      * @return void
       
    59      */
       
    60     public function __construct($options = array()) 
       
    61     {
       
    62         if ($options instanceof Zend_Config) {
       
    63             $options = $options->toArray();
       
    64         }
       
    65 
       
    66         if (!is_array($options)) {
       
    67             throw new Zend_Cloud_QueueService_Exception('Invalid options provided');
       
    68         }
       
    69 
       
    70         if (isset($options[self::MESSAGE_CLASS])) {
       
    71             $this->setMessageClass($options[self::MESSAGE_CLASS]);
       
    72         }
       
    73 
       
    74         if (isset($options[self::MESSAGESET_CLASS])) {
       
    75             $this->setMessageSetClass($options[self::MESSAGESET_CLASS]);
       
    76         }
       
    77 
       
    78         try {
       
    79             $this->_sqs = new Zend_Service_Amazon_Sqs(
       
    80                 $options[self::AWS_ACCESS_KEY], $options[self::AWS_SECRET_KEY]
       
    81             );
       
    82         } catch(Zend_Service_Amazon_Exception $e) {
       
    83             throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e);
       
    84         }
       
    85 
       
    86         if(isset($options[self::HTTP_ADAPTER])) {
       
    87             $this->_sqs->getHttpClient()->setAdapter($options[self::HTTP_ADAPTER]);
       
    88         } 
       
    89     }
       
    90 
       
    91      /**
       
    92      * Create a queue. Returns the ID of the created queue (typically the URL).
       
    93      * It may take some time to create the queue. Check your vendor's
       
    94      * documentation for details.
       
    95      *
       
    96      * @param  string $name
       
    97      * @param  array  $options
       
    98      * @return string Queue ID (typically URL)
       
    99      */
       
   100     public function createQueue($name, $options = null) 
       
   101     {
       
   102         try {
       
   103             return $this->_sqs->create($name, $options[self::CREATE_TIMEOUT]);
       
   104         } catch(Zend_Service_Amazon_Exception $e) {
       
   105             throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e);
       
   106         }
       
   107     }
       
   108 
       
   109     /**
       
   110      * Delete a queue. All messages in the queue will also be deleted.
       
   111      *
       
   112      * @param  string $queueId
       
   113      * @param  array  $options
       
   114      * @return boolean true if successful, false otherwise
       
   115      */
       
   116     public function deleteQueue($queueId, $options = null) 
       
   117 {
       
   118         try {
       
   119             return $this->_sqs->delete($queueId);
       
   120         } catch(Zend_Service_Amazon_Exception $e) {
       
   121             throw Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e);
       
   122         }
       
   123     }
       
   124 
       
   125     /**
       
   126      * List all queues.
       
   127      *
       
   128      * @param  array $options
       
   129      * @return array Queue IDs
       
   130      */
       
   131     public function listQueues($options = null) 
       
   132     {
       
   133         try {
       
   134             return $this->_sqs->getQueues();
       
   135         } catch(Zend_Service_Amazon_Exception $e) {
       
   136             throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e);
       
   137         }
       
   138     }
       
   139 
       
   140     /**
       
   141      * Get a key/value array of metadata for the given queue.
       
   142      *
       
   143      * @param  string $queueId
       
   144      * @param  array  $options
       
   145      * @return array
       
   146      */
       
   147     public function fetchQueueMetadata($queueId, $options = null) 
       
   148     {
       
   149         try {
       
   150             // TODO: ZF-9050 Fix the SQS client library in trunk to return all attribute values
       
   151             $attributes = $this->_sqs->getAttribute($queueId, 'All');
       
   152             if(is_array($attributes)) {
       
   153                 return $attributes;
       
   154             } else {
       
   155                 return array('All' => $this->_sqs->getAttribute($queueId, 'All'));
       
   156             }
       
   157         } catch(Zend_Service_Amazon_Exception $e) {
       
   158             throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e);
       
   159         }
       
   160     }
       
   161 
       
   162     /**
       
   163      * Store a key/value array of metadata for the specified queue.
       
   164      * WARNING: This operation overwrites any metadata that is located at
       
   165      * $destinationPath. Some adapters may not support this method.
       
   166      *
       
   167      * @param  array  $metadata
       
   168      * @param  string $queueId
       
   169      * @param  array  $options
       
   170      * @return void
       
   171      */
       
   172     public function storeQueueMetadata($queueId, $metadata, $options = null) 
       
   173     {
       
   174         // TODO Add support for SetQueueAttributes to client library
       
   175         require_once 'Zend/Cloud/OperationNotAvailableException.php';
       
   176         throw new Zend_Cloud_OperationNotAvailableException('Amazon SQS doesn\'t currently support storing metadata');
       
   177     }
       
   178 
       
   179     /**
       
   180      * Send a message to the specified queue.
       
   181      *
       
   182      * @param  string $message
       
   183      * @param  string $queueId
       
   184      * @param  array  $options
       
   185      * @return string Message ID
       
   186      */
       
   187     public function sendMessage($queueId, $message, $options = null) 
       
   188     {
       
   189         try {
       
   190             return $this->_sqs->send($queueId, $message);
       
   191         } catch(Zend_Service_Amazon_Exception $e) {
       
   192             throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e);
       
   193         }
       
   194     }
       
   195 
       
   196     /**
       
   197      * Recieve at most $max messages from the specified queue and return the
       
   198      * message IDs for messages recieved.
       
   199      *
       
   200      * @param  string $queueId
       
   201      * @param  int    $max
       
   202      * @param  array  $options
       
   203      * @return array
       
   204      */
       
   205     public function receiveMessages($queueId, $max = 1, $options = null) 
       
   206     {
       
   207         try {
       
   208             return $this->_makeMessages($this->_sqs->receive($queueId, $max, $options[self::VISIBILITY_TIMEOUT]));
       
   209         } catch(Zend_Service_Amazon_Exception $e) {
       
   210             throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e);
       
   211         }
       
   212     }
       
   213     
       
   214     /**
       
   215      * Create Zend_Cloud_QueueService_Message array for
       
   216      * Sqs messages.
       
   217      *  
       
   218      * @param array $messages
       
   219      * @return Zend_Cloud_QueueService_Message[]
       
   220      */
       
   221     protected function _makeMessages($messages)
       
   222     {
       
   223         $messageClass = $this->getMessageClass();
       
   224         $setClass     = $this->getMessageSetClass();
       
   225         $result = array();
       
   226         foreach($messages as $message) {
       
   227             $result[] = new $messageClass($message['body'], $message);
       
   228         }
       
   229         return new $setClass($result);
       
   230     }
       
   231 
       
   232     /**
       
   233      * Delete the specified message from the specified queue.
       
   234      *
       
   235      * @param  string $queueId
       
   236      * @param  Zend_Cloud_QueueService_Message $message
       
   237      * @param  array  $options
       
   238      * @return void
       
   239      */
       
   240     public function deleteMessage($queueId, $message, $options = null) 
       
   241     {
       
   242         try {
       
   243             if($message instanceof Zend_Cloud_QueueService_Message) {
       
   244                 $message = $message->getMessage();
       
   245             }
       
   246             $messageId = $message['handle'];
       
   247             return $this->_sqs->deleteMessage($queueId, $messageId);
       
   248         } catch(Zend_Service_Amazon_Exception $e) {
       
   249             throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e);
       
   250         }
       
   251     }
       
   252     
       
   253     /**
       
   254      * Peek at the messages from the specified queue without removing them.
       
   255      *
       
   256      * @param  string $queueId
       
   257      * @param  int $num How many messages
       
   258      * @param  array  $options
       
   259      * @return Zend_Cloud_QueueService_Message[]
       
   260      */
       
   261     public function peekMessages($queueId, $num = 1, $options = null)
       
   262     {
       
   263         try {
       
   264             return $this->_makeMessages($this->_sqs->receive($queueId, $num, 0));
       
   265         } catch(Zend_Service_Amazon_Exception $e) {
       
   266             throw new Zend_Cloud_QueueService_Exception('Error on peeking messages: '.$e->getMessage(), $e->getCode(), $e);
       
   267         }
       
   268     }
       
   269 
       
   270     /**
       
   271      * Get SQS implementation
       
   272      * @return Zend_Service_Amazon_Sqs 
       
   273      */
       
   274     public function getClient()
       
   275     {
       
   276         return $this->_sqs;
       
   277     }
       
   278 }