Queue Service Introduction

The QueueService implements access to message queues available as local or remote services. The simple queues that QueueService supports implement a messaging pattern that enables different processes to exchange messages in a reliable and scalable way. One common use case for such message queues is job dispatching, in which a frontend web server adds a complex job to a queue for a backend worker to do the expensive processing. The frontend web server can then return the page without waiting for the work to be completed.

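The interface Zend_Cloud_QueueService_Adapter defines the methods that concrete queue service adapters must implement. The following adapters are shipped with the Simple Cloud API:

  • Zend_Cloud_QueueService_Adapter_Sqs

  • Zend_Cloud_QueueService_Adapter_WindowsAzure

  • Zend_Cloud_QueueService_Adapter_ZendQueue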

Instantiating and Configuring QueueService Adapters

To instantiate a QueueService adapter, use the static method Zend_Cloud_QueueService_Factory::getAdapter(), which accepts either an array or a Zend_Config object. Three parameters apply to all adapters, while the remaining parameters are adapter-specific properties; these adapter-specific properties often contain access details.

The general parameters are as follows:

  • queue_adapter specifies the concrete adapter class;

  • message_class specifies the class to use to represent queue messages; defaults to Zend_Cloud_QueueService_Message; and

  • messageset_class specifies the class to use to represent collections of queue messages; defaults to Zend_Cloud_QueueService_MessageSet.

Example #1 Instantiating an Amazon SQS adapter via the factory

    $queues = Zend_Cloud_QueueService_Factory::getAdapter(array(
        Zend_Cloud_QueueService_Factory::QUEUE_ADAPTER_KEY  => 'Zend_Cloud_QueueService_Adapter_Sqs',
        Zend_Cloud_QueueService_Adapter_Sqs::AWS_ACCESS_KEY => $amazonKey,
        Zend_Cloud_QueueService_Adapter_Sqs::AWS_SECRET_KEY => $amazonSecret,
    ));

Service-Specific Options

Zend_Cloud_QueueService_Adapter_Sqs Options

| Option key         | Description                                 | Used in           | Required | Default                         |
|--------------------|---------------------------------------------|-------------------|----------|---------------------------------|
| aws_accesskey      | Your Amazon AWS access key                  | Constructor       | Yes      | None                            |
| aws_secretkey      | Your Amazon AWS secret key                  | Constructor       | Yes      | None                            |
| http_adapter       | HTTP adapter to use in all access operations | Constructor      | No       | Zend_Http_Client_Adapter_Socket |
| visibility_timeout | Message visibility timeout                  | receiveMessages() | No       | 60                              |

Zend_Cloud_QueueService_Adapter_WindowsAzure Options

| Option key                | Description                                                          | Used in           | Required | Default                         |
|---------------------------|----------------------------------------------------------------------|-------------------|----------|---------------------------------|
| storage_accountname       | Windows Azure account name                                           | Constructor       | Yes      | None                            |
| storage_accountkey        | Windows Azure account key                                            | Constructor       | Yes      | None                            |
| storage_host              | Windows Azure access host                                            | Constructor       | No       | queue.core.windows.net          |
| storage_proxy_host        | Proxy hostname                                                       | Constructor       | No       | None                            |
| storage_proxy_port        | Proxy port                                                           | Constructor       | No       | 8080                            |
| storage_proxy_credentials | Proxy credentials                                                    | Constructor       | No       | None                            |
| http_adapter              | HTTP adapter to use in all access operations                         | Constructor       | No       | Zend_Http_Client_Adapter_Socket |
| visibility_timeout        | Message visibility timeout                                           | receiveMessages() | No       | 60                              |
| prefix                    | Filter the results to only queue names beginning with given prefix   | listQueues()      | No       | None                            |
| max_results               | Limit queue list to certain number of results                        | listQueues()      | No       | 5,000                           |
| ttl                       | Set time-to-live for message                                         | sendMessage()     | No       | 7 days                          |

Zend_Cloud_QueueService_Adapter_ZendQueue Options

| Option key | Description                                                                                                   | Used in                          | Required | Default    |
|------------|---------------------------------------------------------------------------------------------------------------|----------------------------------|----------|------------|
| adapter    | Concrete Zend_Queue adapter to use. See the Zend_Queue documentation for supported adapters and their options. | Constructor                      | No       | Filesystem |
| timeout    | Visibility timeout for messages                                                                               | createQueue(), receiveMessages() | No       | 30         |
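
As a rough sketch, the option keys from the tables above can be supplied as plain string keys in the configuration array. The helper name buildAzureQueueConfig(), the account values, and the proxy host are placeholders made up for illustration; only the option keys and the adapter class name come from this page.

```php
<?php
/**
 * Build a configuration array for the Windows Azure queue adapter.
 * buildAzureQueueConfig() is a hypothetical helper; the array keys are
 * the option keys listed in the tables above.
 */
function buildAzureQueueConfig($accountName, $accountKey, array $overrides = array())
{
    $config = array(
        'queue_adapter'       => 'Zend_Cloud_QueueService_Adapter_WindowsAzure',
        'storage_accountname' => $accountName,
        'storage_accountkey'  => $accountKey,
        'storage_host'        => 'queue.core.windows.net', // documented default
    );

    // Caller-supplied values replace the defaults set above.
    return array_merge($config, $overrides);
}

// Placeholder credentials and proxy settings, for illustration only.
$config = buildAzureQueueConfig('my-account', 'my-secret-key', array(
    'storage_proxy_host' => 'proxy.example.com',
    'storage_proxy_port' => 8080,
));

// Only hand the array to the factory when Zend Framework is actually loaded.
if (class_exists('Zend_Cloud_QueueService_Factory')) {
    $queues = Zend_Cloud_QueueService_Factory::getAdapter($config);
}
```

Keeping the configuration in one array makes it straightforward to switch adapters by changing the queue_adapter value and the adapter-specific keys.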

Basic concepts

Every queue service typically offers one or more queues. Each queue can store zero or more messages. A process can send a message to a queue, and another process can remove it. Usually processes remove the oldest message in the queue, observing a first in, first out (FIFO) queue-style interface.
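
The first in, first out behaviour described above can be pictured with a purely local stand-in. This sketch uses PHP's built-in SplQueue rather than any QueueService adapter, so it models the ordering only, not a remote service.

```php
<?php
// Local illustration only: SplQueue stands in for a remote queue.
$queue = new SplQueue();

// Producers append messages at the tail.
$queue->enqueue('first job');
$queue->enqueue('second job');
$queue->enqueue('third job');

// Consumers take messages from the head, so the oldest comes out first.
$received = array();
while (!$queue->isEmpty()) {
    $received[] = $queue->dequeue();
}

echo implode(', ', $received), "\n"; // first job, second job, third job
```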

Exceptions

If an error occurs inside the queue service, a Zend_Cloud_QueueService_Exception is thrown. If the exception was caused by the underlying service driver, you can use the getClientException() method to retrieve the original exception.

Since different cloud providers implement different sets of services, some adapters do not implement certain features. In this case, the Zend_Cloud_OperationNotAvailableException exception is thrown.
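
One possible way to handle both exception types is sketched below. The helper name trySend() and its return strings are assumptions made for this example; the exception classes and getClientException() are the ones named above, and $queues stands for any QueueService adapter.

```php
<?php
/**
 * Try to send a message and report what went wrong, if anything.
 * trySend() is a hypothetical helper, not part of the Simple Cloud API.
 * Returns 'ok', 'unsupported', or 'failed: <details>'.
 */
function trySend($queues, $queueId, $body)
{
    try {
        $queues->sendMessage($queueId, $body);
        return 'ok';
    } catch (Zend_Cloud_OperationNotAvailableException $e) {
        // The adapter does not implement this operation.
        return 'unsupported';
    } catch (Zend_Cloud_QueueService_Exception $e) {
        // Prefer the underlying driver's exception when one is attached.
        $cause  = $e->getClientException();
        $detail = ($cause instanceof Exception) ? $cause->getMessage() : $e->getMessage();
        return 'failed: ' . $detail;
    }
}
```

A caller could then write, for example, echo trySend($queues, $queueId, 'Hello world!'); and branch on the returned string.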

Create a queue

The createQueue() method creates a message queue with the given name. It returns a queue identifier, the format of which is service-dependent. Some services return a URL for the queue identifier, while others return a GUID to use in future operations.

Example #2 Creating a queue

    $queueId = $queues->createQueue('my-queue');

Delete a queue

The deleteQueue() method removes the queue from the service. You must use the identifier received from createQueue() when calling deleteQueue().

Example #3 Deleting a queue

    $queueId = $queues->createQueue('my-queue');

    // ... do stuff ...

    $queues->deleteQueue($queueId);

Note: Deleting a queue can take significant time for some services. Typically, you cannot re-create a queue with the same name until the original queue is fully removed.

List queues

To retrieve the list of all queues in the system, use the listQueues() method.

Example #4 Listing queues

    $names = $queues->listQueues();
    foreach ($names as $name) {
        echo "Found queue $name\n";
    }

Set queue metadata

In some services, you can associate a set of key-value pairs with the queue as queue metadata. To set queue metadata, use the storeQueueMetadata() method:

Example #5 Setting queue metadata

    $queues->storeQueueMetadata($queueId, array(
        'purpose'       => 'Operations',
        'administrator' => 'joe@example.com',
    ));

Fetch queue metadata

To retrieve queue metadata, use the fetchQueueMetadata() method.

Example #6 Fetching queue metadata

    $metadata = $queues->fetchQueueMetadata($queueId);
    foreach ($metadata as $key => $value) {
        echo "Metadata $key: $value\n";
    }

Send a message

To add a message to a queue, use the sendMessage() method. The message is passed as an unstructured string.

Example #7 Sending a message

    $queues->sendMessage($queueId, "Hello world!");
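
Because the message body is an unstructured string, structured jobs have to be serialized by the application. The following is a minimal sketch that assumes JSON as the wire format; encodeJob(), decodeJob(), and the job fields are hypothetical names chosen for illustration, not part of the Simple Cloud API.

```php
<?php
/** Serialize a job description into a string suitable for a message body. */
function encodeJob($type, array $params)
{
    return json_encode(array('type' => $type, 'params' => $params));
}

/** Turn a message body back into a job array; returns null for anything else. */
function decodeJob($body)
{
    $job = json_decode($body, true);
    if (!is_array($job) || !isset($job['type'], $job['params'])) {
        return null;
    }
    return $job;
}

// Producer side: build the body that would be passed to sendMessage().
$body = encodeJob('resize-image', array('file' => 'photo.jpg', 'width' => 800));

// Consumer side: decode the string obtained from $message->getBody().
$job = decodeJob($body);
echo $job['type'], ' ', $job['params']['width'], "\n"; // resize-image 800
```

Returning null for malformed bodies lets a worker skip or log messages that were not produced by encodeJob() instead of failing on them.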

Receive a message

To receive one or more messages from the queue, use the receiveMessages() method. By default, this method returns a Zend_Cloud_QueueService_MessageSet instance, and each element of the set is an instance of Zend_Cloud_QueueService_Message. Both classes can be changed through the adapter configuration.

Example #8 Receiving a message

    // Get the first message
    $messages = $queues->receiveMessages($queueId);
    if (count($messages)) {
        foreach ($messages as $message) {
            echo "Message: " . $message->getBody();
            break;
        }
    }

    // Get two messages
    $messages = $queues->receiveMessages($queueId, 2);

When a client receives a message, the message becomes invisible to other clients. It is not deleted from the queue, however, until the client that received it calls the deleteMessage() method. If the message is not deleted within the specified visibility timeout, it becomes visible to all clients again and can once more be retrieved with the receiveMessages() method.
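
The visibility timeout can be illustrated with a toy in-memory model. ToyQueue and its methods are hypothetical and unrelated to the real adapters; the current time is passed in explicitly so the behaviour can be followed without waiting.

```php
<?php
/**
 * Toy in-memory queue that mimics a visibility timeout.
 * Illustration only; not part of the Simple Cloud API.
 */
class ToyQueue
{
    private $messages = array(); // id => array('body' => ..., 'hiddenUntil' => ...)
    private $nextId = 1;
    private $timeout;

    public function __construct($visibilityTimeout)
    {
        $this->timeout = $visibilityTimeout;
    }

    /** Store a message and return its id. */
    public function send($body)
    {
        $this->messages[$this->nextId] = array('body' => $body, 'hiddenUntil' => 0);
        return $this->nextId++;
    }

    /** Return the id of the oldest visible message and hide it, or null. */
    public function receive($now)
    {
        foreach ($this->messages as $id => $message) {
            if ($message['hiddenUntil'] <= $now) {
                $this->messages[$id]['hiddenUntil'] = $now + $this->timeout;
                return $id;
            }
        }
        return null;
    }

    /** Remove a message permanently. */
    public function delete($id)
    {
        unset($this->messages[$id]);
    }
}

$queue = new ToyQueue(60);
$id = $queue->send('Hello world!');

var_dump($queue->receive(0) === $id);    // true: first client gets the message
var_dump($queue->receive(30) === null);  // true: hidden from other clients
var_dump($queue->receive(61) === $id);   // true: timeout passed, visible again
$queue->delete($id);
var_dump($queue->receive(200) === null); // true: deleted for good
```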

Delete a message

To delete a message from the queue, call the deleteMessage() method with the queue identifier and the message to delete.

Example #9 Deleting a message

    // Process and delete up to $max messages.
    // process() stands for an application-defined handler.
    $messages = $queues->receiveMessages($queueId, $max);
    if (count($messages)) {
        foreach ($messages as $message) {
            process($message);
            $queues->deleteMessage($queueId, $message);
        }
    }
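
A worker may prefer to delete a message only after it has been handled successfully, so that failed jobs reappear once their visibility timeout expires. The sketch below shows one way to do that; processBatch() and $handler are hypothetical names, and $queues is assumed to be any QueueService adapter.

```php
<?php
/**
 * Receive up to $max messages and delete only those the handler accepts.
 * processBatch() is a hypothetical helper; $handler is any PHP callback
 * that takes the message body and returns true on success.
 * Returns the number of messages deleted.
 */
function processBatch($queues, $queueId, $max, $handler)
{
    $deleted = 0;
    foreach ($queues->receiveMessages($queueId, $max) as $message) {
        try {
            $ok = call_user_func($handler, $message->getBody());
        } catch (Exception $e) {
            $ok = false; // treat a thrown exception as a failed job
        }
        if ($ok) {
            // Success: remove the message so no other client sees it again.
            $queues->deleteMessage($queueId, $message);
            $deleted++;
        }
        // On failure the message is left in the queue and becomes visible
        // again after its visibility timeout, so it can be retried.
    }
    return $deleted;
}
```

The trade-off is that a message whose handler always fails will keep returning to the queue, so a real worker would likely need some limit on retries.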

Accessing concrete adapters

Sometimes it is necessary to retrieve the concrete adapter for the service that the Queue API is working with. This can be achieved by using the getAdapter() method.

Note: Accessing the underlying adapter breaks portability among services, so it should be reserved for exceptional circumstances only.

Example #10 Using a concrete adapter

    // send the message directly with the SQS client library
    $sqs = $queues->getAdapter();
    $sqs->sendMessage("myQueue", "hello!");