web/lib/Zend/Queue/Adapter/Activemq.php
changeset 807 877f952ae2bd
parent 207 621fa6caec0c
child 1230 68c69c656a2c
equal deleted inserted replaced
805:5e7a0fedabdf 807:877f952ae2bd
    13  * to license@zend.com so we can send you a copy immediately.
    13  * to license@zend.com so we can send you a copy immediately.
    14  *
    14  *
    15  * @category   Zend
    15  * @category   Zend
    16  * @package    Zend_Queue
    16  * @package    Zend_Queue
    17  * @subpackage Adapter
    17  * @subpackage Adapter
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
    18  * @copyright  Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
    20  * @version    $Id: Activemq.php 20096 2010-01-06 02:05:09Z bkarwin $
    20  * @version    $Id: Activemq.php 24593 2012-01-05 20:35:02Z matthew $
    21  */
    21  */
    22 
    22 
    23 /**
    23 /**
    24  * @see Zend_Queue_Adapter_AdapterAbstract
    24  * @see Zend_Queue_Adapter_AdapterAbstract
    25  */
    25  */
    39  * Class for using Stomp to talk to a Stomp-compliant server
    39  * Class for using Stomp to talk to a Stomp-compliant server
    40  *
    40  *
    41  * @category   Zend
    41  * @category   Zend
    42  * @package    Zend_Queue
    42  * @package    Zend_Queue
    43  * @subpackage Adapter
    43  * @subpackage Adapter
    44  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
    44  * @copyright  Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
    45  * @license    http://framework.zend.com/license/new-bsd     New BSD License
    45  * @license    http://framework.zend.com/license/new-bsd     New BSD License
    46  */
    46  */
    47 class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
    47 class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
    48 {
    48 {
    49     const DEFAULT_SCHEME = 'tcp';
    49     const DEFAULT_SCHEME = 'tcp';
    52 
    52 
    53     /**
    53     /**
    54      * @var Zend_Queue_Adapter_Stomp_client
    54      * @var Zend_Queue_Adapter_Stomp_client
    55      */
    55      */
    56     private $_client = null;
    56     private $_client = null;
       
    57 
       
    58     /**
       
    59      * @var array
       
    60      */
       
    61     private $_subscribed = array();
    57 
    62 
    58     /**
    63     /**
    59      * Constructor
    64      * Constructor
    60      *
    65      *
    61      * @param  array|Zend_Config $config An array having configuration data
    66      * @param  array|Zend_Config $config An array having configuration data
   175         require_once 'Zend/Queue/Exception.php';
   180         require_once 'Zend/Queue/Exception.php';
   176         throw new Zend_Queue_Exception('getQueues() is not supported in this adapter');
   181         throw new Zend_Queue_Exception('getQueues() is not supported in this adapter');
   177     }
   182     }
   178 
   183 
   179     /**
   184     /**
       
   185      * Checks if the client is subscribed to the queue
       
   186      *
       
   187      * @param  Zend_Queue $queue
       
   188      * @return boolean
       
   189      */
       
   190     protected function _isSubscribed(Zend_Queue $queue)
       
   191     {
       
   192         return isset($this->_subscribed[$queue->getName()]);
       
   193     }
       
   194 
       
   195     /**
       
   196       * Subscribes the client to the queue.
       
   197       *
       
   198       * @param  Zend_Queue $queue
       
   199       * @return void
       
   200       */
       
   201     protected function _subscribe(Zend_Queue $queue)
       
   202     {
       
   203         $frame = $this->_client->createFrame();
       
   204         $frame->setCommand('SUBSCRIBE');
       
   205         $frame->setHeader('destination', $queue->getName());
       
   206         $frame->setHeader('ack', 'client');
       
   207         $this->_client->send($frame);
       
   208         $this->_subscribed[$queue->getName()] = true;
       
   209     }
       
   210 
       
   211     /**
   180      * Return the first element in the queue
   212      * Return the first element in the queue
   181      *
   213      *
   182      * @param  integer    $maxMessages
   214      * @param  integer    $maxMessages
   183      * @param  integer    $timeout
   215      * @param  integer    $timeout
   184      * @param  Zend_Queue $queue
   216      * @param  Zend_Queue $queue
   198 
   230 
   199         // read
   231         // read
   200         $data = array();
   232         $data = array();
   201 
   233 
   202         // signal that we are reading
   234         // signal that we are reading
   203         $frame = $this->_client->createFrame();
   235         if (!$this->_isSubscribed($queue)){
   204         $frame->setCommand('SUBSCRIBE');
   236             $this->_subscribe($queue);
   205         $frame->setHeader('destination', $queue->getName());
   237         }
   206         $frame->setHeader('ack','client');
       
   207         $this->_client->send($frame);
       
   208 
   238 
   209         if ($maxMessages > 0) {
   239         if ($maxMessages > 0) {
   210             if ($this->_client->canRead()) {
   240             if ($this->_client->canRead()) {
   211                 for ($i = 0; $i < $maxMessages; $i++) {
   241                 for ($i = 0; $i < $maxMessages; $i++) {
   212                     $response = $this->_client->receive();
   242                     $response = $this->_client->receive();