--- tests/Zend/Queue/Adapter/ActivemqTest.php (revision 23486) +++ tests/Zend/Queue/Adapter/ActivemqTest.php (working copy) @@ -63,13 +63,13 @@ { $driverOptions = array(); if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_HOST')) { - $driverOptions['host'] = TESTS_ZEND_QUEUE_APACHEMQ_HOST; + $driverOptions['host'] = TESTS_ZEND_QUEUE_ACTIVEMQ_HOST; } if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_PORT')) { - $driverOptions['port'] = TESTS_ZEND_QUEUE_APACHEMQ_PORT; + $driverOptions['port'] = TESTS_ZEND_QUEUE_ACTIVEMQ_PORT; } if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME')) { - $driverOptions['scheme'] = TESTS_ZEND_QUEUE_APACHEMQ_SCHEME; + $driverOptions['scheme'] = TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME; } return array('driverOptions' => $driverOptions); } Index: tests/Zend/Queue/Adapter/ActivemqOfflineTest.php =================================================================== --- tests/Zend/Queue/Adapter/ActivemqOfflineTest.php (revision 0) +++ tests/Zend/Queue/Adapter/ActivemqOfflineTest.php (revision 0) @@ -0,0 +1,101 @@ + 'foo')); + $adapter->receive(null, null, $queue); + $adapter->receive(null, null, $queue); + + // iterate through mock StompClient and ensure SUBSCRIBE is only sent once per queue + $subscribes = 0; + foreach ($stompClient->frameStack as $frame) { + if ($frame->getCommand() === 'SUBSCRIBE') { + $subscribes++; + } + } + + $this->assertEquals(1, $subscribes); + } +} + +class StompClientMock extends Zend_Queue_Stomp_Client +{ + public $frameStack = array(); + public $responseStack = array(); + + public function __construct() { + // spoof a successful connection in the response stack + $frame = new Zend_Queue_Stomp_Frame; + $frame->setCommand('CONNECTED'); + $this->responseStack[] = $frame; + } + public function __destruct() {} + + public function send(Zend_Queue_Stomp_FrameInterface $frame) + { + $this->frameStack[] = $frame; + return $this; + } + + public function receive() + { + return array_shift($this->responseStack); + } + + public function canRead() + { + return count($this->responseStack) > 0; + } + + public function createFrame() + { + return new Zend_Queue_Stomp_Frame; + } +} Index: tests/Zend/Queue/Adapter/AdapterTest.php =================================================================== --- tests/Zend/Queue/Adapter/AdapterTest.php (revision 23486) +++ tests/Zend/Queue/Adapter/AdapterTest.php (working copy) @@ -148,12 +148,22 @@ try { $queue = new Zend_Queue($this->getAdapterName(), $config); } catch (Zend_Queue_Exception $e) { - $this->markTestSkipped(); + $this->markTestSkipped($e->getMessage()); restore_error_handler(); return false; } - restore_error_handler(); + // a PHP level error occurred, mark test as failed with error as reason + // (misconfigured test? undefined constant?) + if ($this->error) { + $err = error_get_last(); + $this->markTestFailed($err['message']); + restore_error_handler(); + return false; + } + + restore_error_handler(); + return $queue; } Index: tests/Zend/Queue/AllTests.php =================================================================== --- tests/Zend/Queue/AllTests.php (revision 23486) +++ tests/Zend/Queue/AllTests.php (working copy) @@ -49,6 +49,7 @@ // Message Queues dependent on Stomp require_once 'Zend/Queue/Adapter/ActivemqTest.php'; +require_once 'Zend/Queue/Adapter/ActivemqOfflineTest.php'; /** * @category Zend @@ -94,6 +95,7 @@ // Message Queues dependent on Stomp $suite->addTestSuite('Zend_Queue_Adapter_ActivemqTest'); + $suite->addTestSuite('Zend_Queue_Adapter_ActivemqOfflineTest'); return $suite; } Index: library/Zend/Queue/Adapter/Activemq.php =================================================================== --- library/Zend/Queue/Adapter/Activemq.php (revision 23486) +++ library/Zend/Queue/Adapter/Activemq.php (working copy) @@ -56,6 +56,11 @@ private $_client = null; /** + * @var array + */ + private $_subscribed = array(); + + /** * Constructor * * @param array|Zend_Config $config An array having configuration data @@ -177,6 +182,33 @@ } /** + * Checks if the client is subscribed to the queue + * + * @param Zend_Queue $queue + * @return boolean + */ + protected function _isSubscribed(Zend_Queue $queue) + { + return isset($this->_subscribed[$queue->getName()]); + } + + /** + * Subscribes the client to the queue. + * + * @param Zend_Queue $queue + * @return void + */ + protected function _subscribe(Zend_Queue $queue) + { + $frame = $this->_client->createFrame(); + $frame->setCommand('SUBSCRIBE'); + $frame->setHeader('destination', $queue->getName()); + $frame->setHeader('ack', 'client'); + $this->_client->send($frame); + $this->_subscribed[$queue->getName()] = true; + } + + /** * Return the first element in the queue * * @param integer $maxMessages @@ -200,11 +232,9 @@ $data = array(); // signal that we are reading - $frame = $this->_client->createFrame(); - $frame->setCommand('SUBSCRIBE'); - $frame->setHeader('destination', $queue->getName()); - $frame->setHeader('ack','client'); - $this->_client->send($frame); + if (!$this->_isSubscribed($queue)){ + $this->_subscribe($queue); + } if ($maxMessages > 0) { if ($this->_client->canRead()) {