Version 1 by Tokarchuk Andrey
on May 21, 2011 04:03.

compared with
Key
This line was removed.
This word was removed. This word was added.
This line was added.

Changes (34)

View Page History

{zone-data:component-name}
Zend_Magic Zend_Queue_Adapter_Starling
{zone-data}

{zone-data:proposer-list}
[My Name|mailto:noreply@zend.com]
[Tokarchuk Andrey|mailto:netandreus@gmail.com]
{zone-data}


{zone-data:revision}
1.0 - 1 January 2008: 21 May 2011: Initial Draft.
{zone-data}

{zone-data:overview}
Zend_Magic is a simple component that reads my mind and generates code dynamically to do what I want.
Zend_Queue_Adapter_Starling is a component that add support to Starling queue server into Zend_Queue component.
{zone-data}

{zone-data:references}
* [Harry Houdini Wikipedia Entry|http://en.wikipedia.org/wiki/Harry_Houdini]
* [MagicTricks.com|http://www.magictricks.com/]
* [Starling homepage at GitHub|https://github.com/starling/starling/]
* [Starling ruby gem|http://rubygems.org/gems/starling]
* [Starling: A Ruby Persistent Queue Server That Speaks Memcached|http://www.rubyinside.com/starling-and-rudeq-persistent-ruby-queues-958.html]
* [Integrating Starling into Zend Framework (rus)|http://tokarchuk.ru/2011/05/php-ruby-with-starling/]
{zone-data}

{zone-data:requirements}
Most requirements take the form of "foo will do ...." or "foo will not support ...", although different words and sentence structure might be used. Adding functionality to your proposal is requirements creep (bad), unless listed below. Discuss major changes with your team first, and then open a "feature improvement" issue against this component.

* This component *will* correctly reads a developers mind for intent and generate the right configuration file.
* The generated config file *will not* support XML, but will provide an extension point in the API.
* This component *will* use no more memory than twice the size of all data it contains.
* This component *will* include a factory method. provide operations adding message.
* This component *will not* allow subclassing. (i.e. when reviewed, we expect to see "final" keyword in code)
* This component *will* only generate data exports strictly complying with RFC 12345.
* This component *will* validate input data against formats supported by ZF component Foo.
* This component *will* provide operations retreiving message.
* This component *will* provide operations adding queue.
* This component *will* provide operations deleting queue Starling server with all messages.
* This component *will* provide operations counting message in the queue.
* This component *will* provide retrieving list of he queues in the server.
* This component *will not* support deleting message from the queue, only retreive with deleting.
* This component *will* use Zend_Queue_Adapter_Memcacheq because Starling has Memcached protocol (with some difference).
* This component *will not* save any data using Zend_Cache or the filesystem. All transient data *will be* saved using Zend_Session.
{zone-data}

{zone-data:dependencies}
* Zend_Exception
* Zend_Queue_Adapter_Memcacheq
{zone-data}

{zone-data:operation}
The component is instantiated with a mind-link that ... that:
$starling = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
{zone-data}

{zone-data:milestones}
Describe some intermediate state of this component in terms of design notes, additional material added to this page, and / code. Note any significant dependencies here, such as, "Milestone #3 can not be completed until feature Foo has been added to ZF component XYZ." Milestones will be required for acceptance of future proposals. They are not hard, and many times you will only need to think of the first three below.
* Milestone 1: \[DONE\] [design notes will be published here|http://framework.zend.com/wiki/x/sg]
* Milestone 2: \[DONE\] Making class sceleton and prototype.
* Milestone 23: Working prototype checked into the incubator supporting use cases #1, #2, ... #2.
* Milestone 3: Working prototype checked into the incubator supporting use cases #3 and #4.
* Milestone 4: Unit tests exist, work, and are checked into SVN.
* Milestone 54: Initial documentation exists.

If a milestone is already done, begin the description with "\[DONE\]", like this:
* Milestone #: \[DONE\] Unit tests ...
{zone-data}

{zone-data:class-list}
* Zend_Magic_Exception Zend_Queue_Adapter_Starling
* Zend_Magic (factory class)
* Zend_Magic_MindProbe
* Zend_Magic_MindProbe_Intent
* Zend_Magic_Action
* Zend_Magic_CodeGen
{zone-data}

{zone-data:use-cases}
||UC-01||
// Add job to queue
$starling = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
$starling->send(array('jobKey' => 'jobText'));

... (see good use cases book)
||UC-02||
// Add job to queue
$starling = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
var_dump($starling->receive(2)->toArray()); // Messages also deleting from queue
{zone-data}

{zone-data:skeletons}
{code}
class Zend_Magic_Exception extends Zend_Exception {}
<?php

class Zend_Magic {
... /**
* Adapter for storage messags in Starling queue
* @author Tokarchuk Andrey (netandreus@gmail.com)
* Note: messages are stored serialized
* After recieving outside this adapter unserialization is needed
* For Ruby you can use gem "php_serialize" (not "php-serialize")
*
* Usage:
*
* $queuePool = new Zend_Queue(new ZendExtra_Queue_Adapter_Starling(array()), array('name' => 'myqueue1'));
* $queuePool->send(array('domain' => 'site.ru'));
* var_dump($queuePool->receive()->toArray()); // Messages also deleting from queue
*
*/
require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
class ZendExtra_Queue_Adapter_Starling extends Zend_Queue_Adapter_Memcacheq
{
const DEFAULT_HOST = '127.0.0.1';
const DEFAULT_PORT = 22122;
const EOL = "\r\n";

/**
* Supporting abilities
*
* @return array
*/
public function getCapabilities()
{
return array(
'create' => true,
'delete' => true,
'send' => true,
'receive' => true,
'deleteMessage' => false,
'getQueues' => true,
'count' => true,
'isExists' => true,
);
}

public function __construct($options, Zend_Queue $queue = null)
{
if (!extension_loaded('memcache')) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded');
}

Zend_Queue_Adapter_AdapterAbstract::__construct($options, $queue); // instead of parent::, which is MemcachedQ

$options = &$this->_options['driverOptions'];

if (!array_key_exists('host', $options)) {
$options['host'] = self::DEFAULT_HOST;
}
if (!array_key_exists('port', $options)) {
$options['port'] = self::DEFAULT_PORT;
}

$this->_cache = new Memcache();

$result = $this->_cache->connect($options['host'], $options['port']);

if ($result === false) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('Could not connect to MemcacheQ');
}

$this->_host = $options['host'];
$this->_port = (int)$options['port'];
}

/**
* Return queues list
*
* @return array
*/
public function getQueues()
{
$this->_queues = array();

$response = $this->_sendCommand('stats', array('END'));

$postfixesArray = array('_items', '_total_items', '_logsize', '_expired_items', '_age', '_total', '_expired');
$arr = array();
foreach ($response as $i => $line) {
if(strpos($line, 'STAT queue_') === 0 ) { // Zero position (start), not false
$tmp = str_replace('STAT queue_', '', $line);
$tmp = explode(' ', $tmp);
$tmp = $tmp[0];
foreach($postfixesArray as $postfix) {
$tmp = str_replace($postfix, '', $tmp);
}
$arr[] = $tmp;
}
}
$this->_queues = array_unique($arr);
return $this->_queues;
}

/**
* Return the approximate number of messages in the queue
*
* @return integer
* @throws Zend_Queue_Exception (not supported)
*/
public function count(Zend_Queue $queue = null)
{
if($queue === NULL) {
$keyName = 'queue_'.$this->getQueue()->getName().'_items';
} else {
$keyName = 'queue_'.$queue->getName().'_items';
}
$response = $this->_sendCommand('stats', array('END'));
foreach($response as $line) {
if(strpos($line, $keyName) !== false) {
$tmp = str_replace('STAT '.$keyName.' ', '', $line);
return (int) $tmp;
}
}
return 0;
}

/**
* Get messages in the queue
*
* @param integer $maxMessages Maximum number of messages to return
* @param integer $timeout Visibility timeout for these messages
* @param Zend_Queue $queue
* @return Zend_Queue_Message_Iterator
* @throws Zend_Queue_Exception
*/
public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
{
if ($maxMessages === null) {
$maxMessages = 1;
}

if ($timeout === null) {
$timeout = self::RECEIVE_TIMEOUT_DEFAULT;
}
if ($queue === null) {
$queue = $this->_queue;
}

$msgs = array();

// Setting up the limit upon to queue count
if($maxMessages != NULL) {
$count = $this->count($queue);
if($count < $maxMessages)
$maxMessages = $count;
}

if ($maxMessages > 0 ) {
for ($i = 0; $i < $maxMessages; $i++) {
$data = unserialize($this->_cache->get($queue->getName()));
$msgs[] = $data;
}
}

$options = array(
'queue' => $queue,
'data' => $msgs,
'messageClass' => $queue->getMessageClass(),
);
$classname = $queue->getMessageSetClass();
if (!class_exists($classname)) {
require_once 'Zend/Loader.php';
Zend_Loader::loadClass($classname);
}
return new $classname($options);
}

/**
* Delete a queue and all of it's messages
*
* Returns false if the queue is not found, true if the queue exists
*
* @param string $name queue name
* @return boolean
* @throws Zend_Queue_Exception
*/
public function delete($name)
{
$queue = $this->getQueue($name);
$count = $this->count($queue);
$this->receive($count, NULL, $queue);
}

/**
* Send a message to the queue
*
* @param string $message Message to send to the active queue
* @param Zend_Queue $queue
* @return Zend_Queue_Message
* @throws Zend_Queue_Exception
*/
public function send($message, Zend_Queue $queue=null)
{
if ($queue === null) {
$queue = $this->_queue;
}

if (!$this->isExists($queue->getName())) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
}

$message = serialize($message);
$data = array(
'message_id' => md5(uniqid(rand(), true)),
'handle' => null,
'body' => $message,
'md5' => md5($message),
);

$result = $this->_cache->set($queue->getName(), $message, 0, 0);
if ($result === false) {
require_once 'Zend/Queue/Exception.php';
throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName());
}

$options = array(
'queue' => $queue,
'data' => $data,
);

$classname = $queue->getMessageClass();
if (!class_exists($classname)) {
require_once 'Zend/Loader.php';
Zend_Loader::loadClass($classname);
}
return new $classname($options);
}


}
{code}
{zone-data}