web/lib/Zend/Cloud/QueueService/Adapter/ZendQueue.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * LICENSE
       
     4  *
       
     5  * This source file is subject to the new BSD license that is bundled
       
     6  * with this package in the file LICENSE.txt.
       
     7  * It is also available through the world-wide-web at this URL:
       
     8  * http://framework.zend.com/license/new-bsd
       
     9  * If you did not receive a copy of the license and are unable to
       
    10  * obtain it through the world-wide-web, please send an email
       
    11  * to license@zend.com so we can send you a copy immediately.
       
    12  *
       
    13  * @category   Zend
       
    14  * @package    Zend_Cloud
       
    15  * @subpackage QueueService
       
    16  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    17  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    18  */
       
    19 
       
    20 require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php';
       
    21 require_once 'Zend/Cloud/QueueService/Exception.php';
       
    22 require_once 'Zend/Queue.php';
       
    23 
       
    24 /**
       
    25  * WindowsAzure adapter for simple queue service.
       
    26  *
       
    27  * @category   Zend
       
    28  * @package    Zend_Cloud
       
    29  * @subpackage QueueService
       
    30  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    31  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    32  */
       
    33 class Zend_Cloud_QueueService_Adapter_ZendQueue 
       
    34     extends Zend_Cloud_QueueService_Adapter_AbstractAdapter
       
    35 {
       
    36     /**
       
    37      * Options array keys for the Zend_Queue adapter.
       
    38      */
       
    39     const ADAPTER = 'adapter';
       
    40     
       
    41     /**
       
    42      * Storage client
       
    43      * 
       
    44      * @var Zend_Queue
       
    45      */
       
    46     protected $_queue = null;
       
    47     
       
    48     /**
       
    49      * @var array All queues
       
    50      */
       
    51     protected $_queues = array();
       
    52     
       
    53     /**
       
    54      * Constructor
       
    55      * 
       
    56      * @param  array|Zend_Config $options 
       
    57      * @return void
       
    58      */
       
    59     public function __construct ($options = array())
       
    60     {
       
    61         if ($options instanceof Zend_Config) {
       
    62             $options = $options->toArray();
       
    63         }
       
    64 
       
    65         if (!is_array($options)) {
       
    66             throw new Zend_Cloud_QueueService_Exception('Invalid options provided');
       
    67         }
       
    68 
       
    69         if (isset($options[self::MESSAGE_CLASS])) {
       
    70             $this->setMessageClass($options[self::MESSAGE_CLASS]);
       
    71         }
       
    72 
       
    73         if (isset($options[self::MESSAGESET_CLASS])) {
       
    74             $this->setMessageSetClass($options[self::MESSAGESET_CLASS]);
       
    75         }
       
    76 
       
    77         // Build Zend_Service_WindowsAzure_Storage_Blob instance
       
    78         if (!isset($options[self::ADAPTER])) {
       
    79             throw new Zend_Cloud_QueueService_Exception('No Zend_Queue adapter provided');
       
    80         } else {
       
    81             $adapter = $options[self::ADAPTER];
       
    82             unset($options[self::ADAPTER]);
       
    83         }
       
    84         try {
       
    85             $this->_queue = new Zend_Queue($adapter, $options);
       
    86         } catch (Zend_Queue_Exception $e) {
       
    87             throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e);
       
    88         }
       
    89     }
       
    90     
       
    91     /**
       
    92      * Create a queue. Returns the ID of the created queue (typically the URL).
       
    93      * It may take some time to create the queue. Check your vendor's
       
    94      * documentation for details.
       
    95      *
       
    96      * @param  string $name
       
    97      * @param  array  $options
       
    98      * @return string Queue ID (typically URL)
       
    99      */
       
   100     public function createQueue($name, $options = null)
       
   101     {
       
   102         try {
       
   103             $this->_queues[$name] = $this->_queue->createQueue($name, isset($options[Zend_Queue::TIMEOUT])?$options[Zend_Queue::TIMEOUT]:null);
       
   104             return $name;
       
   105         } catch (Zend_Queue_Exception $e) {
       
   106             throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e);
       
   107         }
       
   108     }
       
   109     
       
   110     /**
       
   111      * Delete a queue. All messages in the queue will also be deleted.
       
   112      *
       
   113      * @param  string $queueId
       
   114      * @param  array  $options
       
   115      * @return boolean true if successful, false otherwise
       
   116      */
       
   117     public function deleteQueue($queueId, $options = null)
       
   118     {
       
   119         if (!isset($this->_queues[$queueId])) {
       
   120             return false;
       
   121         }
       
   122         try {
       
   123             if ($this->_queues[$queueId]->deleteQueue()) {
       
   124                 unset($this->_queues[$queueId]);
       
   125                 return true;
       
   126             }
       
   127         } catch (Zend_Queue_Exception $e) {
       
   128             throw new Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e);
       
   129         }
       
   130     }
       
   131     
       
   132     /**
       
   133      * List all queues.
       
   134      *
       
   135      * @param  array $options
       
   136      * @return array Queue IDs
       
   137      */
       
   138     public function listQueues($options = null)
       
   139     {
       
   140         try {
       
   141             return $this->_queue->getQueues();
       
   142         } catch (Zend_Queue_Exception $e) {
       
   143             throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e);
       
   144         }
       
   145     }
       
   146     
       
   147     /**
       
   148      * Get a key/value array of metadata for the given queue.
       
   149      *
       
   150      * @param  string $queueId
       
   151      * @param  array  $options
       
   152      * @return array
       
   153      */
       
   154     public function fetchQueueMetadata($queueId, $options = null)
       
   155     {
       
   156         if (!isset($this->_queues[$queueId])) {
       
   157             return false;
       
   158         }
       
   159         try {
       
   160             return $this->_queues[$queueId]->getOptions();
       
   161         } catch (Zend_Queue_Exception $e) {
       
   162             throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e);
       
   163         }
       
   164     }
       
   165     
       
   166     /**
       
   167      * Store a key/value array of metadata for the specified queue.
       
   168      * WARNING: This operation overwrites any metadata that is located at 
       
   169      * $destinationPath. Some adapters may not support this method.
       
   170      * 
       
   171      * @param  string $queueId
       
   172      * @param  array  $metadata
       
   173      * @param  array  $options
       
   174      * @return void
       
   175      */
       
   176     public function storeQueueMetadata($queueId, $metadata, $options = null)
       
   177     {
       
   178         if (!isset($this->_queues[$queueId])) {
       
   179             throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
       
   180         }
       
   181         try {
       
   182             return $this->_queues[$queueId]->setOptions($metadata);
       
   183         } catch (Zend_Queue_Exception $e) {
       
   184             throw new Zend_Cloud_QueueService_Exception('Error on setting queue metadata: '.$e->getMessage(), $e->getCode(), $e);
       
   185         }
       
   186     }
       
   187     
       
   188     /**
       
   189      * Send a message to the specified queue.
       
   190      * 
       
   191      * @param  string $queueId
       
   192      * @param  string $message
       
   193      * @param  array  $options
       
   194      * @return string Message ID
       
   195      */
       
   196     public function sendMessage($queueId, $message, $options = null)
       
   197     {
       
   198         if (!isset($this->_queues[$queueId])) {
       
   199             throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
       
   200         }
       
   201         try {
       
   202             return $this->_queues[$queueId]->send($message);
       
   203         } catch (Zend_Queue_Exception $e) {
       
   204             throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e);
       
   205         }
       
   206     }
       
   207     
       
   208     /**
       
   209      * Recieve at most $max messages from the specified queue and return the
       
   210      * message IDs for messages recieved.
       
   211      * 
       
   212      * @param  string $queueId
       
   213      * @param  int    $max
       
   214      * @param  array  $options
       
   215      * @return array
       
   216      */
       
   217     public function receiveMessages($queueId, $max = 1, $options = null)
       
   218     {
       
   219         if (!isset($this->_queues[$queueId])) {
       
   220             throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
       
   221         }
       
   222         try {
       
   223             $res = $this->_queues[$queueId]->receive($max, isset($options[Zend_Queue::TIMEOUT])?$options[Zend_Queue::TIMEOUT]:null);
       
   224             if ($res instanceof Iterator) {
       
   225                 return $this->_makeMessages($res);
       
   226             } else {
       
   227                 return $this->_makeMessages(array($res));
       
   228             }
       
   229         } catch (Zend_Queue_Exception $e) {
       
   230             throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e);
       
   231         }
       
   232     }
       
   233     
       
   234     /**
       
   235      * Create Zend_Cloud_QueueService_Message array for
       
   236      * Azure messages.
       
   237      *  
       
   238      * @param array $messages
       
   239      * @return Zend_Cloud_QueueService_Message[]
       
   240      */
       
   241     protected function _makeMessages($messages)
       
   242     {
       
   243         $messageClass = $this->getMessageClass();
       
   244         $setClass     = $this->getMessageSetClass();
       
   245         $result = array();
       
   246         foreach ($messages as $message) {
       
   247             $result[] = new $messageClass($message->body, $message);
       
   248         }
       
   249         return new $setClass($result);
       
   250     }
       
   251     
       
   252     /**
       
   253      * Delete the specified message from the specified queue.
       
   254      * 
       
   255      * @param  string $queueId
       
   256      * @param  Zend_Cloud_QueueService_Message $message Message ID or message 
       
   257      * @param  array  $options
       
   258      * @return void
       
   259      */
       
   260     public function deleteMessage($queueId, $message, $options = null)
       
   261     {
       
   262         if (!isset($this->_queues[$queueId])) {
       
   263             throw new Zend_Cloud_QueueService_Exception("No such queue: $queueId");
       
   264         }
       
   265         try {
       
   266             if ($message instanceof Zend_Cloud_QueueService_Message) {
       
   267                 $message = $message->getMessage();
       
   268             }
       
   269             if (!($message instanceof Zend_Queue_Message)) {
       
   270                 throw new Zend_Cloud_QueueService_Exception('Cannot delete the message: Zend_Queue_Message object required');
       
   271             }
       
   272             
       
   273             return $this->_queues[$queueId]->deleteMessage($message);
       
   274         } catch (Zend_Queue_Exception $e) {
       
   275             throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e);
       
   276         }
       
   277     }
       
   278     
       
   279     /**
       
   280      * Peek at the messages from the specified queue without removing them.
       
   281      *
       
   282      * @param  string $queueId
       
   283      * @param  int $num How many messages
       
   284      * @param  array  $options
       
   285      * @return Zend_Cloud_QueueService_Message[]
       
   286      */
       
   287     public function peekMessages($queueId, $num = 1, $options = null)
       
   288     {
       
   289         require_once 'Zend/Cloud/OperationNotAvailableException.php';
       
   290         throw new Zend_Cloud_OperationNotAvailableException('ZendQueue doesn\'t currently support message peeking');
       
   291     }
       
   292     
       
   293     /**
       
   294      * Get Azure implementation
       
   295      * @return Zend_Queue 
       
   296      */
       
   297     public function getClient()
       
   298     {
       
   299         return $this->_queue;
       
   300     }
       
   301 }