View Source

<ac:macro ac:name="unmigrated-inline-wiki-markup"><ac:plain-text-body><![CDATA[{zone-template-instance:ZFPROP:Proposal Zone Template}

{zone-data:component-name}
Zend_Queue_Adapter_Starling
{zone-data}

{zone-data:proposer-list}
[Tokarchuk Andrey|mailto:netandreus@gmail.com]
{zone-data}

{zone-data:liaison}
TBD
{zone-data}

{zone-data:revision}
1.0 - 21 May 2011: Initial Draft.
{zone-data}

{zone-data:overview}
Zend_Queue_Adapter_Starling is a component that adds support for the Starling queue server to the Zend_Queue component.
{zone-data}

{zone-data:references}
* [Starling homepage at GitHub|https://github.com/starling/starling/]
* [Starling ruby gem|http://rubygems.org/gems/starling]
* [Starling: A Ruby Persistent Queue Server That Speaks Memcached|http://www.rubyinside.com/starling-and-rudeq-persistent-ruby-queues-958.html]
* [Integrating Starling into Zend Framework (rus)|http://tokarchuk.ru/2011/05/php-ruby-with-starling/]
{zone-data}

{zone-data:requirements}

* This component *will* provide an operation for adding a message.
* This component *will* provide an operation for retrieving messages.
* This component *will* provide an operation for adding a queue.
* This component *will* provide an operation for deleting a queue from the Starling server together with all of its messages.
* This component *will* provide an operation for counting the messages in a queue.
* This component *will* provide retrieval of the list of queues on the server.
* This component *will not* support deleting a single message from the queue; messages can only be retrieved, which also deletes them.
* This component *will* use Zend_Queue_Adapter_Memcacheq because Starling speaks the Memcached protocol (with some differences).
* This component *will not* save any data using Zend_Cache or the filesystem.
{zone-data}

{zone-data:dependencies}
* Zend_Exception
* Zend_Queue_Adapter_Memcacheq
{zone-data}

{zone-data:operation}
The component is instantiated by passing the adapter to Zend_Queue:
$starling = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
{zone-data}

{zone-data:milestones}
* Milestone 1: \[DONE\] [design notes will be published here|http://framework.zend.com/wiki/display/ZFPROP/Zend_Queue_Adapter_Starling+-+Tokarchuk+Andrey]
* Milestone 2: \[DONE\] Making class skeleton and prototype.
* Milestone 3: Working prototype checked into the incubator supporting use cases #1, #2.
* Milestone 4: Initial documentation exists.

{zone-data}

{zone-data:class-list}
* Zend_Queue_Adapter_Starling
{zone-data}

{zone-data:use-cases}
||UC-01||
// Add a job to the queue: the array payload is handed to send() on the 'myqueue1' queue
$starling = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
$starling->send(array('jobKey' => 'jobText'));

||UC-02||
// Get jobs from the queue: request up to two messages from 'myqueue1'
$starling = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
var_dump($starling->receive(2)->toArray()); // Received messages are also deleted from the queue
{zone-data}

{zone-data:skeletons}
{code}
<?php

/**
* Adapter for storing messages in a Starling queue
* @author Tokarchuk Andrey (netandreus@gmail.com)
* @link http://tokarchuk.ru/2011/05/php-ruby-with-starling/
* Note: messages are stored PHP-serialized.
* Consumers that read them outside this adapter must unserialize them.
* For Ruby you can use the gem "php_serialize" (not "php-serialize").
*
* Usage:
*
* $queuePool = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
* $queuePool->send(array('domain' => 'site.ru'));
* var_dump($queuePool->receive()->toArray()); // Received messages are also deleted from the queue
*
*/
require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
class ZendExtra_Queue_Adapter_Starling extends Zend_Queue_Adapter_Memcacheq
{
    /** Host used when driverOptions does not contain 'host'. */
    const DEFAULT_HOST = '127.0.0.1';

    /** Port used when driverOptions does not contain 'port'. */
    const DEFAULT_PORT = 22122;

    /** Line terminator of the text protocol. */
    const EOL = "\r\n";

    /**
     * Return the list of operations this adapter supports.
     *
     * Everything is supported except deleting a single message:
     * a message is removed from the queue as a side effect of receive().
     *
     * @return array map of capability name => boolean
     */
    public function getCapabilities()
    {
        return array(
            'create'        => true,
            'delete'        => true,
            'send'          => true,
            'receive'       => true,
            'deleteMessage' => false,
            'getQueues'     => true,
            'count'         => true,
            'isExists'      => true,
        );
    }

    /**
     * Connect to the Starling server through the memcache extension.
     *
     * The 'host' and 'port' entries of driverOptions default to
     * DEFAULT_HOST and DEFAULT_PORT when they are not supplied.
     *
     * @param  array|Zend_Config $options adapter options
     * @param  Zend_Queue|null   $queue   queue this adapter is bound to
     * @throws Zend_Queue_Exception when the memcache extension is missing
     *                              or the connection cannot be established
     */
    public function __construct($options, Zend_Queue $queue = null)
    {
        if (!extension_loaded('memcache')) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded');
        }

        // Deliberately skip the direct parent constructor (Memcacheq) and run
        // the abstract adapter's one, so that the Starling defaults below apply.
        Zend_Queue_Adapter_AdapterAbstract::__construct($options, $queue);

        // Work on the stored options by reference so the defaults are kept.
        $options = &$this->_options['driverOptions'];

        if (!array_key_exists('host', $options)) {
            $options['host'] = self::DEFAULT_HOST;
        }
        if (!array_key_exists('port', $options)) {
            $options['port'] = self::DEFAULT_PORT;
        }

        $this->_cache = new Memcache();

        $result = $this->_cache->connect($options['host'], $options['port']);

        if ($result === false) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('Could not connect to Starling');
        }

        $this->_host = $options['host'];
        $this->_port = (int) $options['port'];
    }

    /**
     * Return the names of the queues reported by the server.
     *
     * Queue names are derived from the "STAT queue_<name>_<counter>" lines of
     * the "stats" command. The counter suffix is removed only from the END of
     * the key, so a queue name that merely contains such a word (for example
     * "my_items_queue") is left intact.
     *
     * @return array list of unique queue names
     */
    public function getQueues()
    {
        $linePrefix = 'STAT queue_';

        // Longest suffixes first, so "_total_items" is not mistaken for "_items".
        $suffixes = array(
            '_expired_items',
            '_total_items',
            '_expired',
            '_logsize',
            '_items',
            '_total',
            '_age',
        );

        $names = array();
        foreach ($this->_sendCommand('stats', array('END')) as $line) {
            if (strpos($line, $linePrefix) !== 0) {
                continue; // not a per-queue statistic
            }

            // "<name>_<counter> <value>" -> "<name>_<counter>"
            $parts = explode(' ', substr($line, strlen($linePrefix)));
            $name  = $parts[0];

            foreach ($suffixes as $suffix) {
                $cut = strlen($name) - strlen($suffix);
                if ($cut > 0 && substr($name, $cut) === $suffix) {
                    $name = substr($name, 0, $cut);
                    break;
                }
            }

            $names[] = $name;
        }

        $this->_queues = array_values(array_unique($names));

        return $this->_queues;
    }

    /**
     * Return the approximate number of messages in the queue.
     *
     * @param  Zend_Queue|null $queue queue to inspect; defaults to the
     *                                adapter's own queue
     * @return integer 0 when the server reports no counter for the queue
     */
    public function count(Zend_Queue $queue = null)
    {
        if ($queue === null) {
            $queue = $this->getQueue();
        }

        $items = $this->_itemCount($queue->getName());

        return ($items === null) ? 0 : $items;
    }

    /**
     * Get messages from the queue.
     *
     * Every message that is returned has already been removed from the
     * server; there is no separate deleteMessage step.
     *
     * @param  integer|null    $maxMessages Maximum number of messages to return (default 1)
     * @param  integer|null    $timeout     Accepted for interface compatibility; not used
     * @param  Zend_Queue|null $queue       Queue to read from; defaults to the adapter's queue
     * @return Zend_Queue_Message_Iterator
     * @throws Zend_Queue_Exception
     */
    public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
    {
        if ($maxMessages === null) {
            $maxMessages = 1;
        }
        if ($queue === null) {
            $queue = $this->_queue;
        }

        $msgs = array();

        if ($maxMessages > 0) {
            // Never ask for more messages than the server currently holds.
            $limit = min($maxMessages, $this->count($queue));

            for ($i = 0; $i < $limit; $i++) {
                $raw = $this->_cache->get($queue->getName());
                if ($raw === false) {
                    // Another consumer emptied the queue after it was counted.
                    break;
                }
                // NOTE(review): unserialize() is applied to whatever the queue
                // holds; only safe while every producer is trusted.
                $msgs[] = unserialize($raw);
            }
        }

        $options = array(
            'queue'        => $queue,
            'data'         => $msgs,
            'messageClass' => $queue->getMessageClass(),
        );

        $classname = $queue->getMessageSetClass();
        if (!class_exists($classname)) {
            require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($classname);
        }

        return new $classname($options);
    }

    /**
     * Delete all messages of the named queue.
     *
     * The queue is emptied by reading (and thereby removing) as many
     * messages as the server reported at the time of the call.
     *
     * @param  string $name queue name
     * @return boolean false if the server reports no such queue, true otherwise
     * @throws Zend_Queue_Exception
     */
    public function delete($name)
    {
        $pending = $this->_itemCount($name);
        if ($pending === null) {
            return false;
        }

        for ($i = 0; $i < $pending; $i++) {
            if ($this->_cache->get($name) === false) {
                break; // already empty
            }
        }

        return true;
    }

    /**
     * Send a message to the queue.
     *
     * The message is stored in PHP-serialized form.
     *
     * @param  mixed           $message Message to send to the active queue
     * @param  Zend_Queue|null $queue   Target queue; defaults to the adapter's queue
     * @return Zend_Queue_Message
     * @throws Zend_Queue_Exception when the queue does not exist or the
     *                              server rejects the message
     */
    public function send($message, Zend_Queue $queue = null)
    {
        if ($queue === null) {
            $queue = $this->_queue;
        }

        if (!$this->isExists($queue->getName())) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
        }

        $message = serialize($message);
        $data    = array(
            'message_id' => md5(uniqid(rand(), true)),
            'handle'     => null,
            'body'       => $message,
            'md5'        => md5($message),
        );

        $result = $this->_cache->set($queue->getName(), $message, 0, 0);
        if ($result === false) {
            require_once 'Zend/Queue/Exception.php';
            throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName());
        }

        $options = array(
            'queue' => $queue,
            'data'  => $data,
        );

        $classname = $queue->getMessageClass();
        if (!class_exists($classname)) {
            require_once 'Zend/Loader.php';
            Zend_Loader::loadClass($classname);
        }

        return new $classname($options);
    }

    /**
     * Read the "queue_<name>_items" counter from the server statistics.
     *
     * The key is matched at the start of the line, so a queue whose name is
     * the tail of another queue's name cannot pick up the wrong counter.
     *
     * @param  string $name queue name
     * @return integer|null number of items, or null when no counter is reported
     */
    protected function _itemCount($name)
    {
        $prefix = 'STAT queue_' . $name . '_items ';

        foreach ($this->_sendCommand('stats', array('END')) as $line) {
            if (strpos($line, $prefix) === 0) {
                return (int) substr($line, strlen($prefix));
            }
        }

        return null;
    }
}
{code}
{zone-data}

{zone-template-instance}]]></ac:plain-text-body></ac:macro>