|
1 """ Patch utility to apply unified diffs |
|
2 |
|
3 Brute-force line-by-line non-recursive parsing |
|
4 |
|
5 Copyright (c) 2008-2010 anatoly techtonik |
|
6 Available under the terms of MIT license |
|
7 |
|
8 Project home: http://code.google.com/p/python-patch/ |
|
9 |
|
10 |
|
11 $Id: patch.py 76 2010-04-08 19:10:21Z techtonik $ |
|
12 $HeadURL: https://python-patch.googlecode.com/svn/trunk/patch.py $ |
|
13 """ |
|
14 |
|
15 __author__ = "techtonik.rainforce.org" |
|
16 __version__ = "10.04" |
|
17 |
|
18 import copy |
|
19 import logging |
|
20 import re |
|
21 # cStringIO doesn't support unicode in 2.5 |
|
22 from StringIO import StringIO |
|
23 from logging import debug, info, warning |
|
24 |
|
25 from os.path import exists, isfile, abspath |
|
26 from os import unlink |
|
27 |
|
28 |
|
#------------------------------------------------
# Logging is controlled by "python_patch" logger

#: when True, parse() emits extra per-hunk debug statistics
debugmode = False

# the handler is kept in a module-level name so that callers (and the
# command line entry point) can attach their own formatter to it
loghandler = logging.StreamHandler()
logger = logging.getLogger("python_patch")
logger.addHandler(loghandler)

#: disable library logging by default
logger.setLevel(logging.CRITICAL)

# module-level shortcuts bound to the library logger
debug, info, warning = logger.debug, logger.info, logger.warning

#------------------------------------------------
|
46 |
|
47 |
|
def fromfile(filename):
    """ Parse patch file and return Patch() object

    The file is opened in binary mode ("rb") and is closed even when
    parsing raises an exception.
    """

    info("reading patch from file %s" % filename)
    fp = open(filename, "rb")
    try:
        return Patch(fp)
    finally:
        # try/finally instead of `with` keeps the module importable on
        # interpreters that predate the with statement
        fp.close()
|
57 |
|
58 |
|
def fromstring(s):
    """ Parse text string and return Patch() object
    """

    # `StringIO` here is the class itself (imported with
    # `from StringIO import StringIO`), so it is called directly;
    # `StringIO.StringIO(s)` would raise AttributeError
    return Patch(StringIO(s))
|
66 |
|
67 |
|
68 |
|
class HunkInfo(object):
    """ Parsed hunk data container (hunk starts with @@ -R +R @@) """

    def __init__(self):
        # position and length of the hunk in the source file
        self.startsrc = None    #: line count starts with 1
        self.linessrc = None
        # position and length of the hunk in the target file
        self.starttgt = None
        self.linestgt = None
        # set to True by the parser when the hunk body is malformed
        self.invalid = False
        # raw hunk body lines, each still carrying its prefix character
        self.text = []

    def copy(self):
        """ Return a shallow copy - the `text` list is shared with the original """
        duplicate = copy.copy(self)
        return duplicate

    # todo: apply(self, estream) - write hunk data into an enumerable
    #       stream and return strings one by one until the hunk is over;
    #       the enumerable stream would be tuples (lineno, line) where
    #       lineno starts with 0
|
92 |
|
93 |
|
94 |
|
class Patch(object):
    """ Parsed unified diff

    `source`, `target`, `hunks` and `hunkends` are parallel lists holding
    one entry per patched file. All four are None until parse() is called.
    """

    # patterns are compiled once; raw strings keep "\d" and "\+" from
    # being treated as (invalid) string escapes
    _re_hunkhead = re.compile(r"^@@ -(\d+)(,(\d+))? \+(\d+)(,(\d+))?")
    _re_hunkline = re.compile(r"^[- \+\\]")
    # todo: support spaces in filenames
    _re_source = re.compile(r"^--- ([^\t]+)")
    _re_target = re.compile(r"^\+\+\+ ([^\t]+)")

    def __init__(self, stream=None):
        """ Create an empty patch, or parse `stream` right away if given """

        # define Patch data members
        # table with a row for every source file

        #: list of source filenames
        self.source = None
        self.target = None
        #: list of lists of hunks
        self.hunks = None
        #: file endings statistics for every hunk
        self.hunkends = None

        if stream:
            self.parse(stream)

    def copy(self):
        """ Return a shallow copy - the internal lists are shared """
        return copy.copy(self)

    def parse(self, stream):
        """ parse unified diff

        `stream` is any iterable of lines. The result is stored in
        self.source, self.target, self.hunks and self.hunkends; hunks that
        contain a malformed line or more lines than their header announces
        are still recorded, with `invalid` set to True.
        """
        self.source = []
        self.target = []
        self.hunks = []
        self.hunkends = []

        # possible file regions that direct the parser flow
        header = True       # comments before the patch body
        filenames = False   # lines starting with --- and +++
        hunkhead = False    # @@ -R +R @@ sequence
        hunkbody = False    #
        hunkskip = False    # skipping invalid hunk mode

        lineends = dict(lf=0, crlf=0, cr=0)
        nextfileno = 0
        nexthunkno = 0      #: even if index starts with 0 user messages number hunks from 1

        # hunkinfo holds parsed values, hunkactual - calculated
        hunkinfo = HunkInfo()
        hunkactual = dict(linessrc=None, linestgt=None)

        for lineno, line in enumerate(stream):

            # analyze state
            if header and line.startswith("--- "):
                header = False
                # switch to filenames state
                filenames = True

            #: skip hunkskip and hunkbody code until you read definition of hunkhead
            if hunkbody:
                # process line first
                if self._re_hunkline.match(line):
                    # gather stats about line endings
                    ends = self.hunkends[nextfileno-1]
                    if line.endswith("\r\n"):
                        ends["crlf"] += 1
                    elif line.endswith("\n"):
                        ends["lf"] += 1
                    elif line.endswith("\r"):
                        ends["cr"] += 1

                    if line.startswith("-"):
                        hunkactual["linessrc"] += 1
                    elif line.startswith("+"):
                        hunkactual["linestgt"] += 1
                    elif not line.startswith("\\"):
                        hunkactual["linessrc"] += 1
                        hunkactual["linestgt"] += 1
                    hunkinfo.text.append(line)
                    # todo: handle \ No newline cases

                    # check exit conditions - only after a counted line, so
                    # that a hunk rejected below is never recorded twice
                    if hunkactual["linessrc"] > hunkinfo.linessrc or hunkactual["linestgt"] > hunkinfo.linestgt:
                        warning("extra hunk no.%d lines at %d for target %s" % (nexthunkno, lineno+1, self.target[nextfileno-1]))
                        # add hunk status node
                        badhunk = hunkinfo.copy()
                        badhunk.invalid = True
                        self.hunks[nextfileno-1].append(badhunk)
                        # switch to hunkskip state
                        hunkbody = False
                        hunkskip = True
                    elif hunkinfo.linessrc == hunkactual["linessrc"] and hunkinfo.linestgt == hunkactual["linestgt"]:
                        self.hunks[nextfileno-1].append(hunkinfo.copy())
                        # switch to hunkskip state
                        hunkbody = False
                        hunkskip = True

                        # detect mixed window/unix line ends
                        if ((ends["cr"]!=0) + (ends["crlf"]!=0) + (ends["lf"]!=0)) > 1:
                            warning("inconsistent line ends in patch hunks for %s" % self.source[nextfileno-1])
                        if debugmode:
                            debuglines = dict(ends)
                            debuglines.update(file=self.target[nextfileno-1], hunk=nexthunkno)
                            debug("crlf: %(crlf)d lf: %(lf)d cr: %(cr)d\t - file: %(file)s hunk: %(hunk)d" % debuglines)
                else:
                    warning("invalid hunk no.%d at %d for target file %s" % (nexthunkno, lineno+1, self.target[nextfileno-1]))
                    # add hunk status node; HunkInfo is a plain object, so
                    # the flag is an attribute, not a dictionary key
                    badhunk = hunkinfo.copy()
                    badhunk.invalid = True
                    self.hunks[nextfileno-1].append(badhunk)
                    # switch to hunkskip state
                    hunkbody = False
                    hunkskip = True

            if hunkskip:
                if self._re_hunkhead.match(line):
                    # switch to hunkhead state
                    hunkskip = False
                    hunkhead = True
                elif line.startswith("--- "):
                    # switch to filenames state
                    hunkskip = False
                    filenames = True
                    if debugmode and len(self.source) > 0:
                        debug("- %2d hunks for %s" % (len(self.hunks[nextfileno-1]), self.source[nextfileno-1]))

            if filenames:
                # a source name without its target is pending when the
                # source list is longer than the count of complete files
                if line.startswith("--- "):
                    if nextfileno < len(self.source):
                        # double source filename line is encountered
                        # attempt to restart from this second line
                        warning("skipping invalid patch for %s" % self.source[nextfileno])
                        del self.source[nextfileno]
                    match = self._re_source.match(line)
                    if match:
                        self.source.append(match.group(1).strip())
                    else:
                        warning("skipping invalid filename at line %d" % lineno)
                        # switch back to header state
                        filenames = False
                        header = True
                elif not line.startswith("+++ "):
                    if nextfileno < len(self.source):
                        warning("skipping invalid patch with no target for %s" % self.source[nextfileno])
                        # drop the pending name so that source and target
                        # lists stay aligned
                        del self.source[nextfileno]
                    else:
                        # this should be unreachable
                        warning("skipping invalid target patch")
                    filenames = False
                    header = True
                else:
                    # a second +++ line can not reach this point: after a
                    # target is accepted the parser is in hunkhead state
                    match = self._re_target.match(line)
                    if not match:
                        warning("skipping invalid patch - no target filename at line %d" % lineno)
                        # switch back to header state
                        filenames = False
                        header = True
                    else:
                        self.target.append(match.group(1).strip())
                        nextfileno += 1
                        # switch to hunkhead state
                        filenames = False
                        hunkhead = True
                        nexthunkno = 0
                        self.hunks.append([])
                        self.hunkends.append(lineends.copy())
                        continue

            if hunkhead:
                match = self._re_hunkhead.match(line)
                if not match:
                    if not self.hunks[nextfileno-1]:
                        warning("skipping invalid patch with no hunks for file %s" % self.target[nextfileno-1])
                    # switch to header state
                    hunkhead = False
                    header = True
                else:
                    hunkinfo.startsrc = int(match.group(1))
                    # a missing line count in the header means one line
                    hunkinfo.linessrc = 1
                    if match.group(3):
                        hunkinfo.linessrc = int(match.group(3))
                    hunkinfo.starttgt = int(match.group(4))
                    hunkinfo.linestgt = 1
                    if match.group(6):
                        hunkinfo.linestgt = int(match.group(6))
                    hunkinfo.invalid = False
                    # fresh list - copies of earlier hunks keep their own text
                    hunkinfo.text = []

                    hunkactual["linessrc"] = hunkactual["linestgt"] = 0

                    # switch to hunkbody state
                    hunkhead = False
                    hunkbody = True
                    nexthunkno += 1

        # end of stream (the loop above has no break)
        if not hunkskip:
            # parse() receives a stream, not a filename - report the
            # stream name when it has one
            warning("patch file incomplete - %s" % getattr(stream, "name", "<stream>"))
            # sys.exit(?)
        else:
            # duplicated message when an eof is reached
            if debugmode and len(self.source) > 0:
                debug("- %2d hunks for %s" % (len(self.hunks[nextfileno-1]), self.source[nextfileno-1]))

        info("total files: %d total hunks: %d" % (len(self.source), sum(len(hset) for hset in self.hunks)))

    def apply(self):
        """ apply parsed patch

        For every file in the patch: pick the source name (or the target
        name if the source does not exist), check the hunks against the
        file and, when all of them fit, rewrite the file in place. The
        original is moved to "<name>.orig" while the new version is
        written and removed afterwards.
        """
        import shutil

        total = len(self.source)
        for fileno, filename in enumerate(self.source):

            f2patch = filename
            if not exists(f2patch):
                f2patch = self.target[fileno]
                if not exists(f2patch):
                    warning("source/target file does not exist\n--- %s\n+++ %s" % (filename, f2patch))
                    continue
            if not isfile(f2patch):
                warning("not a file - %s" % f2patch)
                continue
            filename = f2patch

            info("processing %d/%d:\t %s" % (fileno+1, total, filename))

            hunks = self.hunks[fileno]
            if not hunks:
                # parse() records a file entry before its hunks are read,
                # so an entry may come without any hunks
                warning("no hunks to apply for %s" % filename)
                continue

            # validate before patching
            validhunks, canpatch = self._validate_hunks(filename, hunks)

            if validhunks < len(hunks):
                if self._match_file_hunks(filename, hunks):
                    warning("already patched %s" % filename)
                else:
                    warning("source file is different - %s" % filename)
            if canpatch:
                backupname = filename+".orig"
                if exists(backupname):
                    warning("can't backup original file to %s - aborting" % backupname)
                else:
                    shutil.move(filename, backupname)
                    if self.write_hunks(backupname, filename, hunks):
                        warning("successfully patched %s" % filename)
                        unlink(backupname)
                    else:
                        warning("error patching file %s" % filename)
                        shutil.copy(filename, filename+".invalid")
                        warning("invalid version is saved to %s" % (filename+".invalid"))
                        # todo: proper rejects
                        shutil.move(backupname, filename)

        # todo: check for premature eof

    def _validate_hunks(self, filename, hunks):
        """ Check source lines of `hunks` against the content of `filename`

        :returns: tuple (validhunks, canpatch) - the number of hunks that
                  fit the file and True if every hunk does
        """
        hunkno = 0
        hunk = hunks[hunkno]
        hunkfind = []
        hunklineno = 0
        validhunks = 0
        canpatch = False

        f2fp = open(filename)
        try:
            for lineno, line in enumerate(f2fp):
                if lineno+1 < hunk.startsrc:
                    continue
                elif lineno+1 == hunk.startsrc:
                    # lines the hunk expects to find in the source file
                    hunkfind = [x[1:].rstrip("\r\n") for x in hunk.text if x[0] in " -"]
                    hunklineno = 0

                    # todo \ No newline at end of file

                # check hunks in source file
                # NOTE(review): the last line of a hunk is never compared
                # with the file (strict `<` here, `==` below) - confirm
                # whether this is intended
                if lineno+1 < hunk.startsrc+len(hunkfind)-1:
                    if line.rstrip("\r\n") == hunkfind[hunklineno]:
                        hunklineno += 1
                    else:
                        debug("hunk no.%d doesn't match source file %s" % (hunkno+1, filename))
                        # file may be already patched, but we will check other hunks anyway
                        hunkno += 1
                        if hunkno < len(hunks):
                            hunk = hunks[hunkno]
                            continue
                        else:
                            break

                # check if processed line is the last line
                if lineno+1 == hunk.startsrc+len(hunkfind)-1:
                    debug("file %s hunk no.%d -- is ready to be patched" % (filename, hunkno+1))
                    hunkno += 1
                    validhunks += 1
                    if hunkno < len(hunks):
                        hunk = hunks[hunkno]
                    else:
                        if validhunks == len(hunks):
                            # patch file
                            canpatch = True
                            break
            else:
                if hunkno < len(hunks):
                    warning("premature end of source file %s at hunk %d" % (filename, hunkno+1))
        finally:
            f2fp.close()

        return validhunks, canpatch

    def can_patch(self, filename):
        """ Check if specified filename can be patched. Returns None if file can
        not be found among source filenames. False if patch can not be applied
        clearly. True otherwise.

        :returns: True, False or None
        """
        idx = self._get_file_idx(filename, source=True)
        if idx is None:
            return None
        return self._match_file_hunks(filename, self.hunks[idx])

    def _match_file_hunks(self, filepath, hunks):
        """ Check if target lines of `hunks` are present in `filepath`

        Every hunk line except removed ("-") lines and "\\ No newline"
        markers is compared, ignoring line ends, with the file content
        starting at the target position of the hunk.

        :returns: True if all hunks match, False otherwise
        """
        fp = open(abspath(filepath))
        try:
            lineno = 1
            line = fp.readline()
            for hno, h in enumerate(hunks):
                # skip to first line of the hunk
                while lineno < h.starttgt:
                    if not line:    # eof
                        debug("check failed - premature eof before hunk: %d" % (hno+1))
                        return False
                    line = fp.readline()
                    lineno += 1
                for hline in h.text:
                    # removed lines are absent from a patched file and
                    # "\ No newline at end of file" is a marker, not content
                    if hline.startswith("-") or hline.startswith("\\"):
                        continue
                    if not line:
                        debug("check failed - premature eof on hunk: %d" % (hno+1))
                        # todo: \ No newline at the end of file
                        return False
                    if line.rstrip("\r\n") != hline[1:].rstrip("\r\n"):
                        debug("file is not patched - failed hunk: %d" % (hno+1))
                        # todo: display failed hunk, i.e. expected/found
                        return False
                    line = fp.readline()
                    lineno += 1
            return True
        finally:
            fp.close()

    def patch_stream(self, instream, hunks):
        """ Generator that yields stream patched with hunks iterable

        Converts lineends in hunk lines to the best suitable format
        autodetected from input
        """

        # todo: At the moment substituted lineends may not be the same
        #       at the start and at the end of patching. Also issue a
        #       warning/throw about mixed lineends (is it really needed?)

        hunks = iter(hunks)

        srclineno = 1

        lineends = {'\n':0, '\r\n':0, '\r':0}
        def get_line():
            """
            local utility function - return line from source stream
            collecting line end statistics on the way
            """
            line = instream.readline()
            # 'U' mode works only with text files
            if line.endswith("\r\n"):
                lineends["\r\n"] += 1
            elif line.endswith("\n"):
                lineends["\n"] += 1
            elif line.endswith("\r"):
                lineends["\r"] += 1
            return line

        for hno, h in enumerate(hunks):
            debug("hunk %d" % (hno+1))
            # skip to line just before hunk starts
            while srclineno < h.startsrc:
                yield get_line()
                srclineno += 1

            for hline in h.text:
                # todo: check \ No newline at the end of file
                if hline.startswith("\\"):
                    # marker line - it does not stand for a source line,
                    # so nothing is read from the input for it
                    continue
                if hline.startswith("-"):
                    # removed line - consume it from the source
                    get_line()
                    srclineno += 1
                    continue
                if not hline.startswith("+"):
                    # context line - consume the source copy, the hunk
                    # copy is written below
                    get_line()
                    srclineno += 1
                line2write = hline[1:]
                # detect if line ends are consistent in source file
                used = [x for x in lineends if lineends[x] != 0]
                if len(used) == 1:
                    yield line2write.rstrip("\r\n")+used[0]
                else:   # newlines are mixed
                    yield line2write

        for line in instream:
            yield line

    def write_hunks(self, srcname, tgtname, hunks):
        """ Write the content of `srcname` patched with `hunks` to `tgtname`

        Both files are closed even if patching raises an exception.

        :returns: True
        """
        src = open(srcname, "rb")
        try:
            tgt = open(tgtname, "wb")
            try:
                debug("processing target file %s" % tgtname)
                tgt.writelines(self.patch_stream(src, hunks))
            finally:
                tgt.close()
        finally:
            src.close()
        return True

    def _get_file_idx(self, filename, source=None):
        """ Detect index of given filename within patch.

        :param filename:
        :param source: search filename among sources (True),
                       targets (False), or both (None)
        :returns: int or None
        """
        filename = abspath(filename)
        if source in (True, None):
            for i, fnm in enumerate(self.source):
                if filename == abspath(fnm):
                    return i
        if source in (False, None):
            for i, fnm in enumerate(self.target):
                if filename == abspath(fnm):
                    return i
        return None
|
550 |
|
551 |
|
552 |
|
553 |
|
554 from optparse import OptionParser |
|
555 from os.path import exists |
|
556 import sys |
|
557 |
|
if __name__ == "__main__":
    # command line entry point: patch.py [--debug] unipatch-file
    opt = OptionParser(usage="%prog [options] unipatch-file", version="python-patch %s" % __version__)
    opt.add_option("--debug", action="store_true", dest="debugmode", help="debug mode")
    options, args = opt.parse_args()

    # without a patch file there is nothing to do - show usage and leave
    if not args:
        opt.print_version()
        opt.print_help()
        sys.exit()

    # module-level flag, read by Patch.parse() for extra statistics
    debugmode = options.debugmode
    patchfile = args[0]
    if not (exists(patchfile) and isfile(patchfile)):
        sys.exit("patch file does not exist - %s" % patchfile)

    # the library logger is silent by default - enable it for the
    # command line run, with level names shown in debug mode only
    if debugmode:
        loglevel, logformat = logging.DEBUG, "%(levelname)8s %(message)s"
    else:
        loglevel, logformat = logging.INFO, "%(message)s"
    logger.setLevel(loglevel)
    loghandler.setFormatter(logging.Formatter(logformat))

    patch = fromfile(patchfile)
    #pprint(patch)
    patch.apply()
|
587 |
|
# todo: document and test line ends handling logic - patch.py detects proper line-endings
#       for inserted hunks and issues a warning if patched file has inconsistent line ends