ZF-7534: The receive method on Zend_Queue adapters creates multiple consumers when running in polling mode

Description

I am trying to set up a listener for messages sent to an ActiveMQ broker from Zend_Queue. The client code in my listener class looks like this:


while (1)
{
    if ($messages = $this->queue->receive(1))
    {
        foreach ($messages as $message)
        {
            if (($msg = $message->body) != null)
            {
                // do my processing and then delete the message from the queue
                $this->queue->deleteMessage($message);
            }
        }
    }
}

The problem is with the following code in the ZF-supplied ActiveMQ adapter (around line 208):


    // signal that we are reading
    if ($this->isConnected == 0)
    {
        $frame = $this->_client->createFrame();
        $frame->setCommand('SUBSCRIBE');
        $frame->setHeader('destination', $queue->getName());
        $frame->setHeader('ack', 'client');
        $this->_client->send($frame);
        $this->isConnected = 1;
    }

When running in a listening mode, this code creates multiple consumers for every call to receive, even though there are no messages on the queue. A better way would be to move this code out of receive into a separate method (something like checkMessage or getMessageCount) and call that before calling receive. It would also be nice to have a generic listener in the framework itself that needs only the name of a queue or topic, and possibly a polling interval, and that invokes predefined code when it receives a message. That is how most adapters in EAI systems work.
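
The generic listener requested above might look roughly like the sketch below. Nothing in it is part of Zend_Queue: pollQueue and its parameters are hypothetical names, the three callables stand in for receive, the user's processing code, and deleteMessage, and an in-memory array plays the role of the broker so the example runs on its own.

```php
<?php
// Hypothetical sketch: pollQueue() is not a Zend_Queue API.

/**
 * Polls a queue a fixed number of times and hands each message to a handler.
 *
 * $receive: callable(int $max): iterable   stands in for $queue->receive()
 * $handler: callable($message): bool       return true to delete the message
 * $delete:  callable($message): void       stands in for $queue->deleteMessage()
 *
 * Returns the number of messages that were handled and deleted.
 */
function pollQueue(
    callable $receive,
    callable $handler,
    callable $delete,
    int $intervalMicroseconds,
    int $maxPolls
): int {
    $handled = 0;
    for ($poll = 0; $poll < $maxPolls; $poll++) {
        $gotMessage = false;
        foreach ($receive(1) as $message) {
            $gotMessage = true;
            if ($handler($message)) {
                $delete($message);
                $handled++;
            }
        }
        // Sleep only when the queue was empty, so a backlog drains quickly.
        if (!$gotMessage) {
            usleep($intervalMicroseconds);
        }
    }
    return $handled;
}

// Usage: an in-memory array stands in for the broker.
$pending = ['first', 'second'];
$deleted = [];
$count = pollQueue(
    function (int $max) use (&$pending): array {
        return array_splice($pending, 0, $max);
    },
    function ($message): bool {
        return $message !== null;
    },
    function ($message) use (&$deleted): void {
        $deleted[] = $message;
    },
    1000, // polling interval in microseconds
    3     // stop after three polls so the example terminates
);
```

A real listener would loop indefinitely rather than stop after a fixed number of polls; the limit is here only so the sketch terminates.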

For now I have written my own adapter that works around this. The documentation on this functionality is sparse, so I am raising this issue in case I have missed something.

Comments

If you have already solved this issue, please submit a patch :)

I will check into this.

I checked around, but there is no method to obtain the number of messages in the queue. If you know of such a method for ActiveMQ please let me know.

Continuing to look into this...

The proposed fix will stop the adapter from creating multiple consumers. Making the value of isConnected configurable, instead of hardcoding it, would let a user run a set number of consumers depending on the expected load and the needs of the application.

/Dheeraj

Daniel,

A simple way would be to check that there is a valid MESSAGE frame on the target queue before receive (in its current incarnation) is called. I believe this could easily be done in the _canRead() method. (I may be wrong, but my brief look at its code suggested it is the ideal place for such logic.)

Using this together with my proposed fix would a) make the adapter create a subscriber on demand and b) let the user control the number of consumers based on application needs. (One consumer suffices for my current application, as I use the queue to process the sparse emails sent for comments placed on my message board. In another application, though, I have over 100 messages per second flowing into the broker, so I would like to have more consumers.)
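
As a rough illustration of the configurable consumer count described above, the sketch below caps the number of subscriptions at a user-supplied maximum. ConsumerLimiter and its methods are hypothetical names, not part of Zend_Queue or the ActiveMQ adapter, and the callable passed to subscribeIfNeeded stands in for the code that sends the SUBSCRIBE frame.

```php
<?php
// Hypothetical helper, not part of Zend_Queue: caps how many SUBSCRIBE
// frames a process sends, in place of a hardcoded isConnected flag.
final class ConsumerLimiter
{
    private $maxConsumers;
    private $active = 0;

    public function __construct(int $maxConsumers)
    {
        if ($maxConsumers < 1) {
            throw new InvalidArgumentException('maxConsumers must be at least 1');
        }
        $this->maxConsumers = $maxConsumers;
    }

    /**
     * Calls $subscribe only while fewer than maxConsumers are active.
     * Returns true when a new subscription was created.
     */
    public function subscribeIfNeeded(callable $subscribe): bool
    {
        if ($this->active >= $this->maxConsumers) {
            return false;
        }
        $subscribe();
        $this->active++;
        return true;
    }

    public function activeCount(): int
    {
        return $this->active;
    }
}

// Usage: five calls to receive would send only two SUBSCRIBE frames.
$framesSent = 0;
$limiter = new ConsumerLimiter(2);
for ($call = 0; $call < 5; $call++) {
    $limiter->subscribeIfNeeded(function () use (&$framesSent): void {
        $framesSent++; // stands in for $this->_client->send($frame)
    });
}
```

With a maximum of 1 this behaves like the isConnected check in the description; a larger value allows more consumers for a busier queue.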

This would make for a great load-balancing mechanism.

Dheeraj

We are currently waiting for Matthew O'Phinney to get back from vacation to comment on this issue.

The only method I have found to get the number of pending messages (along with other items) is:

Activate the ActiveMQ.Agent topic (and JMX)

Send a "query -QQueue=" message

Required: Set a reply-to header with a queue or topic

Parse the number from the response

You might be able to return only the line or number of messages with more options to the command but I haven't tried.
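
The steps above could be sketched as a raw STOMP SEND frame. This is only an illustration under several assumptions: buildStompSendFrame is a hypothetical helper, the destination /topic/ActiveMQ.Agent assumes the usual STOMP mapping of the ActiveMQ.Agent topic, /queue/agent.replies is a made-up reply queue, and the command body is passed through exactly as it appears in this thread, where the rest of the command is not given.

```php
<?php
// Hypothetical helper: builds a STOMP SEND frame as a string.
// Frame layout: command line, header lines, blank line, body, NUL byte.
function buildStompSendFrame(string $destination, string $replyTo, string $body): string
{
    $headers = [
        'destination' => $destination,
        'reply-to'    => $replyTo, // required so the agent knows where to answer
    ];

    $frame = "SEND\n";
    foreach ($headers as $name => $value) {
        $frame .= $name . ':' . $value . "\n";
    }

    // A blank line ends the headers; a NUL byte ends the frame.
    return $frame . "\n" . $body . "\0";
}

// Usage: destination and reply queue are assumptions, see the text above.
$frame = buildStompSendFrame(
    '/topic/ActiveMQ.Agent',
    '/queue/agent.replies',
    'query -QQueue='
);
```

The reply would then have to be read from the reply-to destination and parsed for the message count; the format of that response is not described in this thread, so it is left out here.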

http://activemq.apache.org/activemq-command-line-t…
http://activemq.apache.org/command-agent.html

I fixed this issue in ZF-7948