web/lib/Zend/Queue/Adapter/Activemq.php
changeset 807 877f952ae2bd
parent 207 621fa6caec0c
child 1230 68c69c656a2c
--- a/web/lib/Zend/Queue/Adapter/Activemq.php	Thu Mar 21 17:31:31 2013 +0100
+++ b/web/lib/Zend/Queue/Adapter/Activemq.php	Thu Mar 21 19:50:53 2013 +0100
@@ -15,9 +15,9 @@
  * @category   Zend
  * @package    Zend_Queue
  * @subpackage Adapter
- * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
+ * @copyright  Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
  * @license    http://framework.zend.com/license/new-bsd     New BSD License
- * @version    $Id: Activemq.php 20096 2010-01-06 02:05:09Z bkarwin $
+ * @version    $Id: Activemq.php 24593 2012-01-05 20:35:02Z matthew $
  */
 
 /**
@@ -41,7 +41,7 @@
  * @category   Zend
  * @package    Zend_Queue
  * @subpackage Adapter
- * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
+ * @copyright  Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
  * @license    http://framework.zend.com/license/new-bsd     New BSD License
  */
 class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
@@ -56,6 +56,11 @@
     private $_client = null;
 
     /**
+     * @var array
+     */
+    private $_subscribed = array();
+
+    /**
      * Constructor
      *
      * @param  array|Zend_Config $config An array having configuration data
@@ -177,6 +182,33 @@
     }
 
     /**
+     * Checks if the client is subscribed to the queue
+     *
+     * @param  Zend_Queue $queue
+     * @return boolean
+     */
+    protected function _isSubscribed(Zend_Queue $queue)
+    {
+        return isset($this->_subscribed[$queue->getName()]);
+    }
+
+    /**
+      * Subscribes the client to the queue.
+      *
+      * @param  Zend_Queue $queue
+      * @return void
+      */
+    protected function _subscribe(Zend_Queue $queue)
+    {
+        $frame = $this->_client->createFrame();
+        $frame->setCommand('SUBSCRIBE');
+        $frame->setHeader('destination', $queue->getName());
+        $frame->setHeader('ack', 'client');
+        $this->_client->send($frame);
+        $this->_subscribed[$queue->getName()] = true;
+    }
+
+    /**
      * Return the first element in the queue
      *
      * @param  integer    $maxMessages
@@ -200,11 +232,9 @@
         $data = array();
 
         // signal that we are reading
-        $frame = $this->_client->createFrame();
-        $frame->setCommand('SUBSCRIBE');
-        $frame->setHeader('destination', $queue->getName());
-        $frame->setHeader('ack','client');
-        $this->_client->send($frame);
+        if (!$this->_isSubscribed($queue)){
+            $this->_subscribe($queue);
+        }
 
         if ($maxMessages > 0) {
             if ($this->_client->canRead()) {