web/lib/Zend/Queue/Stomp/Client/Connection.php
changeset 64 162c1de6545a
parent 19 1c2f13fd785c
child 68 ecaf28ffe26e
equal deleted inserted replaced
63:5b37998e522e 64:162c1de6545a
       
     1 <?php
       
     2 /**
       
     3  * Zend Framework
       
     4  *
       
     5  * LICENSE
       
     6  *
       
     7  * This source file is subject to the new BSD license that is bundled
       
     8  * with this package in the file LICENSE.txt.
       
     9  * It is also available through the world-wide-web at this URL:
       
    10  * http://framework.zend.com/license/new-bsd
       
    11  * If you did not receive a copy of the license and are unable to
       
    12  * obtain it through the world-wide-web, please send an email
       
    13  * to license@zend.com so we can send you a copy immediately.
       
    14  *
       
    15  * @category   Zend
       
    16  * @package    Zend_Queue
       
    17  * @subpackage Stomp
       
    18  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    19  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    20  * @version    $Id: Connection.php 20096 2010-01-06 02:05:09Z bkarwin $
       
    21  */
       
    22 
       
    23 /**
       
    24  * @see Zend_Queue_Stomp_Client_ConnectionInterface
       
    25  */
       
    26 require_once 'Zend/Queue/Stomp/Client/ConnectionInterface.php';
       
    27 
       
    28 /**
       
    29  * The Stomp client interacts with a Stomp server.
       
    30  *
       
    31  * @category   Zend
       
    32  * @package    Zend_Queue
       
    33  * @subpackage Stomp
       
    34  * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
       
    35  * @license    http://framework.zend.com/license/new-bsd     New BSD License
       
    36  */
       
    37 class Zend_Queue_Stomp_Client_Connection
       
    38     implements Zend_Queue_Stomp_Client_ConnectionInterface
       
    39 {
       
    40     const READ_TIMEOUT_DEFAULT_USEC = 0; // 0 microseconds
       
    41     const READ_TIMEOUT_DEFAULT_SEC = 5; // 5 seconds
       
    42 
       
    43     /**
       
    44      * Connection options
       
    45      * @var array
       
    46      */
       
    47     protected $_options;
       
    48 
       
    49     /**
       
    50      * tcp/udp socket
       
    51      *
       
    52      * @var resource
       
    53      */
       
    54     protected $_socket = false;
       
    55 
       
    56     /**
       
    57      * open() opens a socket to the Stomp server
       
    58      *
       
    59      * @param  array $options ('scheme', 'host', 'port')
       
    60      * @param  string $scheme
       
    61      * @param  string $host
       
    62      * @param  int $port
       
    63      * @param  array $options Accepts "timeout_sec" and "timeout_usec" keys
       
    64      * @return true;
       
    65      * @throws Zend_Queue_Exception
       
    66      */
       
    67     public function open($scheme, $host, $port, array $options = array())
       
    68     {
       
    69         $str = $scheme . '://' . $host;
       
    70         $this->_socket = fsockopen($str, $port, $errno, $errstr);
       
    71 
       
    72         if ($this->_socket === false) {
       
    73             // aparently there is some reason that fsockopen will return false
       
    74             // but it normally throws an error.
       
    75             require_once 'Zend/Queue/Exception.php';
       
    76             throw new Zend_Queue_Exception("Unable to connect to $str; error = $errstr ( errno = $errno )");
       
    77         }
       
    78 
       
    79         stream_set_blocking($this->_socket, 0); // non blocking
       
    80 
       
    81         if (!isset($options['timeout_sec'])) {
       
    82             $options['timeout_sec'] = self::READ_TIMEOUT_DEFAULT_SEC;
       
    83         }
       
    84         if (! isset($options['timeout_usec'])) {
       
    85             $options['timeout_usec'] = self::READ_TIMEOUT_DEFAULT_USEC;
       
    86         }
       
    87 
       
    88         $this->_options = $options;
       
    89 
       
    90         return true;
       
    91     }
       
    92 
       
    93     /**
       
    94      * Close the socket explicitly when destructed
       
    95      *
       
    96      * @return void
       
    97      */
       
    98     public function __destruct()
       
    99     {
       
   100     }
       
   101 
       
   102     /**
       
   103      * Close connection
       
   104      *
       
   105      * @param  boolean $destructor
       
   106      * @return void
       
   107      */
       
   108     public function close($destructor = false)
       
   109     {
       
   110         // Gracefully disconnect
       
   111         if (!$destructor) {
       
   112             $frame = $this->createFrame();
       
   113             $frame->setCommand('DISCONNECT');
       
   114             $this->write($frame);
       
   115         }
       
   116 
       
   117         // @todo: Should be fixed.
       
   118         // When the socket is "closed", it will trigger the below error when php exits
       
   119         // Fatal error: Exception thrown without a stack frame in Unknown on line 0
       
   120 
       
   121         // Danlo: I suspect this is because this has already been claimed by the interpeter
       
   122         // thus trying to shutdown this resources, which is already shutdown is a problem.
       
   123         if (is_resource($this->_socket)) {
       
   124             // fclose($this->_socket);
       
   125         }
       
   126 
       
   127         // $this->_socket = null;
       
   128     }
       
   129 
       
   130     /**
       
   131      * Check whether we are connected to the server
       
   132      *
       
   133      * @return true
       
   134      * @throws Zend_Queue_Exception
       
   135      */
       
   136     public function ping()
       
   137     {
       
   138         if (!is_resource($this->_socket)) {
       
   139             require_once 'Zend/Queue/Exception.php';
       
   140             throw new Zend_Queue_Exception('Not connected to Stomp server');
       
   141         }
       
   142         return true;
       
   143     }
       
   144 
       
   145     /**
       
   146      * Write a frame to the stomp server
       
   147      *
       
   148      * example: $response = $client->write($frame)->read();
       
   149      *
       
   150      * @param Zend_Queue_Stom_FrameInterface $frame
       
   151      * @return $this
       
   152      */
       
   153     public function write(Zend_Queue_Stomp_FrameInterface $frame)
       
   154     {
       
   155         $this->ping();
       
   156         $output = $frame->toFrame();
       
   157 
       
   158         $bytes = fwrite($this->_socket, $output, strlen($output));
       
   159         if ($bytes === false || $bytes == 0) {
       
   160             require_once 'Zend/Queue/Exception.php';
       
   161             throw new Zend_Queue_Exception('No bytes written');
       
   162         }
       
   163 
       
   164         return $this;
       
   165     }
       
   166 
       
   167     /**
       
   168      * Tests the socket to see if there is data for us
       
   169      *
       
   170      * @return boolean
       
   171      */
       
   172     public function canRead()
       
   173     {
       
   174         $read   = array($this->_socket);
       
   175         $write  = null;
       
   176         $except = null;
       
   177 
       
   178         return stream_select(
       
   179             $read,
       
   180             $write,
       
   181             $except,
       
   182             $this->_options['timeout_sec'],
       
   183             $this->_options['timeout_usec']
       
   184         ) == 1;
       
   185         // see http://us.php.net/manual/en/function.stream-select.php
       
   186     }
       
   187 
       
   188     /**
       
   189      * Reads in a frame from the socket or returns false.
       
   190      *
       
   191      * @return Zend_Queue_Stomp_FrameInterface|false
       
   192      * @throws Zend_Queue_Exception
       
   193      */
       
   194     public function read()
       
   195     {
       
   196         $this->ping();
       
   197 
       
   198         $response = '';
       
   199         $prev     = '';
       
   200 
       
   201         // while not end of file.
       
   202         while (!feof($this->_socket)) {
       
   203             // read in one character until "\0\n" is found
       
   204             $data = fread($this->_socket, 1);
       
   205 
       
   206             // check to make sure that the connection is not lost.
       
   207             if ($data === false) {
       
   208                 require_once 'Zend/Queue/Exception.php';
       
   209                 throw new Zend_Queue_Exception('Connection lost');
       
   210             }
       
   211 
       
   212             // append last character read to $response
       
   213             $response .= $data;
       
   214 
       
   215             // is this \0 (prev) \n (data)? END_OF_FRAME
       
   216             if (ord($data) == 10 && ord($prev) == 0) {
       
   217                 break;
       
   218             }
       
   219             $prev = $data;
       
   220         }
       
   221 
       
   222         if ($response === '') {
       
   223             return false;
       
   224         }
       
   225 
       
   226         $frame = $this->createFrame();
       
   227         $frame->fromFrame($response);
       
   228         return $frame;
       
   229     }
       
   230 
       
   231     /**
       
   232      * Set the frameClass to be used
       
   233      *
       
   234      * This must be a Zend_Queue_Stomp_FrameInterface.
       
   235      *
       
   236      * @param  string $classname - class is an instance of Zend_Queue_Stomp_FrameInterface
       
   237      * @return $this;
       
   238      */
       
   239     public function setFrameClass($classname)
       
   240     {
       
   241         $this->_options['frameClass'] = $classname;
       
   242         return $this;
       
   243     }
       
   244 
       
   245     /**
       
   246      * Get the frameClass
       
   247      *
       
   248      * @return string
       
   249      */
       
   250     public function getFrameClass()
       
   251     {
       
   252         return isset($this->_options['frameClass'])
       
   253             ? $this->_options['frameClass']
       
   254             : 'Zend_Queue_Stomp_Frame';
       
   255     }
       
   256 
       
   257     /**
       
   258      * Create an empty frame
       
   259      *
       
   260      * @return Zend_Queue_Stomp_FrameInterface
       
   261      */
       
   262     public function createFrame()
       
   263     {
       
   264         $class = $this->getFrameClass();
       
   265 
       
   266         if (!class_exists($class)) {
       
   267             require_once 'Zend/Loader.php';
       
   268             Zend_Loader::loadClass($class);
       
   269         }
       
   270 
       
   271         $frame = new $class();
       
   272 
       
   273         if (!$frame instanceof Zend_Queue_Stomp_FrameInterface) {
       
   274             require_once 'Zend/Queue/Exception.php';
       
   275             throw new Zend_Queue_Exception('Invalid Frame class provided; must implement Zend_Queue_Stomp_FrameInterface');
       
   276         }
       
   277 
       
   278         return $frame;
       
   279     }
       
   280 }