web/lib/Zend/Queue/Adapter/PlatformJobQueue.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * Zend Framework
       
     4  *
       
     5  * LICENSE
       
     6  *
       
     7  * This source file is subject to the new BSD license that is bundled
       
     8  * with this package in the file LICENSE.txt.
       
     9  * It is also available through the world-wide-web at this URL:
       
    10  * http://framework.zend.com/license/new-bsd
       
    11  * If you did not receive a copy of the license and are unable to
       
    12  * obtain it through the world-wide-web, please send an email
       
    13  * to license@zend.com so we can send you a copy immediately.
       
    14  *
       
    15  * @category   Zend
       
    16  * @package    Zend_Queue
       
    17  * @subpackage Adapter
       
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    20  * @version    $Id: PlatformJobQueue.php 20096 2010-01-06 02:05:09Z bkarwin $
       
    21  */
       
    22 
       
    23 /**
       
    24  * @see Zend_Queue_Adapter_AdapterAbstract
       
    25  */
       
    26 require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
       
    27 
       
    28 /**
       
    29  * Zend Platform JobQueue adapter
       
    30  *
       
    31  * @category   Zend
       
    32  * @package    Zend_Queue
       
    33  * @subpackage Adapter
       
    34  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    35  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    36  */
       
    37 class Zend_Queue_Adapter_PlatformJobQueue extends Zend_Queue_Adapter_AdapterAbstract
       
    38 {
       
    39     /**
       
    40      * @var ZendApi_JobQueue
       
    41      */
       
    42     protected $_zendQueue;
       
    43 
       
    44     /**
       
    45      * Constructor
       
    46      *
       
    47      * @param  array|Zend_Config $options
       
    48      * @param  Zend_Queue|null $queue
       
    49      * @return void
       
    50      */
       
    51     public function __construct($options, Zend_Queue $queue = null)
       
    52     {
       
    53         parent::__construct($options, $queue);
       
    54 
       
    55         if (!extension_loaded("jobqueue_client")) {
       
    56             require_once 'Zend/Queue/Exception.php';
       
    57             throw new Zend_Queue_Exception('Platform Job Queue extension does not appear to be loaded');
       
    58         }
       
    59 
       
    60         if (! isset($this->_options['daemonOptions'])) {
       
    61             require_once 'Zend/Queue/Exception.php';
       
    62             throw new Zend_Queue_Exception('Job Queue host and password should be provided');
       
    63         }
       
    64 
       
    65         $options = $this->_options['daemonOptions'];
       
    66 
       
    67         if (!array_key_exists('host', $options)) {
       
    68             require_once 'Zend/Queue/Exception.php';
       
    69             throw new Zend_Queue_Exception('Platform Job Queue host should be provided');
       
    70         }
       
    71         if (!array_key_exists('password', $options)) {
       
    72             require_once 'Zend/Queue/Exception.php';
       
    73             throw new Zend_Queue_Exception('Platform Job Queue password should be provided');
       
    74         }
       
    75 
       
    76         $this->_zendQueue = new ZendApi_Queue($options['host']);
       
    77 
       
    78         if (!$this->_zendQueue) {
       
    79             require_once 'Zend/Queue/Exception.php';
       
    80             throw new Zend_Queue_Exception('Platform Job Queue connection failed');
       
    81         }
       
    82         if (!$this->_zendQueue->login($options['password'])) {
       
    83             require_once 'Zend/Queue/Exception.php';
       
    84             throw new Zend_Queue_Exception('Job Queue login failed');
       
    85         }
       
    86 
       
    87         if ($this->_queue) {
       
    88             $this->_queue->setMessageClass('Zend_Queue_Message_PlatformJob');
       
    89         }
       
    90     }
       
    91 
       
    92     /********************************************************************
       
    93      * Queue management functions
       
    94      ********************************************************************/
       
    95 
       
    96     /**
       
    97      * Does a queue already exist?
       
    98      *
       
    99      * @param  string $name
       
   100      * @return boolean
       
   101      * @throws Zend_Queue_Exception (not supported)
       
   102      */
       
   103     public function isExists($name)
       
   104     {
       
   105         require_once 'Zend/Queue/Exception.php';
       
   106         throw new Zend_Queue_Exception('isExists() is not supported in this adapter');
       
   107     }
       
   108 
       
   109     /**
       
   110      * Create a new queue
       
   111      *
       
   112      * @param  string  $name    queue name
       
   113      * @param  integer $timeout default visibility timeout
       
   114      * @return void
       
   115      * @throws Zend_Queue_Exception
       
   116      */
       
   117     public function create($name, $timeout=null)
       
   118     {
       
   119         require_once 'Zend/Queue/Exception.php';
       
   120         throw new Zend_Queue_Exception('create() is not supported in ' . get_class($this));
       
   121     }
       
   122 
       
   123     /**
       
   124      * Delete a queue and all of its messages
       
   125      *
       
   126      * @param  string $name queue name
       
   127      * @return void
       
   128      * @throws Zend_Queue_Exception
       
   129      */
       
   130     public function delete($name)
       
   131     {
       
   132         require_once 'Zend/Queue/Exception.php';
       
   133         throw new Zend_Queue_Exception('delete() is not supported in ' . get_class($this));
       
   134     }
       
   135 
       
   136     /**
       
   137      * Get an array of all available queues
       
   138      *
       
   139      * @return void
       
   140      * @throws Zend_Queue_Exception
       
   141      */
       
   142     public function getQueues()
       
   143     {
       
   144         require_once 'Zend/Queue/Exception.php';
       
   145         throw new Zend_Queue_Exception('getQueues() is not supported in this adapter');
       
   146     }
       
   147 
       
   148     /**
       
   149      * Return the approximate number of messages in the queue
       
   150      *
       
   151      * @param  Zend_Queue|null $queue
       
   152      * @return integer
       
   153      */
       
   154     public function count(Zend_Queue $queue = null)
       
   155     {
       
   156         if ($queue !== null) {
       
   157             require_once 'Zend/Queue/Exception.php';
       
   158             throw new Zend_Queue_Exception('Queue parameter is not supported');
       
   159         }
       
   160 
       
   161         return $this->_zendQueue->getNumOfJobsInQueue();
       
   162     }
       
   163 
       
   164     /********************************************************************
       
   165      * Messsage management functions
       
   166      ********************************************************************/
       
   167 
       
   168     /**
       
   169      * Send a message to the queue
       
   170      *
       
   171      * @param  array | ZendAPI_job $message Message to send to the active queue
       
   172      * @param  Zend_Queue $queue     Not supported
       
   173      * @return Zend_Queue_Message
       
   174      * @throws Zend_Queue_Exception
       
   175      */
       
   176     public function send($message, Zend_Queue $queue = null)
       
   177     {
       
   178         if ($queue !== null) {
       
   179             require_once 'Zend/Queue/Exception.php';
       
   180             throw new Zend_Queue_Exception('Queue parameter is not supported');
       
   181         }
       
   182 
       
   183         // This adapter can work only for this message type
       
   184         $classname = $this->_queue->getMessageClass();
       
   185         if (!class_exists($classname)) {
       
   186             require_once 'Zend/Loader.php';
       
   187             Zend_Loader::loadClass($classname);
       
   188         }
       
   189 
       
   190         if ($message instanceof ZendAPI_Job) {
       
   191             $message = array('data' => $message);
       
   192         }
       
   193 
       
   194         $zendApiJob = new $classname($message);
       
   195 
       
   196         // Unfortunately, the Platform JQ API is PHP4-style...
       
   197         $platformJob = $zendApiJob->getJob();
       
   198 
       
   199         $jobId = $this->_zendQueue->addJob($platformJob);
       
   200 
       
   201         if (!$jobId) {
       
   202             require_once 'Zend/Queue/Exception.php';
       
   203             throw new Zend_Queue_Exception('Failed to add a job to queue: '
       
   204                 . $this->_zendQueue->getLastError());
       
   205         }
       
   206 
       
   207         $zendApiJob->setJobId($jobId);
       
   208         return $zendApiJob;
       
   209     }
       
   210 
       
   211     /**
       
   212      * Get messages in the queue
       
   213      *
       
   214      * @param  integer    $maxMessages  Maximum number of messages to return
       
   215      * @param  integer    $timeout      Ignored
       
   216      * @param  Zend_Queue $queue        Not supported
       
   217      * @throws Zend_Queue_Exception
       
   218      * @return ArrayIterator
       
   219      */
       
   220     public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
       
   221     {
       
   222         if ($maxMessages === null) {
       
   223             $maxMessages = 1;
       
   224         }
       
   225 
       
   226         if ($queue !== null) {
       
   227             require_once 'Zend/Queue/Exception.php';
       
   228             throw new Zend_Queue_Exception('Queue shouldn\'t be set');
       
   229         }
       
   230 
       
   231         $jobs = $this->_zendQueue->getJobsInQueue(null, $maxMessages, true);
       
   232 
       
   233         $classname = $this->_queue->getMessageClass();
       
   234         if (!class_exists($classname)) {
       
   235             require_once 'Zend/Loader.php';
       
   236             Zend_Loader::loadClass($classname);
       
   237         }
       
   238 
       
   239         $options = array(
       
   240             'queue'        => $this->_queue,
       
   241             'data'         => $jobs,
       
   242             'messageClass' => $this->_queue->getMessageClass(),
       
   243         );
       
   244 
       
   245         $classname = $this->_queue->getMessageSetClass();
       
   246 
       
   247         if (!class_exists($classname)) {
       
   248             require_once 'Zend/Loader.php';
       
   249             Zend_Loader::loadClass($classname);
       
   250         }
       
   251         return new $classname($options);
       
   252     }
       
   253 
       
   254     /**
       
   255      * Delete a message from the queue
       
   256      *
       
   257      * Returns true if the message is deleted, false if the deletion is
       
   258      * unsuccessful.
       
   259      *
       
   260      * @param  Zend_Queue_Message $message
       
   261      * @return boolean
       
   262      * @throws Zend_Queue_Exception
       
   263      */
       
   264     public function deleteMessage(Zend_Queue_Message $message)
       
   265     {
       
   266         if (get_class($message) != $this->_queue->getMessageClass()) {
       
   267             require_once 'Zend/Queue/Exception.php';
       
   268             throw new Zend_Queue_Exception(
       
   269                 'Failed to remove job from the queue; only messages of type '
       
   270                 . 'Zend_Queue_Message_PlatformJob may be used'
       
   271             );
       
   272         }
       
   273 
       
   274         return $this->_zendQueue->removeJob($message->getJobId());
       
   275     }
       
   276 
       
   277     public function isJobIdExist($id)
       
   278     {
       
   279          return (($this->_zendQueue->getJob($id))? true : false);
       
   280     }
       
   281 
       
   282     /********************************************************************
       
   283      * Supporting functions
       
   284      ********************************************************************/
       
   285 
       
   286     /**
       
   287      * Return a list of queue capabilities functions
       
   288      *
       
   289      * $array['function name'] = true or false
       
   290      * true is supported, false is not supported.
       
   291      *
       
   292      * @param  string $name
       
   293      * @return array
       
   294      */
       
   295     public function getCapabilities()
       
   296     {
       
   297          return array(
       
   298             'create'                => false,
       
   299             'delete'                => false,
       
   300             'getQueues'             => false,
       
   301             'isExists'              => false,
       
   302             'count'                 => true,
       
   303             'send'                  => true,
       
   304             'receive'               => true,
       
   305             'deleteMessage'         => true,
       
   306         );
       
   307     }
       
   308 
       
   309     /********************************************************************
       
   310      * Functions that are not part of the Zend_Queue_Adapter_AdapterAbstract
       
   311      ********************************************************************/
       
   312 
       
   313     /**
       
   314      * Serialize
       
   315      *
       
   316      * @return array
       
   317      */
       
   318     public function __sleep()
       
   319     {
       
   320         return array('_options');
       
   321     }
       
   322 
       
   323     /**
       
   324      * Unserialize
       
   325      *
       
   326      * @return void
       
   327      */
       
   328     public function __wakeup()
       
   329     {
       
   330         $options = $this->_options['daemonOptions'];
       
   331 
       
   332         $this->_zendQueue = new ZendApi_Queue($options['host']);
       
   333 
       
   334         if (!$this->_zendQueue) {
       
   335             require_once 'Zend/Queue/Exception.php';
       
   336             throw new Zend_Queue_Exception('Platform Job Queue connection failed');
       
   337         }
       
   338         if (!$this->_zendQueue->login($options['password'])) {
       
   339             require_once 'Zend/Queue/Exception.php';
       
   340             throw new Zend_Queue_Exception('Job Queue login failed');
       
   341         }
       
   342     }
       
   343 }