|
1 <?php |
|
2 |
|
3 /* |
|
4 * This file is part of SwiftMailer. |
|
5 * (c) 2004-2009 Chris Corbyn |
|
6 * |
|
7 * For the full copyright and license information, please view the LICENSE |
|
8 * file that was distributed with this source code. |
|
9 */ |
|
10 |
|
11 |
|
12 /** |
|
13 * A generic IoBuffer implementation supporting remote sockets and local processes. |
|
14 * @package Swift |
|
15 * @subpackage Transport |
|
16 * @author Chris Corbyn |
|
17 */ |
|
18 class Swift_Transport_StreamBuffer |
|
19 extends Swift_ByteStream_AbstractFilterableInputStream |
|
20 implements Swift_Transport_IoBuffer |
|
21 { |
|
22 |
|
23 /** A primary socket */ |
|
24 private $_stream; |
|
25 |
|
26 /** The input stream */ |
|
27 private $_in; |
|
28 |
|
29 /** The output stream */ |
|
30 private $_out; |
|
31 |
|
32 /** Buffer initialization parameters */ |
|
33 private $_params = array(); |
|
34 |
|
35 /** The ReplacementFilterFactory */ |
|
36 private $_replacementFactory; |
|
37 |
|
38 /** Translations performed on data being streamed into the buffer */ |
|
39 private $_translations = array(); |
|
40 |
|
41 /** |
|
42 * Create a new StreamBuffer using $replacementFactory for transformations. |
|
43 * @param Swift_ReplacementFilterFactory $replacementFactory |
|
44 */ |
|
45 public function __construct( |
|
46 Swift_ReplacementFilterFactory $replacementFactory) |
|
47 { |
|
48 $this->_replacementFactory = $replacementFactory; |
|
49 } |
|
50 |
|
51 /** |
|
52 * Perform any initialization needed, using the given $params. |
|
53 * Parameters will vary depending upon the type of IoBuffer used. |
|
54 * @param array $params |
|
55 */ |
|
56 public function initialize(array $params) |
|
57 { |
|
58 $this->_params = $params; |
|
59 switch ($params['type']) |
|
60 { |
|
61 case self::TYPE_PROCESS: |
|
62 $this->_establishProcessConnection(); |
|
63 break; |
|
64 case self::TYPE_SOCKET: |
|
65 default: |
|
66 $this->_establishSocketConnection(); |
|
67 break; |
|
68 } |
|
69 } |
|
70 |
|
71 /** |
|
72 * Set an individual param on the buffer (e.g. switching to SSL). |
|
73 * @param string $param |
|
74 * @param mixed $value |
|
75 */ |
|
76 public function setParam($param, $value) |
|
77 { |
|
78 if (isset($this->_stream)) |
|
79 { |
|
80 switch ($param) |
|
81 { |
|
82 case 'protocol': |
|
83 if (!array_key_exists('protocol', $this->_params) |
|
84 || $value != $this->_params['protocol']) |
|
85 { |
|
86 if ('tls' == $value) |
|
87 { |
|
88 stream_socket_enable_crypto( |
|
89 $this->_stream, true, STREAM_CRYPTO_METHOD_TLS_CLIENT |
|
90 ); |
|
91 } |
|
92 } |
|
93 break; |
|
94 |
|
95 case 'timeout': |
|
96 if ($this->_stream) |
|
97 { |
|
98 stream_set_timeout($this->_stream, $value); |
|
99 } |
|
100 break; |
|
101 |
|
102 case 'blocking': |
|
103 if ($this->_stream) |
|
104 { |
|
105 stream_set_blocking($this->_stream, 1); |
|
106 } |
|
107 |
|
108 } |
|
109 } |
|
110 $this->_params[$param] = $value; |
|
111 } |
|
112 |
|
113 /** |
|
114 * Perform any shutdown logic needed. |
|
115 */ |
|
116 public function terminate() |
|
117 { |
|
118 if (isset($this->_stream)) |
|
119 { |
|
120 switch ($this->_params['type']) |
|
121 { |
|
122 case self::TYPE_PROCESS: |
|
123 fclose($this->_in); |
|
124 fclose($this->_out); |
|
125 proc_close($this->_stream); |
|
126 break; |
|
127 case self::TYPE_SOCKET: |
|
128 default: |
|
129 fclose($this->_stream); |
|
130 break; |
|
131 } |
|
132 } |
|
133 $this->_stream = null; |
|
134 $this->_out = null; |
|
135 $this->_in = null; |
|
136 } |
|
137 |
|
138 /** |
|
139 * Set an array of string replacements which should be made on data written |
|
140 * to the buffer. This could replace LF with CRLF for example. |
|
141 * @param string[] $replacements |
|
142 */ |
|
143 public function setWriteTranslations(array $replacements) |
|
144 { |
|
145 foreach ($this->_translations as $search => $replace) |
|
146 { |
|
147 if (!isset($replacements[$search])) |
|
148 { |
|
149 $this->removeFilter($search); |
|
150 unset($this->_translations[$search]); |
|
151 } |
|
152 } |
|
153 |
|
154 foreach ($replacements as $search => $replace) |
|
155 { |
|
156 if (!isset($this->_translations[$search])) |
|
157 { |
|
158 $this->addFilter( |
|
159 $this->_replacementFactory->createFilter($search, $replace), $search |
|
160 ); |
|
161 $this->_translations[$search] = true; |
|
162 } |
|
163 } |
|
164 } |
|
165 |
|
166 /** |
|
167 * Get a line of output (including any CRLF). |
|
168 * The $sequence number comes from any writes and may or may not be used |
|
169 * depending upon the implementation. |
|
170 * @param int $sequence of last write to scan from |
|
171 * @return string |
|
172 */ |
|
173 public function readLine($sequence) |
|
174 { |
|
175 if (isset($this->_out) && !feof($this->_out)) |
|
176 { |
|
177 $line = fgets($this->_out); |
|
178 if (strlen($line)==0) |
|
179 { |
|
180 $metas = stream_get_meta_data($this->_out); |
|
181 if ($metas['timed_out']) { |
|
182 throw new Swift_IoException( |
|
183 'Connection to ' . |
|
184 $this->_getReadConnectionDescription() . |
|
185 ' Timed Out' |
|
186 ); |
|
187 } |
|
188 } |
|
189 return $line; |
|
190 } |
|
191 } |
|
192 |
|
193 /** |
|
194 * Reads $length bytes from the stream into a string and moves the pointer |
|
195 * through the stream by $length. If less bytes exist than are requested the |
|
196 * remaining bytes are given instead. If no bytes are remaining at all, boolean |
|
197 * false is returned. |
|
198 * @param int $length |
|
199 * @return string |
|
200 */ |
|
201 public function read($length) |
|
202 { |
|
203 if (isset($this->_out) && !feof($this->_out)) |
|
204 { |
|
205 $ret = fread($this->_out, $length); |
|
206 if (strlen($ret)==0) |
|
207 { |
|
208 $metas = stream_get_meta_data($this->_out); |
|
209 if ($metas['timed_out']) |
|
210 { |
|
211 throw new Swift_IoException( |
|
212 'Connection to ' . |
|
213 $this->_getReadConnectionDescription() . |
|
214 ' Timed Out' |
|
215 ); |
|
216 } |
|
217 } |
|
218 return $ret; |
|
219 } |
|
220 } |
|
221 |
|
222 /** Not implemented */ |
|
223 public function setReadPointer($byteOffset) |
|
224 { |
|
225 } |
|
226 |
|
227 // -- Protected methods |
|
228 |
|
229 /** Flush the stream contents */ |
|
230 protected function _flush() |
|
231 { |
|
232 if (isset($this->_in)) |
|
233 { |
|
234 fflush($this->_in); |
|
235 } |
|
236 } |
|
237 |
|
238 /** Write this bytes to the stream */ |
|
239 protected function _commit($bytes) |
|
240 { |
|
241 if (isset($this->_in) |
|
242 && fwrite($this->_in, $bytes)) |
|
243 { |
|
244 return ++$this->_sequence; |
|
245 } |
|
246 } |
|
247 |
|
248 // -- Private methods |
|
249 |
|
250 /** |
|
251 * Establishes a connection to a remote server. |
|
252 * @access private |
|
253 */ |
|
254 private function _establishSocketConnection() |
|
255 { |
|
256 $host = $this->_params['host']; |
|
257 if (!empty($this->_params['protocol'])) |
|
258 { |
|
259 $host = $this->_params['protocol'] . '://' . $host; |
|
260 } |
|
261 $timeout = 15; |
|
262 if (!empty($this->_params['timeout'])) |
|
263 { |
|
264 $timeout = $this->_params['timeout']; |
|
265 } |
|
266 $options = array(); |
|
267 if (!empty($this->_params['sourceIp'])) |
|
268 { |
|
269 $options['socket']['bindto']=$this->_params['sourceIp'].':0'; |
|
270 } |
|
271 if (!$this->_stream = stream_socket_client($host.':'.$this->_params['port'], $errno, $errstr, $timeout, STREAM_CLIENT_CONNECT, stream_context_create($options))) |
|
272 { |
|
273 throw new Swift_TransportException( |
|
274 'Connection could not be established with host ' . $this->_params['host'] . |
|
275 ' [' . $errstr . ' #' . $errno . ']' |
|
276 ); |
|
277 } |
|
278 if (!empty($this->_params['blocking'])) |
|
279 { |
|
280 stream_set_blocking($this->_stream, 1); |
|
281 } |
|
282 else |
|
283 { |
|
284 stream_set_blocking($this->_stream, 0); |
|
285 } |
|
286 stream_set_timeout($this->_stream, $timeout); |
|
287 $this->_in =& $this->_stream; |
|
288 $this->_out =& $this->_stream; |
|
289 } |
|
290 |
|
291 /** |
|
292 * Opens a process for input/output. |
|
293 * @access private |
|
294 */ |
|
295 private function _establishProcessConnection() |
|
296 { |
|
297 $command = $this->_params['command']; |
|
298 $descriptorSpec = array( |
|
299 0 => array('pipe', 'r'), |
|
300 1 => array('pipe', 'w'), |
|
301 2 => array('pipe', 'w') |
|
302 ); |
|
303 $this->_stream = proc_open($command, $descriptorSpec, $pipes); |
|
304 stream_set_blocking($pipes[2], 0); |
|
305 if ($err = stream_get_contents($pipes[2])) |
|
306 { |
|
307 throw new Swift_TransportException( |
|
308 'Process could not be started [' . $err . ']' |
|
309 ); |
|
310 } |
|
311 $this->_in =& $pipes[0]; |
|
312 $this->_out =& $pipes[1]; |
|
313 } |
|
314 |
|
315 |
|
316 private function _getReadConnectionDescription() |
|
317 { |
|
318 switch ($this->_params['type']) |
|
319 { |
|
320 case self::TYPE_PROCESS: |
|
321 return 'Process '.$this->_params['command']; |
|
322 break; |
|
323 |
|
324 case self::TYPE_SOCKET: |
|
325 default: |
|
326 $host = $this->_params['host']; |
|
327 if (!empty($this->_params['protocol'])) |
|
328 { |
|
329 $host = $this->_params['protocol'] . '://' . $host; |
|
330 } |
|
331 $host.=':'.$this->_params['port']; |
|
332 return $host; |
|
333 break; |
|
334 } |
|
335 } |
|
336 } |