ZF-7534: The receive method on Zend_Queue adapters creates multiple consumers when running in polling mode
Description
I am trying to set up a listener for messages sent to an ActiveMQ broker from Zend_Queue. My client code in the listener class looks like this:
while (1)
{
    if ($messages = $this->queue->receive(1))
    {
        foreach ($messages as $message)
        {
            if (($msg = $message->body) != null)
            {
                // do my processing and then delete the message from the queue
                $this->queue->deleteMessage($message);
            }
        }
    }
}
The problem is with the following code in the ZF-supplied ActiveMQ adapter (around line 208):
// signal that we are reading
if ($this->isConnected == 0)
{
    $frame = $this->_client->createFrame();
    $frame->setCommand('SUBSCRIBE');
    $frame->setHeader('destination', $queue->getName());
    $frame->setHeader('ack', 'client');
    $this->_client->send($frame);
    $this->isConnected = 1;
}
When running in a listening mode, this code creates multiple consumers, one for every call to receive, even when there are no messages on the queue. A better approach would be to move this code out of receive into a separate method (something like checkMessage or getMessageCount) and call that before calling receive. It would also be nice to have a generic listener in the framework itself that needs only the name of a queue or topic and possibly a polling interval, and that invokes predefined code when it receives a message. That is how most adapters in EAI systems work.
For now I have written my own adapter that works around this. The documentation on this functionality is sparse, so I am raising an issue in case I have missed something.
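A minimal sketch of the generic listener idea described above, assuming a PHP version with closures. The function name pollQueue and its parameters are hypothetical and not part of Zend_Queue; the receive callback stands in for a call such as $queue->receive(1), and an in-memory array stands in for the broker so the example runs on its own.

```php
<?php
// Hypothetical helper, not part of Zend_Queue: polls a "receive" callback and
// passes each returned message to a handler. It sleeps only when a poll
// returns nothing, so an idle queue does not cause a busy loop.
function pollQueue($receive, $handler, $intervalMicros = 100000, $maxPolls = null)
{
    $handled = 0;
    for ($poll = 0; $maxPolls === null || $poll < $maxPolls; $poll++) {
        $gotAny = false;
        foreach (call_user_func($receive) as $message) {
            $gotAny = true;
            call_user_func($handler, $message);
            $handled++;
        }
        if (!$gotAny) {
            usleep($intervalMicros); // wait before polling again
        }
    }
    return $handled;
}

// In-memory stand-in for a queue; a real listener would wrap $queue->receive(1).
$pending = array('first', 'second');
$seen = array();

$count = pollQueue(
    function () use (&$pending) {
        $message = array_shift($pending);
        return $message === null ? array() : array($message);
    },
    function ($body) use (&$seen) {
        $seen[] = $body; // application-specific processing goes here
    },
    1000, // polling interval in microseconds
    4     // stop after four polls so the example terminates
);
```

Passing null as the last argument makes the loop run indefinitely, which matches the while (1) listener shown earlier. Subscribing to the destination would happen once, outside this loop, rather than inside the receive callback.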
Comments
Posted by Daniel Lo (danlo) on 2009-08-08T12:55:46.000+0000
If you have already solved this issue, please submit a patch :)
I will check into this.
Posted by Daniel Lo (danlo) on 2009-08-08T13:07:02.000+0000
I checked around, but there is no method to obtain the number of messages in the queue. If you know of such a method for ActiveMQ please let me know.
Sources:
Continuing to look into this...
Posted by Dheeraj Saxena (kkarank) on 2009-08-08T20:03:37.000+0000
The proposed fix will stop the adapter from creating multiple consumers. Instead of hardcoding the value of isConnected, keeping it configurable will give a user the option to have a set number of consumers depending upon expected load and application specificity.
/Dheeraj
Posted by Dheeraj Saxena (kkarank) on 2009-08-08T20:13:48.000+0000
Daniel,
A simple way would be to check that there is a valid MESSAGE frame on the target queue before receive (in its current incarnation) is called. I believe this could easily be done in the _canRead() method. (I may be wrong, but a brief look at its code suggested that it is the ideal place for such logic.)
Using this and my proposed fix would a) make the adapter create a subscriber on demand and b) let the user control the number of consumers based on the application's needs. One consumer suffices for my current application, since I use the queue to process the occasional email sent for comments placed on my message board. In another application, though, I have over 100 messages per second flowing into the broker, so I would like to have more consumers.
This would be a great load-balancing act.
Dheeraj
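The on-demand subscription with a configurable consumer limit proposed above could look roughly like the following. This is a sketch under stated assumptions: the class SubscriptionTracker, its method ensureSubscribed, and the queue name are hypothetical and do not exist in Zend_Queue_Adapter_Activemq, and the callback stands in for building and sending a SUBSCRIBE frame.

```php
<?php
// Hypothetical sketch; the class and method names are illustrative only.
class SubscriptionTracker
{
    private $sendSubscribe;     // callback that would send a SUBSCRIBE frame
    private $maxConsumers;      // configurable limit per destination
    private $counts = array();  // destination => subscriptions already sent

    public function __construct($sendSubscribe, $maxConsumers = 1)
    {
        $this->sendSubscribe = $sendSubscribe;
        $this->maxConsumers = $maxConsumers;
    }

    // Sends SUBSCRIBE only while the destination is below its limit.
    // Returns true if this call sent a SUBSCRIBE, false otherwise.
    public function ensureSubscribed($destination)
    {
        $current = isset($this->counts[$destination]) ? $this->counts[$destination] : 0;
        if ($current >= $this->maxConsumers) {
            return false;
        }
        call_user_func($this->sendSubscribe, $destination);
        $this->counts[$destination] = $current + 1;
        return true;
    }
}

// Record the "sent" subscriptions instead of talking to a broker.
$sent = array();
$tracker = new SubscriptionTracker(
    function ($destination) use (&$sent) {
        $sent[] = $destination;
    },
    2 // allow at most two consumers per destination
);

// Simulate five polling calls to receive(); only the first two subscribe.
for ($i = 0; $i < 5; $i++) {
    $tracker->ensureSubscribed('/queue/comments');
}
```

Tracking the count per destination, rather than in a single isConnected flag, keeps the limit independent for each queue or topic the adapter reads from.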
Posted by Daniel Lo (danlo) on 2009-08-16T09:10:17.000+0000
We are currently waiting for Matthew Weier O'Phinney to get back from vacation to comment on this issue.
Posted by Rob Olmos (ndrtek_rob) on 2009-08-28T23:32:05.000+0000
The only method I have found to get the number of pending messages (along with other items) is:
Activate the ActiveMQ.Agent topic (and JMX)
Send a "query -QQueue=" message
Required: Set a reply-to header with a queue or topic
Parse the number from the response
You might be able to return only the line or number of messages with more options to the command but I haven't tried.
http://activemq.apache.org/activemq-command-line-t… http://activemq.apache.org/command-agent.html
Posted by Anton C. Swartz IV (necrogami) on 2010-12-18T00:28:14.000+0000
I fixed this issue in ZF-7948