web/lib/Zend/Queue/Adapter/Activemq.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * Zend Framework
       
     4  *
       
     5  * LICENSE
       
     6  *
       
     7  * This source file is subject to the new BSD license that is bundled
       
     8  * with this package in the file LICENSE.txt.
       
     9  * It is also available through the world-wide-web at this URL:
       
    10  * http://framework.zend.com/license/new-bsd
       
    11  * If you did not receive a copy of the license and are unable to
       
    12  * obtain it through the world-wide-web, please send an email
       
    13  * to license@zend.com so we can send you a copy immediately.
       
    14  *
       
    15  * @category   Zend
       
    16  * @package    Zend_Queue
       
    17  * @subpackage Adapter
       
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    20  * @version    $Id: Activemq.php 20096 2010-01-06 02:05:09Z bkarwin $
       
    21  */
       
    22 
       
    23 /**
       
    24  * @see Zend_Queue_Adapter_AdapterAbstract
       
    25  */
       
    26 require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
       
    27 
       
    28 /**
       
    29  * @see Zend_Queue_Stomp_Client
       
    30  */
       
    31 require_once 'Zend/Queue/Stomp/Client.php';
       
    32 
       
    33 /**
       
    34  * @see Zend_Queue_Stomp_Frame
       
    35  */
       
    36 require_once 'Zend/Queue/Stomp/Frame.php';
       
    37 
       
    38 /**
       
    39  * Class for using Stomp to talk to a Stomp-compliant server
       
    40  *
       
    41  * @category   Zend
       
    42  * @package    Zend_Queue
       
    43  * @subpackage Adapter
       
    44  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    45  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    46  */
       
    class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
    {
        const DEFAULT_SCHEME = 'tcp';
        const DEFAULT_HOST   = '127.0.0.1';
        const DEFAULT_PORT   = 61613;

        /**
         * Stomp client used for every frame exchanged with the server.
         *
         * @var Zend_Queue_Stomp_Client
         */
        private $_client = null;

        /**
         * Destinations for which this adapter instance has already sent a
         * SUBSCRIBE frame, keyed by queue name.
         *
         * @var array
         */
        private $_subscribed = array();

        /**
         * Constructor
         *
         * Fills in default scheme/host/port driver options, creates (or adopts
         * the injected 'stompClient') client and sends a CONNECT frame.
         *
         * @param  array|Zend_Config $options An array having configuration data
         * @param  Zend_Queue|null   $queue   The Zend_Queue object that created this class
         * @return void
         * @throws Zend_Queue_Exception when the server answers CONNECT with
         *                              anything other than CONNECTED
         */
        public function __construct($options, Zend_Queue $queue = null)
        {
            // Forward the queue too: send() and receive() fall back to
            // $this->_queue, which this class never assigns itself.
            // NOTE(review): assumes the parent constructor takes the queue as
            // its second argument - confirm against AdapterAbstract.
            parent::__construct($options, $queue);

            // Work on the stored driver options by reference so the defaults
            // below are persisted on the adapter.
            $options = &$this->_options['driverOptions'];
            if (!array_key_exists('scheme', $options)) {
                $options['scheme'] = self::DEFAULT_SCHEME;
            }
            if (!array_key_exists('host', $options)) {
                $options['host'] = self::DEFAULT_HOST;
            }
            if (!array_key_exists('port', $options)) {
                $options['port'] = self::DEFAULT_PORT;
            }

            if (array_key_exists('stompClient', $options)) {
                // A pre-built client may be injected through the options.
                $this->_client = $options['stompClient'];
            } else {
                $this->_client = new Zend_Queue_Stomp_Client($options['scheme'], $options['host'], $options['port']);
            }

            $connect = $this->_client->createFrame();

            // Username and password are optional on some messaging servers
            // such as Apache's ActiveMQ
            $connect->setCommand('CONNECT');
            if (isset($options['username'])) {
                $connect->setHeader('login', $options['username']);
                // Do not read an undefined 'password' key when only a
                // username was configured; fall back to an empty passcode.
                $connect->setHeader(
                    'passcode',
                    isset($options['password']) ? $options['password'] : ''
                );
            }

            $response = $this->_client->send($connect)->receive();

            // A false response (nothing received) is not treated as a failure,
            // exactly as before; only an explicit non-CONNECTED reply is.
            if ((false !== $response)
                && ($response->getCommand() != 'CONNECTED')
            ) {
                require_once 'Zend/Queue/Exception.php';
                throw new Zend_Queue_Exception("Unable to authenticate to '".$options['scheme'].'://'.$options['host'].':'.$options['port']."'");
            }
        }

        /**
         * Send DISCONNECT and release the client when destructed
         *
         * @return void
         */
        public function __destruct()
        {
            // The client is absent when construction failed before it was
            // assigned, or when the destructor has already run once.
            if (null === $this->_client) {
                return;
            }

            // Gracefully disconnect
            $frame = $this->_client->createFrame();
            $frame->setCommand('DISCONNECT');
            $this->_client->send($frame);

            // Drop our reference; assigning null (rather than unset) keeps the
            // property defined so a second call hits the guard above.
            $this->_client     = null;
            $this->_subscribed = array();
        }

        /**
         * Create a new queue (not supported, always throws)
         *
         * @param  string  $name    queue name
         * @param  integer $timeout default visibility timeout
         * @return void
         * @throws Zend_Queue_Exception
         */
        public function create($name, $timeout=null)
        {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('create() is not supported in ' . get_class($this));
        }

        /**
         * Delete a queue and all of its messages (not supported, always throws)
         *
         * @param  string $name queue name
         * @return void
         * @throws Zend_Queue_Exception
         */
        public function delete($name)
        {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('delete() is not supported in ' . get_class($this));
        }

        /**
         * Delete a message from the queue
         *
         * Sends an ACK frame carrying the message handle as 'message-id'.
         * The send is not confirmed, so this always returns true.
         *
         * @param  Zend_Queue_Message $message
         * @return boolean
         */
        public function deleteMessage(Zend_Queue_Message $message)
        {
            $frame = $this->_client->createFrame();
            $frame->setCommand('ACK');
            $frame->setHeader('message-id', $message->handle);

            $this->_client->send($frame);

            return true;
        }

        /**
         * Get an array of all available queues (not supported, always throws)
         *
         * @return void
         * @throws Zend_Queue_Exception
         */
        public function getQueues()
        {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('getQueues() is not supported in this adapter');
        }

        /**
         * Read up to $maxMessages messages from the queue
         *
         * Subscribes to the queue's destination (once per adapter instance),
         * then, if the client reports readable data, reads frames until
         * $maxMessages MESSAGE frames were collected or the client returns
         * false. The result is wrapped in the queue's message-set class.
         *
         * @param  integer    $maxMessages defaults to 1
         * @param  integer    $timeout     accepted for interface compatibility;
         *                                 not used by this method
         * @param  Zend_Queue $queue       defaults to the adapter's own queue
         * @return Zend_Queue_Message_Iterator
         * @throws Zend_Queue_Exception when a frame other than MESSAGE is read
         */
        public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
        {
            if ($maxMessages === null) {
                $maxMessages = 1;
            }
            if ($queue === null) {
                $queue = $this->_queue;
            }

            // signal that we are reading
            $this->_subscribe($queue);

            $data = array();

            if ($maxMessages > 0 && $this->_client->canRead()) {
                for ($i = 0; $i < $maxMessages; $i++) {
                    $response = $this->_client->receive();

                    // The client can hand back false instead of a frame (the
                    // constructor checks for it too); stop reading rather than
                    // calling a method on a non-object.
                    if (false === $response) {
                        break;
                    }

                    if ($response->getCommand() != 'MESSAGE') {
                        $block = print_r($response, true);
                        require_once 'Zend/Queue/Exception.php';
                        throw new Zend_Queue_Exception('Invalid response received: ' . $block);
                    }

                    $messageId = $response->getHeader('message-id');
                    $body      = $response->getBody();

                    // The message-id doubles as the handle used by
                    // deleteMessage() for the ACK.
                    $data[] = array(
                        'message_id' => $messageId,
                        'handle'     => $messageId,
                        'body'       => $body,
                        'md5'        => md5($body)
                    );
                }
            }

            $options = array(
                'queue'        => $queue,
                'data'         => $data,
                'messageClass' => $queue->getMessageClass()
            );

            $classname = $queue->getMessageSetClass();

            if (!class_exists($classname)) {
                require_once 'Zend/Loader.php';
                Zend_Loader::loadClass($classname);
            }
            return new $classname($options);
        }

        /**
         * Push an element onto the end of the queue
         *
         * Sends a SEND frame to the queue's destination and returns a message
         * object describing what was sent. No id or handle is known at this
         * point, so both are null in the returned message data.
         *
         * @param  string     $message message to send to the queue
         * @param  Zend_Queue $queue   defaults to the adapter's own queue
         * @return Zend_Queue_Message
         */
        public function send($message, Zend_Queue $queue=null)
        {
            if ($queue === null) {
                $queue = $this->_queue;
            }

            $frame = $this->_client->createFrame();
            $frame->setCommand('SEND');
            $frame->setHeader('destination', $queue->getName());
            $frame->setHeader('content-length', strlen($message));
            $frame->setBody((string) $message);
            $this->_client->send($frame);

            $data = array(
                'message_id' => null,
                'body'       => $message,
                'md5'        => md5($message),
                'handle'     => null
            );

            $options = array(
                'queue' => $queue,
                'data'  => $data
            );

            $classname = $queue->getMessageClass();
            if (!class_exists($classname)) {
                require_once 'Zend/Loader.php';
                Zend_Loader::loadClass($classname);
            }
            return new $classname($options);
        }

        /**
         * Returns the length of the queue (not supported, always throws)
         *
         * @param  Zend_Queue $queue
         * @return integer
         * @throws Zend_Queue_Exception (not supported)
         */
        public function count(Zend_Queue $queue=null)
        {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('count() is not supported in this adapter');
        }

        /**
         * Does a queue already exist? (not supported, always throws)
         *
         * @param  string $name
         * @return boolean
         * @throws Zend_Queue_Exception (not supported)
         */
        public function isExists($name)
        {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('isExists() is not supported in this adapter');
        }

        /**
         * Return a list of queue capabilities functions
         *
         * $array['function name'] = true or false
         * true is supported, false is not supported.
         *
         * @return array
         */
        public function getCapabilities()
        {
            return array(
                'create'        => false,
                'delete'        => false,
                'send'          => true,
                'receive'       => true,
                'deleteMessage' => true,
                'getQueues'     => false,
                'count'         => false,
                'isExists'      => false,
            );
        }

        /**
         * Send a SUBSCRIBE frame (client ack mode) for the queue's destination
         * unless this adapter instance has already done so.
         *
         * @param  Zend_Queue $queue
         * @return void
         */
        private function _subscribe($queue)
        {
            $destination = $queue->getName();
            if (isset($this->_subscribed[$destination])) {
                return;
            }

            $frame = $this->_client->createFrame();
            $frame->setCommand('SUBSCRIBE');
            $frame->setHeader('destination', $destination);
            $frame->setHeader('ack','client');
            $this->_client->send($frame);

            $this->_subscribed[$destination] = true;
        }
    }