web/lib/Zend/Queue/Adapter/Memcacheq.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * Zend Framework
       
     4  *
       
     5  * LICENSE
       
     6  *
       
     7  * This source file is subject to the new BSD license that is bundled
       
     8  * with this package in the file LICENSE.txt.
       
     9  * It is also available through the world-wide-web at this URL:
       
    10  * http://framework.zend.com/license/new-bsd
       
    11  * If you did not receive a copy of the license and are unable to
       
    12  * obtain it through the world-wide-web, please send an email
       
    13  * to license@zend.com so we can send you a copy immediately.
       
    14  *
       
    15  * @category   Zend
       
    16  * @package    Zend_Queue
       
    17  * @subpackage Adapter
       
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    20  * @version    $Id: Memcacheq.php 20096 2010-01-06 02:05:09Z bkarwin $
       
    21  */
       
    22 
       
    23 /**
       
    24  * @see Zend_Queue_Adapter_AdapterAbstract
       
    25  */
       
    26 require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
       
    27 
       
    28 /**
       
    29  * Class for using connecting to a Zend_Cache-based queuing system
       
    30  *
       
    31  * @category   Zend
       
    32  * @package    Zend_Queue
       
    33  * @subpackage Adapter
       
    34  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    35  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    36  */
       
    37 class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
       
    38 {
       
    39     const DEFAULT_HOST = '127.0.0.1';
       
    40     const DEFAULT_PORT = 22201;
       
    41     const EOL          = "\r\n";
       
    42 
       
    43     /**
       
    44      * @var Memcache
       
    45      */
       
    46     protected $_cache = null;
       
    47 
       
    48     /**
       
    49      * @var string
       
    50      */
       
    51     protected $_host = null;
       
    52 
       
    53     /**
       
    54      * @var integer
       
    55      */
       
    56     protected $_port = null;
       
    57 
       
    58     /**
       
    59      * @var resource
       
    60      */
       
    61     protected $_socket = null;
       
    62 
       
    63     /********************************************************************
       
    64     * Constructor / Destructor
       
    65      *********************************************************************/
       
    66 
       
    67     /**
       
    68      * Constructor
       
    69      *
       
    70      * @param  array|Zend_Config $options
       
    71      * @param  null|Zend_Queue $queue
       
    72      * @return void
       
    73      */
       
    74     public function __construct($options, Zend_Queue $queue = null)
       
    75     {
       
    76         if (!extension_loaded('memcache')) {
       
    77             require_once 'Zend/Queue/Exception.php';
       
    78             throw new Zend_Queue_Exception('Memcache extension does not appear to be loaded');
       
    79         }
       
    80 
       
    81         parent::__construct($options, $queue);
       
    82 
       
    83         $options = &$this->_options['driverOptions'];
       
    84 
       
    85         if (!array_key_exists('host', $options)) {
       
    86             $options['host'] = self::DEFAULT_HOST;
       
    87         }
       
    88         if (!array_key_exists('port', $options)) {
       
    89             $options['port'] = self::DEFAULT_PORT;
       
    90         }
       
    91 
       
    92         $this->_cache = new Memcache();
       
    93 
       
    94         $result = $this->_cache->connect($options['host'], $options['port']);
       
    95 
       
    96         if ($result === false) {
       
    97             require_once 'Zend/Queue/Exception.php';
       
    98             throw new Zend_Queue_Exception('Could not connect to MemcacheQ');
       
    99         }
       
   100 
       
   101         $this->_host = $options['host'];
       
   102         $this->_port = (int)$options['port'];
       
   103     }
       
   104 
       
   105     /**
       
   106      * Destructor
       
   107      *
       
   108      * @return void
       
   109      */
       
   110     public function __destruct()
       
   111     {
       
   112         if ($this->_cache instanceof Memcache) {
       
   113             $this->_cache->close();
       
   114         }
       
   115         if (is_resource($this->_socket)) {
       
   116             $cmd = 'quit' . self::EOL;
       
   117             fwrite($this->_socket, $cmd);
       
   118             fclose($this->_socket);
       
   119         }
       
   120     }
       
   121 
       
   122     /********************************************************************
       
   123      * Queue management functions
       
   124      *********************************************************************/
       
   125 
       
   126     /**
       
   127      * Does a queue already exist?
       
   128      *
       
   129      * Throws an exception if the adapter cannot determine if a queue exists.
       
   130      * use isSupported('isExists') to determine if an adapter can test for
       
   131      * queue existance.
       
   132      *
       
   133      * @param  string $name
       
   134      * @return boolean
       
   135      * @throws Zend_Queue_Exception
       
   136      */
       
   137     public function isExists($name)
       
   138     {
       
   139         if (empty($this->_queues)) {
       
   140             $this->getQueues();
       
   141         }
       
   142 
       
   143         return in_array($name, $this->_queues);
       
   144     }
       
   145 
       
   146     /**
       
   147      * Create a new queue
       
   148      *
       
   149      * Visibility timeout is how long a message is left in the queue "invisible"
       
   150      * to other readers.  If the message is acknowleged (deleted) before the
       
   151      * timeout, then the message is deleted.  However, if the timeout expires
       
   152      * then the message will be made available to other queue readers.
       
   153      *
       
   154      * @param  string  $name    queue name
       
   155      * @param  integer $timeout default visibility timeout
       
   156      * @return boolean
       
   157      * @throws Zend_Queue_Exception
       
   158      */
       
   159     public function create($name, $timeout=null)
       
   160     {
       
   161         if ($this->isExists($name)) {
       
   162             return false;
       
   163         }
       
   164         if ($timeout === null) {
       
   165             $timeout = self::CREATE_TIMEOUT_DEFAULT;
       
   166         }
       
   167 
       
   168         // MemcacheQ does not have a method to "create" a queue
       
   169         // queues are created upon sending a packet.
       
   170         // We cannot use the send() and receive() functions because those
       
   171         // depend on the current name.
       
   172         $result = $this->_cache->set($name, 'creating queue', 0, 15);
       
   173         $result = $this->_cache->get($name);
       
   174 
       
   175         $this->_queues[] = $name;
       
   176 
       
   177         return true;
       
   178     }
       
   179 
       
   180     /**
       
   181      * Delete a queue and all of it's messages
       
   182      *
       
   183      * Returns false if the queue is not found, true if the queue exists
       
   184      *
       
   185      * @param  string  $name queue name
       
   186      * @return boolean
       
   187      * @throws Zend_Queue_Exception
       
   188      */
       
   189     public function delete($name)
       
   190     {
       
   191         $response = $this->_sendCommand('delete ' . $name, array('DELETED', 'NOT_FOUND'), true);
       
   192 
       
   193         if (in_array('DELETED', $response)) {
       
   194             $key = array_search($name, $this->_queues);
       
   195 
       
   196             if ($key !== false) {
       
   197                 unset($this->_queues[$key]);
       
   198             }
       
   199             return true;
       
   200         }
       
   201 
       
   202         return false;
       
   203     }
       
   204 
       
   205     /**
       
   206      * Get an array of all available queues
       
   207      *
       
   208      * Not all adapters support getQueues(), use isSupported('getQueues')
       
   209      * to determine if the adapter supports this feature.
       
   210      *
       
   211      * @return array
       
   212      * @throws Zend_Queue_Exception
       
   213      */
       
   214     public function getQueues()
       
   215     {
       
   216         $this->_queues = array();
       
   217 
       
   218         $response = $this->_sendCommand('stats queue', array('END'));
       
   219 
       
   220         foreach ($response as $i => $line) {
       
   221             $this->_queues[] = str_replace('STAT ', '', $line);
       
   222         }
       
   223 
       
   224         return $this->_queues;
       
   225     }
       
   226 
       
   227     /**
       
   228      * Return the approximate number of messages in the queue
       
   229      *
       
   230      * @param  Zend_Queue $queue
       
   231      * @return integer
       
   232      * @throws Zend_Queue_Exception (not supported)
       
   233      */
       
   234     public function count(Zend_Queue $queue=null)
       
   235     {
       
   236         require_once 'Zend/Queue/Exception.php';
       
   237         throw new Zend_Queue_Exception('count() is not supported in this adapter');
       
   238     }
       
   239 
       
   240     /********************************************************************
       
   241      * Messsage management functions
       
   242      *********************************************************************/
       
   243 
       
   244     /**
       
   245      * Send a message to the queue
       
   246      *
       
   247      * @param  string     $message Message to send to the active queue
       
   248      * @param  Zend_Queue $queue
       
   249      * @return Zend_Queue_Message
       
   250      * @throws Zend_Queue_Exception
       
   251      */
       
   252     public function send($message, Zend_Queue $queue=null)
       
   253     {
       
   254         if ($queue === null) {
       
   255             $queue = $this->_queue;
       
   256         }
       
   257 
       
   258         if (!$this->isExists($queue->getName())) {
       
   259             require_once 'Zend/Queue/Exception.php';
       
   260             throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
       
   261         }
       
   262 
       
   263         $message = (string) $message;
       
   264         $data    = array(
       
   265             'message_id' => md5(uniqid(rand(), true)),
       
   266             'handle'     => null,
       
   267             'body'       => $message,
       
   268             'md5'        => md5($message),
       
   269         );
       
   270 
       
   271         $result = $this->_cache->set($queue->getName(), $message, 0, 0);
       
   272         if ($result === false) {
       
   273             require_once 'Zend/Queue/Exception.php';
       
   274             throw new Zend_Queue_Exception('failed to insert message into queue:' . $queue->getName());
       
   275         }
       
   276 
       
   277         $options = array(
       
   278             'queue' => $queue,
       
   279             'data'  => $data,
       
   280         );
       
   281 
       
   282         $classname = $queue->getMessageClass();
       
   283         if (!class_exists($classname)) {
       
   284             require_once 'Zend/Loader.php';
       
   285             Zend_Loader::loadClass($classname);
       
   286         }
       
   287         return new $classname($options);
       
   288     }
       
   289 
       
   290     /**
       
   291      * Get messages in the queue
       
   292      *
       
   293      * @param  integer    $maxMessages  Maximum number of messages to return
       
   294      * @param  integer    $timeout      Visibility timeout for these messages
       
   295      * @param  Zend_Queue $queue
       
   296      * @return Zend_Queue_Message_Iterator
       
   297      * @throws Zend_Queue_Exception
       
   298      */
       
   299     public function receive($maxMessages=null, $timeout=null, Zend_Queue $queue=null)
       
   300     {
       
   301         if ($maxMessages === null) {
       
   302             $maxMessages = 1;
       
   303         }
       
   304 
       
   305         if ($timeout === null) {
       
   306             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
       
   307         }
       
   308         if ($queue === null) {
       
   309             $queue = $this->_queue;
       
   310         }
       
   311 
       
   312         $msgs = array();
       
   313         if ($maxMessages > 0 ) {
       
   314             for ($i = 0; $i < $maxMessages; $i++) {
       
   315                 $data = array(
       
   316                     'handle' => md5(uniqid(rand(), true)),
       
   317                     'body'   => $this->_cache->get($queue->getName()),
       
   318                 );
       
   319 
       
   320                 $msgs[] = $data;
       
   321             }
       
   322         }
       
   323 
       
   324         $options = array(
       
   325             'queue'        => $queue,
       
   326             'data'         => $msgs,
       
   327             'messageClass' => $queue->getMessageClass(),
       
   328         );
       
   329 
       
   330         $classname = $queue->getMessageSetClass();
       
   331         if (!class_exists($classname)) {
       
   332             require_once 'Zend/Loader.php';
       
   333             Zend_Loader::loadClass($classname);
       
   334         }
       
   335         return new $classname($options);
       
   336     }
       
   337 
       
   338     /**
       
   339      * Delete a message from the queue
       
   340      *
       
   341      * Returns true if the message is deleted, false if the deletion is
       
   342      * unsuccessful.
       
   343      *
       
   344      * @param  Zend_Queue_Message $message
       
   345      * @return boolean
       
   346      * @throws Zend_Queue_Exception (unsupported)
       
   347      */
       
   348     public function deleteMessage(Zend_Queue_Message $message)
       
   349     {
       
   350         require_once 'Zend/Queue/Exception.php';
       
   351         throw new Zend_Queue_Exception('deleteMessage() is not supported in  ' . get_class($this));
       
   352     }
       
   353 
       
   354     /********************************************************************
       
   355      * Supporting functions
       
   356      *********************************************************************/
       
   357 
       
   358     /**
       
   359      * Return a list of queue capabilities functions
       
   360      *
       
   361      * $array['function name'] = true or false
       
   362      * true is supported, false is not supported.
       
   363      *
       
   364      * @param  string $name
       
   365      * @return array
       
   366      */
       
   367     public function getCapabilities()
       
   368     {
       
   369         return array(
       
   370             'create'        => true,
       
   371             'delete'        => true,
       
   372             'send'          => true,
       
   373             'receive'       => true,
       
   374             'deleteMessage' => false,
       
   375             'getQueues'     => true,
       
   376             'count'         => false,
       
   377             'isExists'      => true,
       
   378         );
       
   379     }
       
   380 
       
   381     /********************************************************************
       
   382      * Functions that are not part of the Zend_Queue_Adapter_Abstract
       
   383      *********************************************************************/
       
   384 
       
   385     /**
       
   386      * sends a command to MemcacheQ
       
   387      *
       
   388      * The memcache functions by php cannot handle all types of requests
       
   389      * supported by MemcacheQ
       
   390      * Non-standard requests are handled by this function.
       
   391      *
       
   392      * @param  string  $command - command to send to memcacheQ
       
   393      * @param  array   $terminator - strings to indicate end of memcacheQ response
       
   394      * @param  boolean $include_term - include terminator in response
       
   395      * @return array
       
   396      * @throws Zend_Queue_Exception if connection cannot be opened
       
   397      */
       
   398     protected function _sendCommand($command, array $terminator, $include_term=false)
       
   399     {
       
   400         if (!is_resource($this->_socket)) {
       
   401             $this->_socket = fsockopen($this->_host, $this->_port, $errno, $errstr, 10);
       
   402         }
       
   403         if ($this->_socket === false) {
       
   404             require_once 'Zend/Queue/Exception.php';
       
   405             throw new Zend_Queue_Exception("Could not open a connection to $this->_host:$this->_port errno=$errno : $errstr");
       
   406         }
       
   407 
       
   408         $response = array();
       
   409 
       
   410         $cmd = $command . self::EOL;
       
   411         fwrite($this->_socket, $cmd);
       
   412 
       
   413         $continue_reading = true;
       
   414         while (!feof($this->_socket) && $continue_reading) {
       
   415             $resp = trim(fgets($this->_socket, 1024));
       
   416             if (in_array($resp, $terminator)) {
       
   417                 if ($include_term) {
       
   418                     $response[] = $resp;
       
   419                 }
       
   420                 $continue_reading = false;
       
   421             } else {
       
   422                 $response[] = $resp;
       
   423             }
       
   424         }
       
   425 
       
   426         return $response;
       
   427     }
       
   428 }