vendor/swiftmailer/lib/classes/Swift/Transport/StreamBuffer.php
changeset 0 7f95f8617b0b
equal deleted inserted replaced
-1:000000000000 0:7f95f8617b0b
       
     1 <?php
       
     2 
       
     3 /*
       
     4  * This file is part of SwiftMailer.
       
     5  * (c) 2004-2009 Chris Corbyn
       
     6  *
       
     7  * For the full copyright and license information, please view the LICENSE
       
     8  * file that was distributed with this source code.
       
     9  */
       
    10 
       
    11 
       
    12 /**
       
    13  * A generic IoBuffer implementation supporting remote sockets and local processes.
       
    14  * @package Swift
       
    15  * @subpackage Transport
       
    16  * @author Chris Corbyn
       
    17  */
       
    18 class Swift_Transport_StreamBuffer
       
    19   extends Swift_ByteStream_AbstractFilterableInputStream
       
    20   implements Swift_Transport_IoBuffer
       
    21 {
       
    22   
       
    23   /** A primary socket */
       
    24   private $_stream;
       
    25   
       
    26   /** The input stream */
       
    27   private $_in;
       
    28   
       
    29   /** The output stream */
       
    30   private $_out;
       
    31   
       
    32   /** Buffer initialization parameters */
       
    33   private $_params = array();
       
    34   
       
    35   /** The ReplacementFilterFactory */
       
    36   private $_replacementFactory;
       
    37   
       
    38   /** Translations performed on data being streamed into the buffer */
       
    39   private $_translations = array();
       
    40   
       
    41   /**
       
    42    * Create a new StreamBuffer using $replacementFactory for transformations.
       
    43    * @param Swift_ReplacementFilterFactory $replacementFactory
       
    44    */
       
    45   public function __construct(
       
    46     Swift_ReplacementFilterFactory $replacementFactory)
       
    47   {
       
    48     $this->_replacementFactory = $replacementFactory;
       
    49   }
       
    50   
       
    51   /**
       
    52    * Perform any initialization needed, using the given $params.
       
    53    * Parameters will vary depending upon the type of IoBuffer used.
       
    54    * @param array $params
       
    55    */
       
    56   public function initialize(array $params)
       
    57   {
       
    58     $this->_params = $params;
       
    59     switch ($params['type'])
       
    60     {
       
    61       case self::TYPE_PROCESS:
       
    62         $this->_establishProcessConnection();
       
    63         break;
       
    64       case self::TYPE_SOCKET:
       
    65       default:
       
    66         $this->_establishSocketConnection();
       
    67         break;
       
    68     }
       
    69   }
       
    70   
       
    71   /**
       
    72    * Set an individual param on the buffer (e.g. switching to SSL).
       
    73    * @param string $param
       
    74    * @param mixed $value
       
    75    */
       
    76   public function setParam($param, $value)
       
    77   {
       
    78     if (isset($this->_stream))
       
    79     {
       
    80       switch ($param)
       
    81       {
       
    82         case 'protocol':
       
    83           if (!array_key_exists('protocol', $this->_params)
       
    84             || $value != $this->_params['protocol'])
       
    85           {
       
    86             if ('tls' == $value)
       
    87             {
       
    88               stream_socket_enable_crypto(
       
    89                 $this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT
       
    90                 );
       
    91             }
       
    92           }
       
    93           break;
       
    94           
       
    95         case 'timeout':
       
    96           if ($this->_stream)
       
    97           {
       
    98             stream_set_timeout($this->_stream, $value);
       
    99           }
       
   100           break;
       
   101           
       
   102         case 'blocking':
       
   103           if ($this->_stream)
       
   104           {
       
   105             stream_set_blocking($this->_stream, 1);
       
   106           }
       
   107           
       
   108       }
       
   109     }
       
   110     $this->_params[$param] = $value;
       
   111   }
       
   112   
       
   113   /**
       
   114    * Perform any shutdown logic needed.
       
   115    */
       
   116   public function terminate()
       
   117   {
       
   118     if (isset($this->_stream))
       
   119     {
       
   120       switch ($this->_params['type'])
       
   121       {
       
   122         case self::TYPE_PROCESS:
       
   123           fclose($this->_in);
       
   124           fclose($this->_out);
       
   125           proc_close($this->_stream);
       
   126           break;
       
   127         case self::TYPE_SOCKET:
       
   128         default:
       
   129           fclose($this->_stream);
       
   130           break;
       
   131       }
       
   132     }
       
   133     $this->_stream = null;
       
   134     $this->_out = null;
       
   135     $this->_in = null;
       
   136   }
       
   137   
       
   138   /**
       
   139    * Set an array of string replacements which should be made on data written
       
   140    * to the buffer.  This could replace LF with CRLF for example.
       
   141    * @param string[] $replacements
       
   142    */
       
   143   public function setWriteTranslations(array $replacements)
       
   144   {
       
   145     foreach ($this->_translations as $search => $replace)
       
   146     {
       
   147       if (!isset($replacements[$search]))
       
   148       {
       
   149         $this->removeFilter($search);
       
   150         unset($this->_translations[$search]);
       
   151       }
       
   152     }
       
   153     
       
   154     foreach ($replacements as $search => $replace)
       
   155     {
       
   156       if (!isset($this->_translations[$search]))
       
   157       {
       
   158         $this->addFilter(
       
   159           $this->_replacementFactory->createFilter($search, $replace), $search
       
   160           );
       
   161         $this->_translations[$search] = true;
       
   162       }
       
   163     }
       
   164   }
       
   165   
       
   166   /**
       
   167    * Get a line of output (including any CRLF).
       
   168    * The $sequence number comes from any writes and may or may not be used
       
   169    * depending upon the implementation.
       
   170    * @param int $sequence of last write to scan from
       
   171    * @return string
       
   172    */
       
   173   public function readLine($sequence)
       
   174   {
       
   175     if (isset($this->_out) && !feof($this->_out))
       
   176     {
       
   177       $line = fgets($this->_out);
       
   178       if (strlen($line)==0) 
       
   179       {
       
   180         $metas = stream_get_meta_data($this->_out);
       
   181         if ($metas['timed_out']) {
       
   182           throw new Swift_IoException(
       
   183             'Connection to ' . 
       
   184               $this->_getReadConnectionDescription() . 
       
   185             ' Timed Out'
       
   186           );
       
   187         }
       
   188       }
       
   189       return $line;
       
   190     }
       
   191   }
       
   192   
       
   193   /**
       
   194    * Reads $length bytes from the stream into a string and moves the pointer
       
   195    * through the stream by $length. If less bytes exist than are requested the
       
   196    * remaining bytes are given instead. If no bytes are remaining at all, boolean
       
   197    * false is returned.
       
   198    * @param int $length
       
   199    * @return string
       
   200    */
       
   201   public function read($length)
       
   202   {
       
   203     if (isset($this->_out) && !feof($this->_out))
       
   204     {
       
   205       $ret = fread($this->_out, $length);
       
   206       if (strlen($ret)==0) 
       
   207       {
       
   208         $metas = stream_get_meta_data($this->_out);
       
   209         if ($metas['timed_out']) 
       
   210         {
       
   211           throw new Swift_IoException(
       
   212             'Connection to ' . 
       
   213               $this->_getReadConnectionDescription() . 
       
   214             ' Timed Out'
       
   215           );
       
   216         }
       
   217       }
       
   218       return $ret;
       
   219     }
       
   220   }
       
   221   
       
   222   /** Not implemented */
       
   223   public function setReadPointer($byteOffset)
       
   224   {
       
   225   }
       
   226   
       
   227   // -- Protected methods
       
   228   
       
   229   /** Flush the stream contents */
       
   230   protected function _flush()
       
   231   {
       
   232     if (isset($this->_in))
       
   233     {
       
   234       fflush($this->_in);
       
   235     }
       
   236   }
       
   237   
       
   238   /** Write this bytes to the stream */
       
   239   protected function _commit($bytes)
       
   240   {
       
   241     if (isset($this->_in)
       
   242       && fwrite($this->_in, $bytes))
       
   243     {
       
   244       return ++$this->_sequence;
       
   245     }
       
   246   }
       
   247   
       
   248   // -- Private methods
       
   249   
       
   250   /**
       
   251    * Establishes a connection to a remote server.
       
   252    * @access private
       
   253    */
       
   254   private function _establishSocketConnection()
       
   255   {
       
   256     $host = $this->_params['host'];
       
   257     if (!empty($this->_params['protocol']))
       
   258     {
       
   259       $host = $this->_params['protocol'] . '://' . $host;
       
   260     }
       
   261     $timeout = 15;
       
   262     if (!empty($this->_params['timeout']))
       
   263     {
       
   264       $timeout = $this->_params['timeout'];
       
   265     }
       
   266     $options = array();
       
   267     if (!empty($this->_params['sourceIp']))
       
   268     {
       
   269     	$options['socket']['bindto']=$this->_params['sourceIp'].':0';
       
   270     }
       
   271     if (!$this->_stream = stream_socket_client($host.':'.$this->_params['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options)))
       
   272     {
       
   273       throw new Swift_TransportException(
       
   274         'Connection could not be established with host ' . $this->_params['host'] .
       
   275         ' [' . $errstr . ' #' . $errno . ']'
       
   276         );
       
   277     }
       
   278     if (!empty($this->_params['blocking']))
       
   279     {
       
   280       stream_set_blocking($this->_stream, 1);
       
   281     }
       
   282     else
       
   283     {
       
   284       stream_set_blocking($this->_stream, 0);
       
   285     }
       
   286     stream_set_timeout($this->_stream, $timeout);
       
   287     $this->_in =& $this->_stream;
       
   288     $this->_out =& $this->_stream;
       
   289   }
       
   290   
       
   291   /**
       
   292    * Opens a process for input/output.
       
   293    * @access private
       
   294    */
       
   295   private function _establishProcessConnection()
       
   296   {
       
   297     $command = $this->_params['command'];
       
   298     $descriptorSpec = array(
       
   299       0 => array('pipe', 'r'),
       
   300       1 => array('pipe', 'w'),
       
   301       2 => array('pipe', 'w')
       
   302       );
       
   303     $this->_stream = proc_open($command, $descriptorSpec, $pipes);
       
   304     stream_set_blocking($pipes[2], 0);
       
   305     if ($err = stream_get_contents($pipes[2]))
       
   306     {
       
   307       throw new Swift_TransportException(
       
   308         'Process could not be started [' . $err . ']'
       
   309         );
       
   310     }
       
   311     $this->_in =& $pipes[0];
       
   312     $this->_out =& $pipes[1];
       
   313   }
       
   314   
       
   315   
       
   316   private function _getReadConnectionDescription()
       
   317   {
       
   318     switch ($this->_params['type'])
       
   319     {
       
   320       case self::TYPE_PROCESS:
       
   321         return 'Process '.$this->_params['command'];
       
   322         break;
       
   323         
       
   324       case self::TYPE_SOCKET:
       
   325       default:
       
   326         $host = $this->_params['host'];
       
   327         if (!empty($this->_params['protocol']))
       
   328         {
       
   329           $host = $this->_params['protocol'] . '://' . $host;
       
   330         }
       
   331         $host.=':'.$this->_params['port'];
       
   332         return $host;
       
   333         break;
       
   334     }
       
   335   }
       
   336 }