ZF-7534: The Receive method on Zend_Queue Adapters creates multiple consumers when running in a polling mode


I am trying to set up a listener for messages sent to an ActiveMQ broker from Zend_Queue. My client code in the listener class looks like this:

while (true) {
    // ask the queue for at most one message per poll
    if ($messages = $this->queue->receive(1)) {
        foreach ($messages as $message) {
            if (($msg = $message->body) != null) {
                // do my processing and then delete the message from queue
            }
        }
    }
}

The problem is with the following code in the ZF-supplied ActiveMQ adapter (around line 208):

        // signal that we are reading
        if ($this->isConnected == 0) {
            $frame = $this->_client->createFrame();
            $frame->setHeader('destination', $queue->getName());
            $this->isConnected = 1;
        }

When running in listening mode, this code keeps creating consumers on every call to receive, even when there are no messages on the queue. A better approach would be to move this code out of receive into a separate method (something like checkMessage or getMessageCount) and call that before calling receive. It would also be nice to have a generic listener in the framework itself that needs only the name of a queue or topic and possibly a polling interval, and that invokes predefined code when it receives a message. That is how most adapters in EAI systems work.
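The generic listener suggested above could look roughly like the following. This is only a sketch under stated assumptions: PollingListener and its parameters are hypothetical names, not part of Zend_Queue, and the receive step is passed in as a callable so that the example runs without a broker.

```php
<?php
// Hypothetical generic polling listener; NOT part of Zend_Queue.
final class PollingListener
{
    private $receive;        // callable(): iterable of messages (may be empty)
    private $handler;        // callable($message): void, the "predefined code"
    private $intervalMicros; // pause between polls that returned nothing

    public function __construct(callable $receive, callable $handler, int $intervalMicros = 500000)
    {
        $this->receive = $receive;
        $this->handler = $handler;
        $this->intervalMicros = $intervalMicros;
    }

    // Poll $maxPolls times and return the number of messages handled.
    public function run(int $maxPolls): int
    {
        $handled = 0;
        for ($i = 0; $i < $maxPolls; $i++) {
            $count = 0;
            foreach (($this->receive)() as $message) {
                ($this->handler)($message);
                $count++;
            }
            $handled += $count;
            if ($count === 0) {
                // Nothing arrived: wait before polling again.
                usleep($this->intervalMicros);
            }
        }
        return $handled;
    }
}
```

With Zend_Queue, the receive callable would presumably wrap the existing call, for example function () use ($queue) { return $queue->receive(1); }.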

For now I have written my own adapter that works around this. The documentation on this functionality is sparse, so I am raising an issue in case I have missed something.


If you have already solved this issue, please submit a patch :)

I will check into this.

I checked around, but there is no method to obtain the number of messages in the queue. If you know of such a method for ActiveMQ please let me know.


Continuing to look into this...

The proposed fix will stop the adapter from creating multiple consumers. Rather than hardcoding the value of isConnected, keeping it configurable would give the user the option of a set number of consumers, depending on the expected load and the application's needs.
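The idea behind the proposed fix, sending the SUBSCRIBE frame only once instead of on every receive call, can be illustrated with a small stand-alone sketch. Everything here is an assumption made for illustration: RecordingClient stands in for the STOMP client and only records frames, and SubscribeOnceAdapter tracks subscriptions per queue name rather than through a single isConnected flag. It is not the adapter's actual code.

```php
<?php
// Stand-in for the STOMP client: it only records the frames it is asked to send.
final class RecordingClient
{
    public $sent = [];

    public function send(array $frame): void
    {
        $this->sent[] = $frame;
    }
}

// Hypothetical adapter fragment: subscribe lazily, once per queue name.
final class SubscribeOnceAdapter
{
    private $client;
    private $subscribed = []; // queue name => true

    public function __construct(RecordingClient $client)
    {
        $this->client = $client;
    }

    public function receive(string $queueName): array
    {
        if (!isset($this->subscribed[$queueName])) {
            // First call for this queue: signal that we are reading.
            $this->client->send([
                'command' => 'SUBSCRIBE',
                'headers' => ['destination' => $queueName, 'ack' => 'client'],
            ]);
            $this->subscribed[$queueName] = true;
        }
        // Reading MESSAGE frames is omitted in this sketch.
        return [];
    }
}
```

Calling receive repeatedly in a polling loop then leaves one consumer per queue on the broker, however many polls come back empty.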



A simple way would be to check that there is a valid MESSAGE frame on the target queue before receive (in its current incarnation) is called. I believe this could easily be done in the _canRead() method. (I may be wrong, but my brief look at its code suggested that it is the ideal place for such logic.)

Using this together with my proposed fix would a) make the adapter create a subscriber on demand and b) let the user control the number of consumers based on the application's needs. (My current application needs only one consumer, as I use the queue to process the sparse emails sent for comments placed on my message board. In another application, though, I have over 100 messages per second flowing into the broker, so I would like to have more consumers.)

This would be a great load-balancing act.


We are currently waiting for Matthew O'Phinney to get back from vacation to comment on this issue.

The only method I have found to get the number of pending messages (along with other items) is:

Activate the ActiveMQ.Agent topic (and JMX)

Send a "query -QQueue=" message

Required: Set a reply-to header with a queue or topic

Parse the number from the response

You might be able to return only the line or number of messages with more options to the command but I haven't tried.
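The query step above could be sketched as a raw STOMP SEND frame. This is an untested sketch, not a verified recipe: the function name buildAgentQueryFrame is hypothetical, the destination /topic/ActiveMQ.Agent assumes the usual STOMP topic prefix, and appending the queue name directly after -QQueue= is an assumption about the query syntax. Parsing the reply is left out because the response format was not checked.

```php
<?php
// Build a raw STOMP frame: command line, headers, blank line, body, NUL terminator.
// Assumptions: the agent listens on /topic/ActiveMQ.Agent, and the queue name
// goes directly after "-QQueue=".
function buildAgentQueryFrame(string $queueName, string $replyTo): string
{
    $headers = [
        'destination' => '/topic/ActiveMQ.Agent',
        'reply-to'    => $replyTo, // required: where the agent sends its answer
    ];

    $frame = "SEND\n";
    foreach ($headers as $name => $value) {
        $frame .= $name . ':' . $value . "\n";
    }

    // A blank line separates the headers from the body; "\0" ends the frame.
    return $frame . "\n" . 'query -QQueue=' . $queueName . "\0";
}
```

The resulting string would be written to the broker's STOMP socket, and the answer read from whatever queue or topic was given as the reply-to destination.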

http://activemq.apache.org/activemq-command-line-t… http://activemq.apache.org/command-agent.html

I fixed this issue in ZF-7948