Python difflib 模块,unified_diff() 实例源码
我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用difflib.unified_diff()。
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Diff the contents of filename against content2 or standard input.

    When content2 is None, the second text is read from the binary standard
    input stream and the diff is additionally written out.  This is the mode
    used from the command line:

      echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt

    which gives:

      ---
      +++
      @@ -1 +1 @@
      -123
      +456

    Returns the (exit_code, diff) pair produced by compute_unified_diff().
    """
    read_from_stdin = content2 is None
    if read_from_stdin:
        # Read binary input stream
        econtent2 = bytestr(rawstream(sys.stdin).read())
    else:
        econtent2 = content2
    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if read_from_stdin:
        write('\n'.join(diff))
    return exit_code, diff
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    """Build a unified diff between the file filename and content2.

    Extra keyword arguments are forwarded to difflib.unified_diff(); when no
    'n' is supplied the diff is generated with zero context lines.

    Returns (OK, diff) on success.  If reading or converting either text
    fails, (ERROR, ()) is returned instead of raising.
    """
    kw = kwargs.copy()
    if 'n' not in kwargs:
        # zero context lines
        kw['n'] = 0
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
    except Exception:
        # Best effort by design: failures are reported through the exit code.
        # An explicit handler replaces a `return` inside `finally`, which also
        # swallowed KeyboardInterrupt and SystemExit.
        return ERROR, ()
    return OK, diff
# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
def openDiffInTab(viewHandle, edit, oldTextName, newTextName, oldText, newText):
    """Show the unified diff of two texts in a new scratch tab.

    If the texts are identical only a status message is shown.  Otherwise a
    scratch buffer named "<old> -> <new>" is created in the window of
    viewHandle, given the Diff syntax and filled with the diff text.
    """
    # The inputs are split without line endings, so lineterm="" keeps the
    # generated header lines newline-free as well; otherwise joining with
    # "\n" leaves a blank line after every '---', '+++' and '@@' line.
    diffs = difflib.unified_diff(oldText.splitlines(), newText.splitlines(),
                                 oldTextName, newTextName, lineterm="")
    diffText = u"\n".join(diffs)
    if diffText == "":
        sublime.status_message("No changes between revisions.")
        return
    scratch = viewHandle.window().new_file()
    scratch.set_scratch(True)
    scratch.set_name("{old} -> {new}".format(old=oldTextName, new=newTextName))
    scratch.set_syntax_file("Packages/Diff/Diff.tmLanguage")
    if int(sublime.version()) >= 3000:
        scratch.run_command("append", {"characters": diffText})
    else:
        scratch.insert(edit, 0, diffText)
def test_05_package_to_api2(self):
    """Dictize the 'annakarenina' package with package_to_api2 and check it.

    Both the as_dict() form and the dictized form are printed; the assertion
    message is a unified diff of their pretty-printed representations.
    """
    context = {"model": model, "session": model.Session}
    query = model.Session.query(model.Package)
    pkg = query.filter_by(name='annakarenina').first()

    as_dict = pkg.as_dict(ref_package_by='id', ref_group_by='id')
    dictize = package_to_api2(pkg, context)

    as_dict_string = pformat(as_dict)
    dictize_string = pformat(dictize)
    print(as_dict_string)
    print(dictize_string)

    assert package_to_api2(pkg, context) == dictize, "\n".join(
        unified_diff(as_dict_string.split("\n"), dictize_string.split("\n")))
def test_06_package_to_api2_with_relationship(self):
    """Compare as_dict() and package_to_api2() for the 'homer' package.

    The as_dict() result is adjusted with the fields it does not fill in,
    relationships are sorted on both sides, and the organization key is
    dropped from the dictized form before the two are compared.
    """
    context = {"model": model, "session": model.Session}
    query = model.Session.query(model.Package)
    pkg = query.filter_by(name='homer').one()

    as_dict = pkg.as_dict(ref_package_by='id', ref_group_by='id')
    as_dict['license_title'] = None
    as_dict['num_tags'] = 0
    as_dict['num_resources'] = 0
    dictize = package_to_api2(pkg, context)

    as_dict["relationships"].sort(key=lambda x: x.items())
    dictize["relationships"].sort(key=lambda x: x.items())

    # the is_dict method doesn't care about organizations
    del dictize['organization']

    as_dict_string = pformat(as_dict)
    dictize_string = pformat(dictize)
    print(as_dict_string)
    print(dictize_string)

    assert as_dict == dictize, "\n".join(
        unified_diff(as_dict_string.split("\n"), dictize_string.split("\n")))
def get_diff_po(po1_fn, po2_fn):
    """Return the number of added plus removed entries between two PO files.

    Every entry of each file is rendered as one "msgid ... msgstr ..." text
    block (entries sorted by msgid), the two lists are diffed, and the '+'
    and '-' diff items are counted.  Diff control lines are not counted.
    """
    def render_entries(po_fn):
        # One text block per catalog entry, ordered by msgid.
        return [
            (
                u'msgid {}\n\n'
                u'msgstr {}\n\n'
            ).format(entry.msgid, entry.msgstr)
            for entry in sorted(polib.pofile(po_fn), key=lambda obj: obj.msgid)
        ]

    po1_lines = render_entries(po1_fn)
    po2_lines = render_entries(po2_fn)

    control_prefixes = ('+++ ', '--- ', '@@ ')
    added = removed = 0
    for diff_line in difflib.unified_diff(po1_lines, po2_lines):
        if diff_line.startswith(control_prefixes):
            continue
        if diff_line.startswith('+'):
            added += 1
        elif diff_line.startswith('-'):
            removed += 1
    return added + removed
def compare_configs(cfg1, cfg2):
    """Return the added/removed lines between two configurations as a string.

    cfg1 and cfg2 are sequences of lines.  Diff lines containing any of the
    ignored fragments below (volatile header text, diff file headers and
    '!' comment lines) are skipped.  Each remaining '+' or '-' line is
    appended to the result preceded by a newline.  Returns '' when nothing
    relevant changed.
    """
    ignored_fragments = (
        'Current configuration',
        'Last configuration change',
        'length 0',
        'login authentication tacplus',
        '+++',
        '---',
        '-!',
        '+!',
    )
    kept = []
    for line in difflib.unified_diff(cfg1, cfg2):
        if any(fragment in line for fragment in ignored_fragments):
            continue
        if line.startswith(('+', '-')):
            kept.append("\n" + line)
    return "".join(kept)
def _Diff(lhs, rhs):
"""Given two pathnames, compare two files. Raise if they differ."""
# Some people rely on being able to specify TEST_DIFF in the environment to
# have tests use their own diff wrapper for use when updating golden data.
external_diff = os.environ.get('TEST_DIFF')
if external_diff:
return _DiffViaExternalProgram(lhs, rhs, external_diff)
try:
with open(lhs, 'r') as lhs_f:
with open(rhs, 'r') as rhs_f:
diff_text = ''.join(
difflib.unified_diff(lhs_f.readlines(), rhs_f.readlines()))
if not diff_text:
return True
raise OutputDifferedError('\nComparing %s and %s\nTest output differed '
'from golden file:\n%s' % (lhs, rhs, diff_text))
except EnvironmentError as error:
# Unable to read the files.
raise DiffFailureError('\nComparing %s and %s\nFailure diffing test output '
'with golden file: %s\n' % (lhs, rhs, error))
def _Diff(lhs, rhs):
"""Given two pathnames, compare two files. Raise if they differ."""
# Some people rely on being able to specify TEST_DIFF in the environment to
# have tests use their own diff wrapper for use when updating golden data.
external_diff = os.environ.get('TEST_DIFF')
if external_diff:
return _DiffViaExternalProgram(lhs, rhs, external_diff)
try:
with open(lhs, 'r') as lhs_f:
with open(rhs, 'r') as rhs_f:
diff_text = ''.join(
difflib.unified_diff(lhs_f.readlines(), rhs_f.readlines()))
if not diff_text:
return True
raise OutputDifferedError('\nComparing %s and %s\nTest output differed '
'from golden file:\n%s' % (lhs, rhs, diff_text))
except EnvironmentError as error:
# Unable to read the files.
raise DiffFailureError('\nComparing %s and %s\nFailure diffing test output '
'with golden file: %s\n' % (lhs, rhs, error))
def get_diff_text(old, new, filename):
    """Return text of unified diff between old and new.

    old and new are sequences of lines.  The diff is labelled
    'original/<filename>' and 'fixed/<filename>'.
    """
    newline = '\n'
    no_eol_marker = newline + r'\ No newline at end of file' + newline

    pieces = []
    have_text = False
    for line in difflib.unified_diff(old, new,
                                     'original/' + filename,
                                     'fixed/' + filename,
                                     lineterm=newline):
        pieces.append(line)
        have_text = have_text or bool(line)

        # Work around missing newline (http://bugs.python.org/issue2142).
        if have_text and not line.endswith(newline):
            pieces.append(no_eol_marker)
    return ''.join(pieces)
def text_diff(old_as_text, new_as_text): # {{{1
    """ returns a unicode string with the zero-context unified diff of the
    two texts, with the diff control lines removed """
    old_lines = smart_unicode(old_as_text).splitlines()
    new_lines = smart_unicode(new_as_text).splitlines()
    # we now delete from the text diff all control lines
    # TODO: when the description field of an event contains such lines,
    # they will be deleted: avoid it.
    control_patterns = (r"^---\s*$", r"^\+\+\+\s*$", r"^@@.*@@$")
    kept_lines = []
    for line in unified_diff(old_lines, new_lines, n=0, lineterm=""):
        if any(re.match(pattern, line) for pattern in control_patterns):
            continue
        kept_lines.append(line)
    return u'\n'.join(kept_lines)
def diff():
    """Show diff of files changed (between index and working copy).

    For every changed path the blob recorded in the index is diffed against
    the file in the working copy.  A line of dashes separates the diffs of
    consecutive files.
    """
    changed, _, _ = get_status()
    entries_by_path = {entry.path: entry for entry in read_index()}
    last_position = len(changed) - 1
    for position, path in enumerate(changed):
        sha1 = entries_by_path[path].sha1.hex()
        obj_type, data = read_object(sha1)
        assert obj_type == 'blob'
        index_lines = data.decode().splitlines()
        working_lines = read_file(path).decode().splitlines()
        for line in difflib.unified_diff(
                index_lines, working_lines,
                '{} (index)'.format(path),
                '{} (working copy)'.format(path),
                lineterm=''):
            print(line)
        if position < last_position:
            print('-' * 70)
def diff_with_destination(self, lines):
    '''Check if destination exists, if it does, diff files.

    Returns True immediately when the destination is not an existing file
    or when the 'force_overwrite' option is set.  Otherwise prints a colored
    unified diff between the destination's current lines and `lines`
    (removed lines red, added lines green, everything else white) and
    returns the result of self.continue_prompt().
    '''
    if not isfile(self.dest) or self.opts['force_overwrite']:
        return True

    tempfile = self.dest + ' NEW'
    with open(self.dest) as f:
        content = f.readlines()

    # Perform diff
    for line in difflib.unified_diff(content, lines, fromfile=self.dest,
                                     tofile=tempfile, lineterm='\n'):
        if line.startswith('-'):
            color_name = 'red'
        elif line.startswith('+'):
            color_name = 'green'
        else:
            color_name = 'white'
        print(self.color.colorize(line, color_name))
    return self.continue_prompt()
def diff(self):
    """
    Yield diffs between each template's render and current file.

    Each yielded item is an iterable of diff lines.  When the destination
    file cannot be opened, a one-element list with a notice is yielded
    instead.
    """
    for template, dest, result in self.render():
        try:
            with codecs.open(dest, 'r', 'utf-8') as f:
                current_lines = f.readlines()
                rendered_lines = result.splitlines(True)
                yield unified_diff(
                    current_lines,
                    rendered_lines,
                    fromfile=dest,
                    tofile='%s (rendered)' % dest)
        except IOError:
            notice = ("=== No destination file \"%s\" for comparison.\n"
                      % dest)
            yield [notice]
def get_diff_text(old, new, filename):
    """Return text of unified diff between old and new.

    old and new are sequences of lines.  The diff is labelled
    'original/<filename>' and 'fixed/<filename>'.
    """
    newline = '\n'
    no_eol_marker = newline + r'\ No newline at end of file' + newline

    pieces = []
    have_text = False
    for line in difflib.unified_diff(old, new,
                                     'original/' + filename,
                                     'fixed/' + filename,
                                     lineterm=newline):
        pieces.append(line)
        have_text = have_text or bool(line)

        # Work around missing newline (http://bugs.python.org/issue2142).
        if have_text and not line.endswith(newline):
            pieces.append(no_eol_marker)
    return ''.join(pieces)
def file_diff(filename1, filename2, filtered_reader):
    """Diff two files after passing each through filtered_reader.

    filtered_reader is called with an open text stream and must return an
    iterable of lines.  Raises Exception (after calling
    make_diff_readable on the diff) when the filtered contents differ;
    otherwise returns the joined diff text.
    """
    remove_absdir(filename1)
    remove_absdir(filename2)
    # `with` guarantees the handles are closed even if filtered_reader raises.
    with open(filename1, 'r') as stream:
        lines1 = list(filtered_reader(stream))
    with open(filename2, 'r') as stream:
        lines2 = list(filtered_reader(stream))
    #
    diff = list(difflib.unified_diff(lines2, lines1,
                                     fromfile=filename2, tofile=filename1))
    if diff:
        make_diff_readable(diff)
        raise Exception("ERROR: \n\n%s\n\n%s\n\n" % (lines1, lines2))
    # NOTE(review): with the raise above this point is only reached for an
    # empty diff, so the result is always '' -- confirm this is intended.
    return '\n'.join(diff)
def run(self):
    """Check every file listed in the snapshot manifest for changes.

    Returns a (success, message) tuple: (False, "no snapshot found") when
    the manifest cannot be loaded, (False, "the following files have
    changed: ...") when at least one file differs from its snapshot, and
    (True, "no files have changed") otherwise.
    """
    try:
        manifest = yaml.safe_load(self.wd.read('manifest'))
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # are no longer reported as a missing snapshot.
        log.error("No snapshot found")
        return (False, "no snapshot found")

    fails = []
    for filepath in manifest:
        f = File(filepath)
        s = File(manifest[filepath], parent=self.wd)
        diff = list(difflib.unified_diff(f.content.splitlines(),
                                         s.content.splitlines()))
        if diff:
            fails.append(filepath)
            # The placeholder was previously logged literally as "{}".
            log.info("Check for {} failed with diff:".format(filepath))
            log.info("\n".join(diff))

    if fails:
        return (False, "the following files have changed: {}".format(', '.join(fails)))
    return (True, "no files have changed")
def compare_code_solutions(first_solution, second_solution):
"""Report two code solutions whose difference percentage is below the minimum.

Splits the `code` of both solutions into lines and passes them to
calculate_difference_percentage(), which returns the percentage and the
diff lines.  The result starts as '' and is replaced by a report when the
percentage is below MIN_ALLOWED_DIFFERENCE_PERCENTAGE.
"""
# NOTE(review): indentation was lost in this copy.  The whitespace inside the
# f-string and whether the final loop sits inside the `if` cannot be
# determined from here -- confirm against the original file.
current_code = first_solution.code.splitlines()
next_code = second_solution.code.splitlines()
diff_percentage, unified_diff = calculate_difference_percentage(current_code, next_code)
result = ""
if diff_percentage < MIN_ALLOWED_DIFFERENCE_PERCENTAGE:
# Report header naming both solutions, their students and the task.
result = f"""
Matching contents in solutions
{first_solution} from {first_solution.student.email} and
{second_solution} from {second_solution.student.email}
on task {first_solution.task.name}
--------------------------------------------
Differences: {diff_percentage}%\n
"""
# Append each diff line followed by a newline.
for line in unified_diff:
result += line + '\n'
return result
def compare_file_solutions(first_solution, second_solution):
"""Report two uploaded solution files whose difference percentage is below the minimum.

Reads and UTF-8 decodes both `file` objects, splits them into lines and
passes them to calculate_difference_percentage(), which returns the
percentage and the diff lines.  The result starts as '' and is replaced by
a report when the percentage is below MIN_ALLOWED_DIFFERENCE_PERCENTAGE.
"""
# NOTE(review): indentation was lost in this copy.  The whitespace inside the
# f-string and whether the final loop sits inside the `if` cannot be
# determined from here -- confirm against the original file.
current_code = first_solution.file.read().decode('utf-8').splitlines()
next_code = second_solution.file.read().decode('utf-8').splitlines()
# Reset file pointers
first_solution.file.seek(0)
second_solution.file.seek(0)
diff_percentage, unified_diff = calculate_difference_percentage(current_code, next_code)
result = ""
if diff_percentage < MIN_ALLOWED_DIFFERENCE_PERCENTAGE:
# Report header naming both files, their students and the task.
result = f"""
Matching contents in files
{first_solution.file.name} from {first_solution.student.email} and
{second_solution.file.name} from {second_solution.student.email}
on Task: {first_solution.task.name}
Differences: {diff_percentage}%\n
"""
# Append each diff line followed by a newline.
for line in unified_diff:
result += line + '\n'
return result
def compare(self, old: str, old_date: str, new: str, new_date: str,
            ctx: common.Context, meta: dict) \
        -> ty.Tuple[bool, ty.Optional[str], ty.Optional[dict]]:
    # pylint: disable=invalid-sequence-index
    """Diff two versions of a text and decide whether the change counts.

    Record separators are turned into blank lines before both texts are
    split on newlines and diffed.  The added/removed lines (headers, hunk
    markers and context lines excluded) are counted and handed to
    _check_changes() together with the configured thresholds.

    Returns (False, None, None) when _check_changes() rejects the change,
    otherwise (True, diff_text, self.opts).
    """
    old_lines = old.replace(common.RECORD_SEPARATOR, '\n\n').split('\n')
    new_lines = new.replace(common.RECORD_SEPARATOR, '\n\n').split('\n')
    res = list(difflib.unified_diff(
        old_lines, new_lines,
        fromfiledate=old_date, tofiledate=new_date,
        lineterm='\n'))

    # Skip the two file header lines; count only '+'/'-' lines.
    changed_lines = 0
    for line in res[2:]:
        if line and line[0] not in (' ', '@'):
            changed_lines += 1

    accepted = _check_changes(ctx, changed_lines, len(old_lines),
                              self.conf.get("changes_threshold"),
                              self.conf.get("min_changed"))
    if not accepted:
        return False, None, None
    return True, "\n".join(res), self.opts
def _assertSchemaEqual(expected, actual, testcase):
"""Utility method to dump diffs if the schema aren't equal.
Args:
expected: object, the expected results.
actual: object, the actual results.
testcase: unittest.TestCase, the test case this assertion is used within.
"""
if expected != actual:
expected_text = json.dumps(expected, indent=2, sort_keys=True)
actual_text = json.dumps(actual, indent=2, sort_keys=True)
diff = difflib.unified_diff(expected_text.splitlines(True),
actual_text.splitlines(True),
fromfile='expected.schema',
tofile='actual.schema')
diff_text = ''.join(list(diff))
testcase.fail('Schema differs from expected:\n%s' % diff_text)
def print_diff(self, row1, row2):
    """Print the inputs of row1 and a colored diff of the two rows.

    Lines only in the first row are printed in red, lines only in the
    second in blue, and common lines uncolored; the diff prefix character
    is stripped from each printed line.
    """
    print("Discrepancy for {HITId}".format(**row1))
    print(yaml.dump(self.inputs(row1), default_flow_style=False))
    formatted = [self.format_for_diff(row).split('\n') for row in (row1, row2)]
    for entry in unified_diff(*formatted, n=20):
        if entry.strip() in ('---', '+++'):
            continue
        body = entry[1:]
        if entry.startswith('-'):
            print(colored("{:<40}".format(body), 'red'))
        elif entry.startswith('+'):
            print(colored("{:<40}".format(body), 'blue'))
        elif entry.startswith(' '):
            print(body)
def _change_set_diff(change_set):
diff = []
for change in change_set.changes:
new = change.new_contents
old = change.old_contents
if old is None:
if change.resource.exists():
old = change.resource.read()
else:
old = ''
result = unified_diff(
old.splitlines(True), new.splitlines(True),
'a/' + change.resource.path, 'b/' + change.resource.path
)
diff.extend(list(result))
diff.append('\n')
return diff
def get_correct_indentation_diff(code, filename):
    """
    Generate a diff to make code correctly indented.

    :param code: a string containing a file's worth of Python code
    :param filename: the filename being considered (used in diff generation only)
    :returns: a unified diff to make code correctly indented, or
              None if code is already correctly indented
    """
    source_buffer = StringIO(code)
    target_buffer = StringIO()
    reindenter = reindent.Reindenter(source_buffer)
    reindenter.run()
    reindenter.write(target_buffer)
    reindented = target_buffer.getvalue()
    target_buffer.close()

    if code == reindented:
        return None

    raw_diff = difflib.unified_diff(code.splitlines(True),
                                    reindented.splitlines(True),
                                    fromfile=filename,
                                    tofile=filename + " (reindented)")
    # work around http://bugs.python.org/issue2142
    return "".join(clean_diff_line_for_python_bug_2142(line)
                   for line in raw_diff)
def compare_file(reference, this_test):
    """Compare the lines of a reference file with the lines of a test run.

    `reference` is a path and `this_test` a sequence of lines.  Trailing
    whitespace is ignored.  A unified diff is always written to stdout.

    Returns False when the line counts differ or when any pair of lines
    differs (pairs whose reference line contains "XED version" are
    skipped); returns True otherwise.
    """
    # `with` closes the reference file instead of leaking the handle.
    with open(reference, 'r') as ref_file:
        ref_lines = [x.rstrip() for x in ref_file]
    this_test = [x.rstrip() for x in this_test]

    for line in difflib.unified_diff(ref_lines, this_test,
                                     fromfile=reference, tofile="current"):
        sys.stdout.write(line.rstrip() + '\n')

    if len(ref_lines) != len(this_test):
        mbuild.msgb("DIFFERENT NUMBER OF LINES",
                    "ref %d test %d" % (len(ref_lines), len(this_test)))
        for ref in ref_lines:
            mbuild.msgb("EXPECTED", '%s' % (ref.strip()))
        return False

    for (ref, test) in zip(ref_lines, this_test):
        if ref.strip() != test.strip():
            if ref.find("XED version") != -1:  # skip the version lines
                continue
            mbuild.msgb("DIFFERENT", "\n\tref [%s]\n\ttest [%s]" % (ref, test))
            return False
    return True
def diff_texts(a, b, filename):
    """Return a unified diff of two strings.

    The result is a lazy iterator of newline-free diff lines; both sides are
    labelled with filename, marked "(original)" and "(refactored)".
    """
    original_lines = a.splitlines()
    refactored_lines = b.splitlines()
    return difflib.unified_diff(
        original_lines,
        refactored_lines,
        fromfile=filename,
        tofile=filename,
        fromfiledate="(original)",
        tofiledate="(refactored)",
        lineterm="",
    )
def generate_diff(current, coming):
    """Generates diff of changes.

    Returns the unified diff between the two strings as one newline-joined
    string, labelled 'Current content' and 'Coming changes'; '' when the
    strings have the same lines.
    """
    # The inputs are split without line endings, so lineterm='' keeps the
    # generated control lines newline-free too; otherwise the join leaves a
    # blank line after each '---', '+++' and '@@' line.
    return '\n'.join(
        difflib.unified_diff(
            current.splitlines(), coming.splitlines(),
            'Current content', 'Coming changes',
            lineterm=''
        )
    )
def testRoundTrip(self):
    """Write every source glyph to the destination glyph set and compare files.

    After the copy, each .glif file of the source directory is diffed
    against its counterpart in the destination directory; the added and
    removed lines collected from the diff must be equal.
    """
    import difflib
    srcDir = GLYPHSETDIR
    dstDir = self.dstDir
    src = GlyphSet(srcDir, ufoFormatVersion=2)
    dst = GlyphSet(dstDir, ufoFormatVersion=2)
    for glyphName in src.keys():
        glyph = src[glyphName]
        glyph.drawPoints(None)  # load attrs
        dst.writeGlyph(glyphName, glyph, glyph.drawPoints)
    # compare raw file data:
    for glyphName in sorted(src.keys()):
        fileName = src.contents[glyphName]
        with open(os.path.join(srcDir, fileName), "r") as f:
            org = f.read()
        with open(os.path.join(dstDir, fileName), "r") as f:
            new = f.read()
        diff_lines = list(difflib.unified_diff(org.split("\n"), new.split("\n")))
        added = [line[1:] for line in diff_lines if line.startswith("+ ")]
        removed = [line[1:] for line in diff_lines if line.startswith("- ")]
        self.assertEqual(
            added, removed,
            "%s.glif file differs after round tripping" % glyphName)
def diff(self, expected, actual):
    """Return the unified diff of the string forms of two glyphs.

    Both arguments are wrapped in self.Glyph and converted with str();
    the diff is labelled 'expected' and 'actual'.
    """
    import difflib
    expected_lines = str(self.Glyph(expected)).splitlines(True)
    actual_lines = str(self.Glyph(actual)).splitlines(True)
    return "".join(difflib.unified_diff(
        expected_lines, actual_lines, fromfile='expected', tofile='actual'))
def print_commit(commit, with_diff=False):
    """Print a summary of a commit and, optionally, a diff against its first parent.

    The summary shows the hash, author name, authored time (UTC) and
    message, followed by the total stats.  With with_diff set and at least
    one parent, every changed blob is split on whitespace, the Differ delta
    is printed, and each added token is listed with its running number.
    """
    t = time.strftime("%a, %d %b %Y %H:%M", time.gmtime(commit.authored_date))
    print(commit.hexsha, commit.author.name, t, commit.message)
    print("stats:", commit.stats.total)
    print()
    if with_diff and len(commit.parents):
        diffs = commit.diff(commit.parents[0])
        for d in diffs:
            print(d)
            b_lines = str(d.b_blob.data_stream.read()).split()
            a_lines = str(d.a_blob.data_stream.read()).split()
            differ = difflib.Differ()
            # Differ.compare() returns a one-shot generator.  Materialize it
            # so the second pass below still has data after it is printed.
            delta = list(differ.compare(b_lines, a_lines))
            for i in delta:
                print(i)
            line_number = 0
            for line in delta:
                # split off the code
                code = line[:2]
                # if the line is in both files or just a, increment the
                # line number.  Differ marks common lines with two spaces;
                # a one-character string can never equal this slice.
                if code in ("  ", "+ "):
                    line_number += 1
                # if this line is only in a, print the line number and
                # the text on the line
                if code == "+ ":
                    print("%d: %s" % (line_number, line[2:].strip()))
            print("------------------------")
    print("+++++++++++++++++++++++++++++++++++++++++++++++++++++++")
    print()
def assert_equal_diff(self, received, expected):
    """Fail with an EXPECTED/RECEIVED unified diff unless the strings are equal."""
    if received == expected:
        return
    expected_lines = expected.split('\n')
    received_lines = received.split('\n')
    diff_lines = difflib.unified_diff(expected_lines, received_lines,
                                      'EXPECTED', 'RECEIVED',
                                      lineterm='')
    self.fail('\n'.join(diff_lines))
def assert_startswith_diff(self, received, prefix):
    """Fail with a unified diff unless received starts with prefix.

    Only as many lines of received as prefix has are included in the diff.
    """
    if received.startswith(prefix):
        return
    prefix_lines = prefix.split('\n')
    received_head = received.split('\n')[:len(prefix_lines)]
    diff_lines = difflib.unified_diff(prefix_lines, received_head,
                                      'EXPECTED', 'RECEIVED',
                                      lineterm='')
    self.fail('\n'.join(diff_lines))
def get_diff(self, article_path, a, b):
    """Return a diff string between two revisions of a given
    article title.

    The unified diff of the raw revision texts is wrapped in a git-style
    header (short revision ids and the article file's mode) and passed
    through self.escape_html().
    """
    old_raw = self.get_revision(article_path, a)['raw']
    new_raw = self.get_revision(article_path, b)['raw']
    body = '\n'.join(
        difflib.unified_diff(
            old_raw.splitlines(),
            new_raw.splitlines(),
            fromfile='{}/{}'.format('a', article_path),
            tofile='{}/{}'.format('b', article_path),
            lineterm='',
        )
    )

    diff_template = """diff --git a/{title} b/{title}
index {sha_a}..{sha_b} {file_mode}
{diff}
"""
    mode_bits = os.stat(self.full_article_path(article_path)).st_mode
    rendered = diff_template.format(
        title=article_path,
        diff=body,
        sha_a=a[0:7],
        sha_b=b[0:7],
        file_mode=oct(mode_bits)[2:]
    )

    # Escape HTML and "non-breaking space"
    return self.escape_html(rendered)
def _diff_monitors(self, monitor1, monitor1_name, monitor2, monitor2_name):
"""
Convenience method for performing a diff of two monitors, returning the diff as a string.
"""
return "\n".join(difflib.unified_diff(
json.dumps(monitor1, indent=4, sort_keys=True).splitlines(),
json.dumps(monitor2, indent=4, sort_keys=True).splitlines(),
fromfile=monitor1_name,
tofile=monitor2_name
))
def diff(args, config, connection=None):
    """Print the items added, removed and changed since the last schema.

    With args.verbose set, a unified diff of the 'up' section is printed
    for every changed item.  Prints 'No changes' when nothing differs.
    """
    verbose = args.verbose
    last_schema = get_last_schema()
    current_schema = get_current_schema()
    added, removed, changed = compare_schemas(last_schema, current_schema)

    if not any((added, removed, changed)):
        print('No changes')
        return

    if added:
        print('New items:')
        for item in added:
            print(' %s' % item)
    if removed:
        print('Removed items:')
        for item in removed:
            print(' %s' % item)
    if changed:
        print('Changed items:')
        for item in changed:
            print(' %s' % item)
            if verbose:
                item_diff = difflib.unified_diff(last_schema[item]['up'],
                                                 current_schema[item]['up'])
                print('\n'.join(item_diff))
def _show_diff(self, file_contents):
    """Write a unified diff between file_contents and self.output to stdout.

    The two sides are labelled '<file_path>:before' and '<file_path>:after'.
    The 'before' date is the file's modification time when self.file_path
    is set, else the current time; the 'after' date is the current time.
    """
    if self.file_path:
        before_stamp = datetime.fromtimestamp(os.path.getmtime(self.file_path))
    else:
        before_stamp = datetime.now()
    diff_lines = unified_diff(
        file_contents.splitlines(1),
        self.output.splitlines(1),
        fromfile=self.file_path + ':before',
        tofile=self.file_path + ':after',
        fromfiledate=str(before_stamp),
        tofiledate=str(datetime.now())
    )
    for line in diff_lines:
        sys.stdout.write(line)
def diff(self):
    """Return the unified diffs of all recorded changes as one string.

    self.change_dct maps an old path to (new_path, old_lines, new_lines).
    Each diff is labelled with its old and new path; a missing path is
    rendered as an empty label.
    """
    texts = []
    for old_path, (new_path, old_l, new_l) in self.change_dct.items():
        # The previous condition was inverted: the paths were passed only
        # when old_path was falsy, so real file names never reached the
        # '---'/'+++' header lines.
        udiff = difflib.unified_diff(old_l, new_l,
                                     old_path or '', new_path or '')
        texts.append('\n'.join(udiff))
    return '\n'.join(texts)
def diff(self):
    """Return the unified diffs of all recorded changes as one string.

    self.change_dct maps an old path to (new_path, old_lines, new_lines).
    Each diff is labelled with its old and new path; a missing path is
    rendered as an empty label.
    """
    texts = []
    for old_path, (new_path, old_l, new_l) in self.change_dct.items():
        # The previous condition was inverted: the paths were passed only
        # when old_path was falsy, so real file names never reached the
        # '---'/'+++' header lines.
        udiff = difflib.unified_diff(old_l, new_l,
                                     old_path or '', new_path or '')
        texts.append('\n'.join(udiff))
    return '\n'.join(texts)
def indent_files(files, diff=False, debug=False, level=0, inplace=False):
    """Run indent() on every file and report, or apply, the differences.

    With inplace set, each indented result is copied over its source and
    True is returned.  Otherwise every source is compared with its indented
    result: True is returned when all match, else False after printing
    either the list of badly indented files or, with diff set, a unified
    diff for each of them.
    """
    output = [[f, indent(f, debug=debug, level=level)] for f in files]

    # First, copy to inplace
    if inplace:
        for src, dst in output:
            shutil.copyfile(dst, src)
        return True

    # now, compare
    failed = [[src, dst] for src, dst in output if filecmp.cmp(src, dst) == 0]
    if not failed:
        return True

    if not diff:
        print('Found %u badly indented files:' % len(failed))
        for src, dst in failed:
            print(' ' + src)
    else:
        for src, dst in failed:
            # `with` closes both files instead of leaking the handles.
            with open(src, 'r') as src_file:
                s = src_file.readlines()
            with open(dst, 'r') as dst_file:
                d = dst_file.readlines()
            for line in difflib.unified_diff(s, d, fromfile=src, tofile=dst):
                sys.stdout.write(line)
    return False
def _assertExpectedWithOutput(self, name):
filename = name + '.conf'
expected_path = os.path.join(self.expected, filename)
test_path = os.path.join(self.out_dir, filename)
self.assertTrue(os.path.exists(expected_path))
self.assertTrue(os.path.exists(test_path))
with io.open(expected_path, encoding='utf8') as expected_file:
with io.open(test_path, encoding='utf8') as test_file:
expected_data = expected_file.readlines()
test_data = test_file.readlines()
diff = difflib.unified_diff(
expected_data, test_data, lineterm='')
diff_data = ''.join(list(diff))
self.assertTrue(diff_data == '', msg=diff_data)
def diffTwoFiles( self, title, old_lines, new_lines, header_left, header_right ):
    """Show the difference of two line lists in the configured diff view.

    In unified mode the diff lines are passed to self.showDiffText();
    in side-by-side mode a DiffSideBySideView window is opened.  Nothing
    happens when neither mode is selected.
    """
    view_prefs = self.app.prefs.view
    if view_prefs.isDiffUnified():
        all_lines = list( difflib.unified_diff( old_lines, new_lines ) )
        self.showDiffText( title, all_lines )
        return

    if view_prefs.isDiffSideBySide():
        window = wb_diff_side_by_side_view.DiffSideBySideView(
                    self.app, None,
                    title,
                    old_lines, header_left,
                    new_lines, header_right )
        window.show()
def _print_diff(self, diff, indent_level):
if isinstance(diff, dict):
try:
diff = '\n'.join(difflib.unified_diff(diff['before'].splitlines(),
diff['after'].splitlines(),
fromfile=diff.get('before_header',
'new_file'),
tofile=diff['after_header']))
except AttributeError:
diff = dict_diff(diff['before'], diff['after'])
if diff:
diff = colorize(diff, 'changed')
print(self._indent_text(diff, indent_level+4))
def test_all_correct_contents(self):
    """Tests that the content of all non-static output files is expected.

    Walks the reference tree, skipping the service worker script and
    anything under the boilerplate or report directories, and compares each
    reference file with the file at the same relative path in the output
    directory.
    """
    for dirname, _, filenames in os.walk(TTS_REFERENCE_PATH):
        for filename in filenames:
            if filename == caterpillar.SW_SCRIPT_NAME:
                # Service worker is partly random, so test it elsewhere.
                continue
            reference_path = os.path.join(dirname, filename)
            relpath = os.path.relpath(reference_path, TTS_REFERENCE_PATH)
            if (relpath.startswith(self.boilerplate_dir)
                    or relpath.startswith(self.report_dir)):
                continue

            output_path = os.path.join(self.output_dir, relpath)
            with open(reference_path) as reference_file:
                with open(output_path) as output_file:
                    output_data = output_file.read().decode('utf-8')
                    reference_data = reference_file.read().decode('utf-8')
            # The diff now runs reference -> output so its content matches
            # the fromfile/tofile labels, and lineterm='' avoids blank lines
            # after the header lines when joining.
            self.assertEqual(output_data, reference_data,
                'Difference found in file `{}`.\n{}'.format(relpath,
                    '\n'.join(difflib.unified_diff(
                        reference_data.split('\n'),
                        output_data.split('\n'),
                        fromfile=reference_path,
                        tofile=output_path,
                        lineterm='',
                        n=0))))
def test_generated_service_worker(self):
    """Tests that the generated service worker is as expected.

    The output service worker is compared with the reference one after its
    cache version has been normalized to 0.
    """
    output_sw_path = os.path.join(self.output_dir, caterpillar.SW_SCRIPT_NAME)
    reference_sw_path = os.path.join(
        TTS_REFERENCE_PATH, caterpillar.SW_SCRIPT_NAME)

    with open(output_sw_path) as output_file:
        with open(reference_sw_path) as reference_file:
            output_data = output_file.read().decode('utf-8')
            reference_data = reference_file.read().decode('utf-8')

    # The cache version is randomly generated in the output service worker,
    # so reset it to be 0, the same cache version as the reference service
    # worker.
    output_data = re.sub(
        r'CACHE_VERSION = \d+', 'CACHE_VERSION = 0', output_data)

    # The diff now runs reference -> output so its content matches the
    # fromfile/tofile labels, and lineterm='' avoids blank lines after the
    # header lines when joining.
    self.assertEqual(output_data, reference_data,
        'Difference found in file `{}`.\n{}'.format(
            caterpillar.SW_SCRIPT_NAME,
            '\n'.join(difflib.unified_diff(
                reference_data.split('\n'),
                output_data.split('\n'),
                fromfile=reference_sw_path,
                tofile=output_sw_path,
                lineterm='',
                n=0))))
def _handle_existing_policy(name, new_rules, existing_rules):
ret = { 'name': name }
if new_rules == existing_rules:
ret['result'] = True
ret['changes'] = None
ret['comment'] = 'Policy exists, and has the correct content'
return ret
change = ''.join(difflib.unified_diff(existing_rules.splitlines(True), new_rules.splitlines(True)))
if __opts__['test']:
ret['result'] = None
ret['changes'] = { name: { 'change': change } }
ret['comment'] = 'Policy would be changed'
return ret
payload = { 'rules': new_rules }
url = "v1/sys/policy/{0}".format(name)
response = __utils__['vault.make_request']('PUT', url, json=payload)
if response.status_code != 204:
return {
'name': name,
'changes': None,
'result': False,
'comment': 'Failed to change policy: {0}'.format(response.reason)
}
ret['result'] = True
ret['changes'] = { name: { 'change': change } }
ret['comment'] = 'Policy was updated'
return ret
def clobber_if_necessary(new_landmines, src_dir):
    """Does the work of setting, planting, and triggering landmines.

    If a '.landmines' file exists under src_dir and its lines differ from
    new_landmines, the diff is written to stdout and the build directory is
    clobbered.  The new landmines are then saved for the next run.
    """
    out_dir = get_build_dir(src_dir)
    landmines_path = os.path.normpath(os.path.join(src_dir, '.landmines'))
    try:
        os.makedirs(out_dir)
    except OSError:
        # NOTE(review): the original only inspected errno for EEXIST and
        # then did nothing either way, so every OSError is ignored here.
        # Confirm whether other errors were meant to propagate.
        pass

    if os.path.exists(landmines_path):
        with open(landmines_path, 'r') as f:
            old_landmines = f.readlines()
        if old_landmines != new_landmines:
            old_date = time.ctime(os.stat(landmines_path).st_ctime)
            changes = difflib.unified_diff(
                old_landmines, new_landmines,
                fromfile='old_landmines', tofile='new_landmines',
                fromfiledate=old_date, tofiledate=time.ctime(), n=0)
            sys.stdout.write('Clobbering due to:\n')
            sys.stdout.writelines(changes)
            sys.stdout.flush()

            clobber.clobber(out_dir)

    # Save current set of landmines for next time.
    with open(landmines_path, 'w') as f:
        f.writelines(new_landmines)
def apply_deployment(project, template):
    """Create or update the deployment for template in project.

    For an existing deployment the diff between the deployed and proposed
    template is printed; the update is sent only if something changed and
    the user confirms, otherwise the process exits with status 0.  For a new
    deployment the template is logged and inserted after confirmation.

    Returns the result of wait_for_completion() when an operation was
    started, else None.
    """
    body = {
        'name': template.name,
        'description': 'project: {}, name: {}'.format(project, template.name),
        'target': {
            'config': {
                'content': str(template)
            }
        }
    }
    result = None
    try:
        deployment = get_deployment(project, template.name)
        if not deployment:
            logging.info('Generated template:\n{}\n'.format(template))
            logging.info('Launching a new deployment: {}'.format(template.name))
            if confirm_action():
                result = dm.deployments().insert(project=project, body=body).execute()
        else:
            logging.info('Deployment already exists. Getting changes for {}...'.format(template.name))
            body['fingerprint'] = deployment.get('fingerprint')
            changed = False
            existing_template = get_manifest(project, deployment)['config']['content']
            template_diff = difflib.unified_diff(
                existing_template.splitlines(),
                str(template).splitlines(),
                fromfile='Existing template', tofile='Proposed template')
            for diff in color_diff(template_diff):
                changed = True
                print(diff)
            if changed and confirm_action():
                result = dm.deployments().update(project=project, deployment=template.name, body=body).execute()
            else:
                logging.info('No changes in the template.')
                sys.exit(0)
    except errors.HttpError as e:
        raise e
    if result:
        return wait_for_completion(project, result)
def unified_diff(gold_path, current_path):
    """Return the unified diff lines between two JSON files.

    Both files are parsed and re-serialized with sorted keys and a
    four-space indent, so only differences in content are reported.  The
    diff is labelled with the base names of the two paths; the returned
    list is empty when the contents are equal.
    """
    def normalized_lines(path):
        # `with` closes the file instead of leaking the handle.
        with open(path, "r") as handle:
            data = json.load(handle)
        return json.dumps(data, indent=4, sort_keys=True).split("\n")

    gold_lines = normalized_lines(gold_path)
    current_lines = normalized_lines(current_path)
    return list(difflib.unified_diff(gold_lines, current_lines,
                                     fromfile=os.path.basename(gold_path),
                                     tofile=os.path.basename(current_path),
                                     lineterm=""))
#### Test functions
def fetch(self):
"""Yield an Event when the listing of the FTP directory at self.url changed.

Loads the last saved status, lists the directory (filtered by self.regex
when set), saves the new listing and, if a previous listing existed and
differs, yields one Event whose content is the unified diff of the two.
"""
# NOTE(review): indentation was lost in this copy, so it is not visible how
# much of the code below runs inside the `with` block -- confirm against the
# original file.
urlp = urllib.parse.urlparse(self.url)
lastupd = self.status.load()
old_entries = lastupd.get('entries')
fetch_time = int(time.time())
with ftputil.FTPHost(urlp.hostname, urlp.username or 'anonymous', urlp.password) as host:
try:
st_mtime = host.lstat(urlp.path.rstrip('/')).st_mtime
except ftputil.error.RootDirError:
# No mtime is recorded when lstat reports a root directory error.
st_mtime = None
# An unchanged, known mtime ends the fetch without listing the directory.
if st_mtime and st_mtime == lastupd.get('mtime'):
return
else:
lastupd['mtime'] = st_mtime
entries = sorted(x for x in host.listdir(urlp.path) if
(not self.regex or self.regex.search(x)))
lastupd['entries'] = entries
self.status = self.status.save(fetch_time, lastupd)
# Nothing to report on the first run or when the listing is unchanged.
if not old_entries or entries == old_entries:
return
else:
diff = tuple(difflib.unified_diff(old_entries, entries, lineterm=''))
title = 'FTP directory changed'
# diff[2:] skips the two file header lines; the first added entry, if any,
# becomes the title.
for text in diff[2:]:
if text[0] == '+':
title = text[1:]
break
content = (markupsafe.Markup('<pre>%s</pre>') % '\n'.join(diff[2:]))
yield Event(self.name, self.category,
lastupd.get('mtime') or fetch_time, title, content, self.url)
def check_path(self, path, files, zip_data):
    """Score the submitted outputs for one path against the reference files.

    For every name in files, the reference file under self.ref_dir is
    compared (ignoring leading/trailing whitespace on each line) with the
    matching entry of zip_data.  A match adds the path's score and counts as
    correct; a mismatch is logged as a unified diff.  Every file counts
    towards the total, including those missing from zip_data.  The tally is
    stored in self.perf[path].
    """
    logging.info("path={0}".format(path))
    tally = defaultdict(int)
    for filename in files:
        if path is None or path == '':
            testfile_path = os.path.abspath(os.path.join(self.ref_dir, filename))
            testfile_key = filename
        else:
            testfile_path = os.path.abspath(os.path.join(self.ref_dir, path, filename))
            testfile_key = os.path.join(path, filename)

        # set up score value for matching output correctly
        score = self.default_score
        if path in self.path_score:
            score = self.path_score[path]

        logging.info("Checking {0}".format(testfile_key))
        if testfile_key in zip_data:
            with open(testfile_path, 'r') as ref:
                # difflib needs real sequences; on Python 3 map() returns an
                # iterator, so build lists explicitly.
                ref_data = [x.strip() for x in ref.read().splitlines()]
                output_data = [x.strip() for x in zip_data[testfile_key].splitlines()]
                diff_lines = list(difflib.unified_diff(ref_data, output_data, "reference", "your-output", lineterm=''))
                if len(diff_lines) > 0:
                    logging.info("Diff between reference and your output for {0}".format(testfile_key))
                    logging.info("{0}{1}".format(self.linesep, self.linesep.join(list(diff_lines))))
                else:
                    tally['score'] += score
                    tally['num_correct'] += 1
                    logging.info("{0} Correct!".format(testfile_key))
        tally['total'] += 1
    self.perf[path] = dict(tally)