web/lib/Zend/Queue/Adapter/Db.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * Zend Framework
       
     4  *
       
     5  * LICENSE
       
     6  *
       
     7  * This source file is subject to the new BSD license that is bundled
       
     8  * with this package in the file LICENSE.txt.
       
     9  * It is also available through the world-wide-web at this URL:
       
    10  * http://framework.zend.com/license/new-bsd
       
    11  * If you did not receive a copy of the license and are unable to
       
    12  * obtain it through the world-wide-web, please send an email
       
    13  * to license@zend.com so we can send you a copy immediately.
       
    14  *
       
    15  * @category   Zend
       
    16  * @package    Zend_Queue
       
    17  * @subpackage Adapter
       
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    20  * @version    $Id: Db.php 20096 2010-01-06 02:05:09Z bkarwin $
       
    21  */
       
    22 
       
    23 /**
       
    24  * @see Zend_Queue_Adapter_AdapterAbstract
       
    25  */
       
    26 require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
       
    27 
       
    28 /**
       
    29  * @see Zend_Db_Select
       
    30  */
       
    31 require_once 'Zend/Db/Select.php';
       
    32 
       
    33 /**
       
    34  * @see Zend_Db
       
    35  */
       
    36 require_once 'Zend/Db.php';
       
    37 
       
    38 /**
       
    39  * @see Zend_Queue_Adapter_Db_Queue
       
    40  */
       
    41 require_once 'Zend/Queue/Adapter/Db/Queue.php';
       
    42 
       
    43 /**
       
    44  * @see Zend_Queue_Adapter_Db_Message
       
    45  */
       
    46 require_once 'Zend/Queue/Adapter/Db/Message.php';
       
    47 
       
    48 /**
       
    49  * Class for using connecting to a Zend_Db-based queuing system
       
    50  *
       
    51  * @category   Zend
       
    52  * @package    Zend_Queue
       
    53  * @subpackage Adapter
       
    54  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    55  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    56  */
       
    57 class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
       
    58 {
       
    59     /**
       
    60      * @var Zend_Queue_Adapter_Db_Queue
       
    61      */
       
    62     protected $_queueTable = null;
       
    63 
       
    64     /**
       
    65      * @var Zend_Queue_Adapter_Db_Message
       
    66      */
       
    67     protected $_messageTable = null;
       
    68 
       
    69     /**
       
    70      * @var Zend_Db_Table_Row_Abstract
       
    71      */
       
    72     protected $_messageRow = null;
       
    73 
       
    74     /**
       
    75      * Constructor
       
    76      *
       
    77      * @param  array|Zend_Config $options
       
    78      * @param  Zend_Queue|null $queue
       
    79      * @return void
       
    80      */
       
    81     public function __construct($options, Zend_Queue $queue = null)
       
    82     {
       
    83         parent::__construct($options, $queue);
       
    84 
       
    85         if (!isset($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) {
       
    86             // turn off auto update by default
       
    87             $this->_options['options'][Zend_Db_Select::FOR_UPDATE] = false;
       
    88         }
       
    89 
       
    90         if (!is_bool($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) {
       
    91             require_once 'Zend/Queue/Exception.php';
       
    92             throw new Zend_Queue_Exception('Options array item: Zend_Db_Select::FOR_UPDATE must be boolean');
       
    93         }
       
    94 
       
    95         if (isset($this->_options['dbAdapter'])
       
    96             && $this->_options['dbAdapter'] instanceof Zend_Db_Adapter_Abstract) {
       
    97             $db = $this->_options['dbAdapter'];
       
    98         } else {
       
    99             $db = $this->_initDbAdapter();
       
   100         }
       
   101 
       
   102         $this->_queueTable = new Zend_Queue_Adapter_Db_Queue(array(
       
   103             'db' => $db,
       
   104         ));
       
   105 
       
   106         $this->_messageTable = new Zend_Queue_Adapter_Db_Message(array(
       
   107             'db' => $db,
       
   108         ));
       
   109 
       
   110     }
       
   111 
       
   112     /**
       
   113      * Initialize Db adapter using 'driverOptions' section of the _options array
       
   114      *
       
   115      * Throws an exception if the adapter cannot connect to DB.
       
   116      *
       
   117      * @return Zend_Db_Adapter_Abstract
       
   118      * @throws Zend_Queue_Exception
       
   119      */
       
   120     protected function _initDbAdapter()
       
   121     {
       
   122         $options = &$this->_options['driverOptions'];
       
   123         if (!array_key_exists('type', $options)) {
       
   124             require_once 'Zend/Queue/Exception.php';
       
   125             throw new Zend_Queue_Exception("Configuration array must have a key for 'type' for the database type to use");
       
   126         }
       
   127 
       
   128         if (!array_key_exists('host', $options)) {
       
   129             require_once 'Zend/Queue/Exception.php';
       
   130             throw new Zend_Queue_Exception("Configuration array must have a key for 'host' for the host to use");
       
   131         }
       
   132 
       
   133         if (!array_key_exists('username', $options)) {
       
   134             require_once 'Zend/Queue/Exception.php';
       
   135             throw new Zend_Queue_Exception("Configuration array must have a key for 'username' for the username to use");
       
   136         }
       
   137 
       
   138         if (!array_key_exists('password', $options)) {
       
   139             require_once 'Zend/Queue/Exception.php';
       
   140             throw new Zend_Queue_Exception("Configuration array must have a key for 'password' for the password to use");
       
   141         }
       
   142 
       
   143         if (!array_key_exists('dbname', $options)) {
       
   144             require_once 'Zend/Queue/Exception.php';
       
   145             throw new Zend_Queue_Exception("Configuration array must have a key for 'dbname' for the database to use");
       
   146         }
       
   147 
       
   148         $type = $options['type'];
       
   149         unset($options['type']);
       
   150 
       
   151         try {
       
   152             $db = Zend_Db::factory($type, $options);
       
   153         } catch (Zend_Db_Exception $e) {
       
   154             require_once 'Zend/Queue/Exception.php';
       
   155             throw new Zend_Queue_Exception('Error connecting to database: ' . $e->getMessage(), $e->getCode(), $e);
       
   156         }
       
   157 
       
   158         return $db;
       
   159     }
       
   160 
       
   161     /********************************************************************
       
   162      * Queue management functions
       
   163      *********************************************************************/
       
   164 
       
   165     /**
       
   166      * Does a queue already exist?
       
   167      *
       
   168      * Throws an exception if the adapter cannot determine if a queue exists.
       
   169      * use isSupported('isExists') to determine if an adapter can test for
       
   170      * queue existance.
       
   171      *
       
   172      * @param  string $name
       
   173      * @return boolean
       
   174      * @throws Zend_Queue_Exception
       
   175      */
       
   176     public function isExists($name)
       
   177     {
       
   178         $id = 0;
       
   179 
       
   180         try {
       
   181             $id = $this->getQueueId($name);
       
   182         } catch (Zend_Queue_Exception $e) {
       
   183             return false;
       
   184         }
       
   185 
       
   186         return ($id > 0);
       
   187     }
       
   188 
       
   189     /**
       
   190      * Create a new queue
       
   191      *
       
   192      * Visibility timeout is how long a message is left in the queue "invisible"
       
   193      * to other readers.  If the message is acknowleged (deleted) before the
       
   194      * timeout, then the message is deleted.  However, if the timeout expires
       
   195      * then the message will be made available to other queue readers.
       
   196      *
       
   197      * @param  string  $name    queue name
       
   198      * @param  integer $timeout default visibility timeout
       
   199      * @return boolean
       
   200      * @throws Zend_Queue_Exception - database error
       
   201      */
       
   202     public function create($name, $timeout = null)
       
   203     {
       
   204         if ($this->isExists($name)) {
       
   205             return false;
       
   206         }
       
   207 
       
   208         $queue = $this->_queueTable->createRow();
       
   209         $queue->queue_name = $name;
       
   210         $queue->timeout    = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
       
   211 
       
   212         try {
       
   213             if ($queue->save()) {
       
   214                 return true;
       
   215             }
       
   216         } catch (Exception $e) {
       
   217             require_once 'Zend/Queue/Exception.php';
       
   218             throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
       
   219         }
       
   220 
       
   221         return false;
       
   222     }
       
   223 
       
   224     /**
       
   225      * Delete a queue and all of it's messages
       
   226      *
       
   227      * Returns false if the queue is not found, true if the queue exists
       
   228      *
       
   229      * @param  string  $name queue name
       
   230      * @return boolean
       
   231      * @throws Zend_Queue_Exception - database error
       
   232      */
       
   233     public function delete($name)
       
   234     {
       
   235         $id = $this->getQueueId($name); // get primary key
       
   236 
       
   237         // if the queue does not exist then it must already be deleted.
       
   238         $list = $this->_queueTable->find($id);
       
   239         if (count($list) === 0) {
       
   240             return false;
       
   241         }
       
   242         $queue = $list->current();
       
   243 
       
   244         if ($queue instanceof Zend_Db_Table_Row_Abstract) {
       
   245             try {
       
   246                 $queue->delete();
       
   247             } catch (Exception $e) {
       
   248                 require_once 'Zend/Queue/Exception.php';
       
   249                 throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
       
   250             }
       
   251         }
       
   252 
       
   253         if (array_key_exists($name, $this->_queues)) {
       
   254             unset($this->_queues[$name]);
       
   255         }
       
   256 
       
   257         return true;
       
   258     }
       
   259 
       
   260     /*
       
   261      * Get an array of all available queues
       
   262      *
       
   263      * Not all adapters support getQueues(), use isSupported('getQueues')
       
   264      * to determine if the adapter supports this feature.
       
   265      *
       
   266      * @return array
       
   267      * @throws Zend_Queue_Exception - database error
       
   268      */
       
   269     public function getQueues()
       
   270     {
       
   271         $query = $this->_queueTable->select();
       
   272         $query->from($this->_queueTable, array('queue_id', 'queue_name'));
       
   273 
       
   274         $this->_queues = array();
       
   275         foreach ($this->_queueTable->fetchAll($query) as $queue) {
       
   276             $this->_queues[$queue->queue_name] = (int)$queue->queue_id;
       
   277         }
       
   278 
       
   279         $list = array_keys($this->_queues);
       
   280 
       
   281         return $list;
       
   282     }
       
   283 
       
   284     /**
       
   285      * Return the approximate number of messages in the queue
       
   286      *
       
   287      * @param  Zend_Queue $queue
       
   288      * @return integer
       
   289      * @throws Zend_Queue_Exception
       
   290      */
       
   291     public function count(Zend_Queue $queue = null)
       
   292     {
       
   293         if ($queue === null) {
       
   294             $queue = $this->_queue;
       
   295         }
       
   296 
       
   297         $info  = $this->_messageTable->info();
       
   298         $db    = $this->_messageTable->getAdapter();
       
   299         $query = $db->select();
       
   300         $query->from($info['name'], array(new Zend_Db_Expr('COUNT(1)')))
       
   301               ->where('queue_id=?', $this->getQueueId($queue->getName()));
       
   302 
       
   303         // return count results
       
   304         return (int) $db->fetchOne($query);
       
   305     }
       
   306 
       
   307     /********************************************************************
       
   308     * Messsage management functions
       
   309      *********************************************************************/
       
   310 
       
   311     /**
       
   312      * Send a message to the queue
       
   313      *
       
   314      * @param  string     $message Message to send to the active queue
       
   315      * @param  Zend_Queue $queue
       
   316      * @return Zend_Queue_Message
       
   317      * @throws Zend_Queue_Exception - database error
       
   318      */
       
   319     public function send($message, Zend_Queue $queue = null)
       
   320     {
       
   321         if ($this->_messageRow === null) {
       
   322             $this->_messageRow = $this->_messageTable->createRow();
       
   323         }
       
   324 
       
   325         if ($queue === null) {
       
   326             $queue = $this->_queue;
       
   327         }
       
   328 
       
   329         if (is_scalar($message)) {
       
   330             $message = (string) $message;
       
   331         }
       
   332         if (is_string($message)) {
       
   333             $message = trim($message);
       
   334         }
       
   335 
       
   336         if (!$this->isExists($queue->getName())) {
       
   337             require_once 'Zend/Queue/Exception.php';
       
   338             throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
       
   339         }
       
   340 
       
   341         $msg           = clone $this->_messageRow;
       
   342         $msg->queue_id = $this->getQueueId($queue->getName());
       
   343         $msg->created  = time();
       
   344         $msg->body     = $message;
       
   345         $msg->md5      = md5($message);
       
   346         // $msg->timeout = ??? @TODO
       
   347 
       
   348         try {
       
   349             $msg->save();
       
   350         } catch (Exception $e) {
       
   351             require_once 'Zend/Queue/Exception.php';
       
   352             throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
       
   353         }
       
   354 
       
   355         $options = array(
       
   356             'queue' => $queue,
       
   357             'data'  => $msg->toArray(),
       
   358         );
       
   359 
       
   360         $classname = $queue->getMessageClass();
       
   361         if (!class_exists($classname)) {
       
   362             require_once 'Zend/Loader.php';
       
   363             Zend_Loader::loadClass($classname);
       
   364         }
       
   365         return new $classname($options);
       
   366     }
       
   367 
       
   368     /**
       
   369      * Get messages in the queue
       
   370      *
       
   371      * @param  integer    $maxMessages  Maximum number of messages to return
       
   372      * @param  integer    $timeout      Visibility timeout for these messages
       
   373      * @param  Zend_Queue $queue
       
   374      * @return Zend_Queue_Message_Iterator
       
   375      * @throws Zend_Queue_Exception - database error
       
   376      */
       
   377     public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
       
   378     {
       
   379         if ($maxMessages === null) {
       
   380             $maxMessages = 1;
       
   381         }
       
   382         if ($timeout === null) {
       
   383             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
       
   384         }
       
   385         if ($queue === null) {
       
   386             $queue = $this->_queue;
       
   387         }
       
   388 
       
   389         $msgs      = array();
       
   390         $info      = $this->_messageTable->info();
       
   391         $microtime = microtime(true); // cache microtime
       
   392         $db        = $this->_messageTable->getAdapter();
       
   393 
       
   394         // start transaction handling
       
   395         try {
       
   396             if ( $maxMessages > 0 ) { // ZF-7666 LIMIT 0 clause not included.
       
   397                 $db->beginTransaction();
       
   398 
       
   399                 $query = $db->select();
       
   400                 if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) {
       
   401                     // turn on forUpdate
       
   402                     $query->forUpdate();
       
   403                 }
       
   404                 $query->from($info['name'], array('*'))
       
   405                       ->where('queue_id=?', $this->getQueueId($queue->getName()))
       
   406                       ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
       
   407                       ->limit($maxMessages);
       
   408 
       
   409                 foreach ($db->fetchAll($query) as $data) {
       
   410                     // setup our changes to the message
       
   411                     $data['handle'] = md5(uniqid(rand(), true));
       
   412 
       
   413                     $update = array(
       
   414                         'handle'  => $data['handle'],
       
   415                         'timeout' => $microtime,
       
   416                     );
       
   417 
       
   418                     // update the database
       
   419                     $where   = array();
       
   420                     $where[] = $db->quoteInto('message_id=?', $data['message_id']);
       
   421                     $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;
       
   422 
       
   423                     $count = $db->update($info['name'], $update, $where);
       
   424 
       
   425                     // we check count to make sure no other thread has gotten
       
   426                     // the rows after our select, but before our update.
       
   427                     if ($count > 0) {
       
   428                         $msgs[] = $data;
       
   429                     }
       
   430                 }
       
   431                 $db->commit();
       
   432             }
       
   433         } catch (Exception $e) {
       
   434             $db->rollBack();
       
   435 
       
   436             require_once 'Zend/Queue/Exception.php';
       
   437             throw new Zend_Queue_Exception($e->getMessage(), $e->getCode(), $e);
       
   438         }
       
   439 
       
   440         $options = array(
       
   441             'queue'        => $queue,
       
   442             'data'         => $msgs,
       
   443             'messageClass' => $queue->getMessageClass(),
       
   444         );
       
   445 
       
   446         $classname = $queue->getMessageSetClass();
       
   447         if (!class_exists($classname)) {
       
   448             require_once 'Zend/Loader.php';
       
   449             Zend_Loader::loadClass($classname);
       
   450         }
       
   451         return new $classname($options);
       
   452     }
       
   453 
       
   454     /**
       
   455      * Delete a message from the queue
       
   456      *
       
   457      * Returns true if the message is deleted, false if the deletion is
       
   458      * unsuccessful.
       
   459      *
       
   460      * @param  Zend_Queue_Message $message
       
   461      * @return boolean
       
   462      * @throws Zend_Queue_Exception - database error
       
   463      */
       
   464     public function deleteMessage(Zend_Queue_Message $message)
       
   465     {
       
   466         $db    = $this->_messageTable->getAdapter();
       
   467         $where = $db->quoteInto('handle=?', $message->handle);
       
   468 
       
   469         if ($this->_messageTable->delete($where)) {
       
   470             return true;
       
   471         }
       
   472 
       
   473         return false;
       
   474     }
       
   475 
       
   476     /********************************************************************
       
   477      * Supporting functions
       
   478      *********************************************************************/
       
   479 
       
   480     /**
       
   481      * Return a list of queue capabilities functions
       
   482      *
       
   483      * $array['function name'] = true or false
       
   484      * true is supported, false is not supported.
       
   485      *
       
   486      * @param  string $name
       
   487      * @return array
       
   488      */
       
   489     public function getCapabilities()
       
   490     {
       
   491         return array(
       
   492             'create'        => true,
       
   493             'delete'        => true,
       
   494             'send'          => true,
       
   495             'receive'       => true,
       
   496             'deleteMessage' => true,
       
   497             'getQueues'     => true,
       
   498             'count'         => true,
       
   499             'isExists'      => true,
       
   500         );
       
   501     }
       
   502 
       
   503     /********************************************************************
       
   504      * Functions that are not part of the Zend_Queue_Adapter_Abstract
       
   505      *********************************************************************/
       
   506     /**
       
   507      * Get the queue ID
       
   508      *
       
   509      * Returns the queue's row identifier.
       
   510      *
       
   511      * @param  string       $name
       
   512      * @return integer|null
       
   513      * @throws Zend_Queue_Exception
       
   514      */
       
   515     protected function getQueueId($name)
       
   516     {
       
   517         if (array_key_exists($name, $this->_queues)) {
       
   518             return $this->_queues[$name];
       
   519         }
       
   520 
       
   521         $query = $this->_queueTable->select();
       
   522         $query->from($this->_queueTable, array('queue_id'))
       
   523               ->where('queue_name=?', $name);
       
   524 
       
   525         $queue = $this->_queueTable->fetchRow($query);
       
   526 
       
   527         if ($queue === null) {
       
   528             require_once 'Zend/Queue/Exception.php';
       
   529             throw new Zend_Queue_Exception('Queue does not exist: ' . $name);
       
   530         }
       
   531 
       
   532         $this->_queues[$name] = (int)$queue->queue_id;
       
   533 
       
   534         return $this->_queues[$name];
       
   535     }
       
   536 }