equal
deleted
inserted
replaced
13 * to license@zend.com so we can send you a copy immediately. |
13 * to license@zend.com so we can send you a copy immediately. |
14 * |
14 * |
15 * @category Zend |
15 * @category Zend |
16 * @package Zend_Queue |
16 * @package Zend_Queue |
17 * @subpackage Adapter |
17 * @subpackage Adapter |
18 * @copyright Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com) |
18 * @copyright Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com) |
19 * @license http://framework.zend.com/license/new-bsd New BSD License |
19 * @license http://framework.zend.com/license/new-bsd New BSD License |
20 * @version $Id: Activemq.php 20096 2010-01-06 02:05:09Z bkarwin $ |
20 * @version $Id: Activemq.php 24593 2012-01-05 20:35:02Z matthew $ |
21 */ |
21 */ |
22 |
22 |
23 /** |
23 /** |
24 * @see Zend_Queue_Adapter_AdapterAbstract |
24 * @see Zend_Queue_Adapter_AdapterAbstract |
25 */ |
25 */ |
39 * Class for using Stomp to talk to a Stomp compliant server
39 * Class for using Stomp to talk to a Stomp compliant server
40 * |
40 * |
41 * @category Zend |
41 * @category Zend |
42 * @package Zend_Queue |
42 * @package Zend_Queue |
43 * @subpackage Adapter |
43 * @subpackage Adapter |
44 * @copyright Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com) |
44 * @copyright Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com) |
45 * @license http://framework.zend.com/license/new-bsd New BSD License |
45 * @license http://framework.zend.com/license/new-bsd New BSD License |
46 */ |
46 */ |
47 class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract |
47 class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract |
48 { |
48 { |
49 const DEFAULT_SCHEME = 'tcp'; |
49 const DEFAULT_SCHEME = 'tcp'; |
52 |
52 |
53 /** |
53 /** |
54 * @var Zend_Queue_Adapter_Stomp_client |
54 * @var Zend_Queue_Adapter_Stomp_client |
55 */ |
55 */ |
56 private $_client = null; |
56 private $_client = null; |
|
57 |
|
58 /** |
|
59 * @var array |
|
60 */ |
|
61 private $_subscribed = array(); |
57 |
62 |
58 /** |
63 /** |
59 * Constructor |
64 * Constructor |
60 * |
65 * |
61 * @param array|Zend_Config $config An array having configuration data |
66 * @param array|Zend_Config $config An array having configuration data |
175 require_once 'Zend/Queue/Exception.php'; |
180 require_once 'Zend/Queue/Exception.php'; |
176 throw new Zend_Queue_Exception('getQueues() is not supported in this adapter'); |
181 throw new Zend_Queue_Exception('getQueues() is not supported in this adapter'); |
177 } |
182 } |
178 |
183 |
179 /** |
184 /** |
|
185 * Checks if the client is subscribed to the queue |
|
186 * |
|
187 * @param Zend_Queue $queue |
|
188 * @return boolean |
|
189 */ |
|
190 protected function _isSubscribed(Zend_Queue $queue) |
|
191 { |
|
192 return isset($this->_subscribed[$queue->getName()]); |
|
193 } |
|
194 |
|
195 /** |
|
196 * Subscribes the client to the queue. |
|
197 * |
|
198 * @param Zend_Queue $queue |
|
199 * @return void |
|
200 */ |
|
201 protected function _subscribe(Zend_Queue $queue) |
|
202 { |
|
203 $frame = $this->_client->createFrame(); |
|
204 $frame->setCommand('SUBSCRIBE'); |
|
205 $frame->setHeader('destination', $queue->getName()); |
|
206 $frame->setHeader('ack', 'client'); |
|
207 $this->_client->send($frame); |
|
208 $this->_subscribed[$queue->getName()] = true; |
|
209 } |
|
210 |
|
211 /** |
180 * Return the first element in the queue |
212 * Return the first element in the queue |
181 * |
213 * |
182 * @param integer $maxMessages |
214 * @param integer $maxMessages |
183 * @param integer $timeout |
215 * @param integer $timeout |
184 * @param Zend_Queue $queue |
216 * @param Zend_Queue $queue |
198 |
230 |
199 // read |
231 // read |
200 $data = array(); |
232 $data = array(); |
201 |
233 |
202 // signal that we are reading |
234 // signal that we are reading |
203 $frame = $this->_client->createFrame(); |
235 if (!$this->_isSubscribed($queue)){ |
204 $frame->setCommand('SUBSCRIBE'); |
236 $this->_subscribe($queue); |
205 $frame->setHeader('destination', $queue->getName()); |
237 } |
206 $frame->setHeader('ack','client'); |
|
207 $this->_client->send($frame); |
|
208 |
238 |
209 if ($maxMessages > 0) { |
239 if ($maxMessages > 0) { |
210 if ($this->_client->canRead()) { |
240 if ($this->_client->canRead()) { |
211 for ($i = 0; $i < $maxMessages; $i++) { |
241 for ($i = 0; $i < $maxMessages; $i++) { |
212 $response = $this->_client->receive(); |
242 $response = $this->_client->receive(); |