# Copyright (C) 2009, 2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
"""Tests for Annotators."""
from bzrlib import (
annotate,
_annotator_py,
errors,
knit,
revision,
tests,
)
def load_tests(standard_tests, module, loader):
"""Parameterize tests for all versions of groupcompress."""
suite, _ = tests.permute_tests_for_extension(standard_tests, loader,
'bzrlib._annotator_py', 'bzrlib._annotator_pyx')
return suite
class TestAnnotator(tests.TestCaseWithMemoryTransport):
module = None # Set by load_tests
fa_key = ('f-id', 'a-id')
fb_key = ('f-id', 'b-id')
fc_key = ('f-id', 'c-id')
fd_key = ('f-id', 'd-id')
fe_key = ('f-id', 'e-id')
ff_key = ('f-id', 'f-id')
def make_no_graph_texts(self):
factory = knit.make_pack_factory(False, False, 2)
self.vf = factory(self.get_transport())
self.ann = self.module.Annotator(self.vf)
self.vf.add_lines(self.fa_key, (), ['simple\n', 'content\n'])
self.vf.add_lines(self.fb_key, (), ['simple\n', 'new content\n'])
def make_simple_text(self):
# TODO: all we really need is a VersionedFile instance, we'd like to
# avoid creating all the intermediate stuff
factory = knit.make_pack_factory(True, True, 2)
self.vf = factory(self.get_transport())
# This assumes nothing special happens during __init__, which may be
# valid
self.ann = self.module.Annotator(self.vf)
# A 'simple|content|'
# |
# B 'simple|new content|'
self.vf.add_lines(self.fa_key, [], ['simple\n', 'content\n'])
self.vf.add_lines(self.fb_key, [self.fa_key],
['simple\n', 'new content\n'])
def make_merge_text(self):
self.make_simple_text()
# A 'simple|content|'
# |\
# B | 'simple|new content|'
# | |
# | C 'simple|from c|content|'
# |/
# D 'simple|from c|new content|introduced in merge|'
self.vf.add_lines(self.fc_key, [self.fa_key],
['simple\n', 'from c\n', 'content\n'])
self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key],
['simple\n', 'from c\n', 'new content\n',
'introduced in merge\n'])
def make_common_merge_text(self):
"""Both sides of the merge will have introduced a line."""
self.make_simple_text()
# A 'simple|content|'
# |\
# B | 'simple|new content|'
# | |
# | C 'simple|new content|'
# |/
# D 'simple|new content|'
self.vf.add_lines(self.fc_key, [self.fa_key],
['simple\n', 'new content\n'])
self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key],
['simple\n', 'new content\n'])
def make_many_way_common_merge_text(self):
self.make_simple_text()
# A-. 'simple|content|'
# |\ \
# B | | 'simple|new content|'
# | | |
# | C | 'simple|new content|'
# |/ |
# D | 'simple|new content|'
# | |
# | E 'simple|new content|'
# | /
# F-' 'simple|new content|'
self.vf.add_lines(self.fc_key, [self.fa_key],
['simple\n', 'new content\n'])
self.vf.add_lines(self.fd_key, [self.fb_key, self.fc_key],
['simple\n', 'new content\n'])
self.vf.add_lines(self.fe_key, [self.fa_key],
['simple\n', 'new content\n'])
self.vf.add_lines(self.ff_key, [self.fd_key, self.fe_key],
['simple\n', 'new content\n'])
def make_merge_and_restored_text(self):
self.make_simple_text()
# A 'simple|content|'
# |\
# B | 'simple|new content|'
# | |
# C | 'simple|content|' # reverted to A
# \|
# D 'simple|content|'
# c reverts back to 'a' for the new content line
self.vf.add_lines(self.fc_key, [self.fb_key],
['simple\n', 'content\n'])
# d merges 'a' and 'c', to find both claim last modified
self.vf.add_lines(self.fd_key, [self.fa_key, self.fc_key],
['simple\n', 'content\n'])
def assertAnnotateEqual(self, expected_annotation, key, exp_text=None):
annotation, lines = self.ann.annotate(key)
self.assertEqual(expected_annotation, annotation)
if exp_text is None:
record = self.vf.get_record_stream([key], 'unordered', True).next()
exp_text = record.get_bytes_as('fulltext')
self.assertEqualDiff(exp_text, ''.join(lines))
def test_annotate_missing(self):
self.make_simple_text()
self.assertRaises(errors.RevisionNotPresent,
self.ann.annotate, ('not', 'present'))
def test_annotate_simple(self):
self.make_simple_text()
self.assertAnnotateEqual([(self.fa_key,)]*2, self.fa_key)
self.assertAnnotateEqual([(self.fa_key,), (self.fb_key,)], self.fb_key)
def test_annotate_merge_text(self):
self.make_merge_text()
self.assertAnnotateEqual([(self.fa_key,), (self.fc_key,),
(self.fb_key,), (self.fd_key,)],
self.fd_key)
def test_annotate_common_merge_text(self):
self.make_common_merge_text()
self.assertAnnotateEqual([(self.fa_key,), (self.fb_key, self.fc_key)],
self.fd_key)
def test_annotate_many_way_common_merge_text(self):
self.make_many_way_common_merge_text()
self.assertAnnotateEqual([(self.fa_key,),
(self.fb_key, self.fc_key, self.fe_key)],
self.ff_key)
def test_annotate_merge_and_restored(self):
self.make_merge_and_restored_text()
self.assertAnnotateEqual([(self.fa_key,), (self.fa_key, self.fc_key)],
self.fd_key)
def test_annotate_flat_simple(self):
self.make_simple_text()
self.assertEqual([(self.fa_key, 'simple\n'),
(self.fa_key, 'content\n'),
], self.ann.annotate_flat(self.fa_key))
self.assertEqual([(self.fa_key, 'simple\n'),
(self.fb_key, 'new content\n'),
], self.ann.annotate_flat(self.fb_key))
def test_annotate_flat_merge_and_restored_text(self):
self.make_merge_and_restored_text()
# fc is a simple dominator of fa
self.assertEqual([(self.fa_key, 'simple\n'),
(self.fc_key, 'content\n'),
], self.ann.annotate_flat(self.fd_key))
def test_annotate_common_merge_text(self):
self.make_common_merge_text()
# there is no common point, so we just pick the lexicographical lowest
# and 'b-id' comes before 'c-id'
self.assertEqual([(self.fa_key, 'simple\n'),
(self.fb_key, 'new content\n'),
], self.ann.annotate_flat(self.fd_key))
def test_annotate_many_way_common_merge_text(self):
self.make_many_way_common_merge_text()
self.assertEqual([(self.fa_key, 'simple\n'),
(self.fb_key, 'new content\n')],
self.ann.annotate_flat(self.ff_key))
def test_annotate_flat_respects_break_ann_tie(self):
tiebreaker = annotate._break_annotation_tie
try:
calls = []
def custom_tiebreaker(annotated_lines):
self.assertEqual(2, len(annotated_lines))
left = annotated_lines[0]
self.assertEqual(2, len(left))
self.assertEqual('new content\n', left[1])
right = annotated_lines[1]
self.assertEqual(2, len(right))
self.assertEqual('new content\n', right[1])
calls.append((left[0], right[0]))
# Our custom tiebreaker takes the *largest* value, rather than
# the *smallest* value
if left[0] < right[0]:
return right
else:
return left
annotate._break_annotation_tie = custom_tiebreaker
self.make_many_way_common_merge_text()
self.assertEqual([(self.fa_key, 'simple\n'),
(self.fe_key, 'new content\n')],
self.ann.annotate_flat(self.ff_key))
self.assertEqual([(self.fe_key, self.fc_key),
(self.fe_key, self.fb_key)], calls)
finally:
annotate._break_annotation_tie = tiebreaker
def test_needed_keys_simple(self):
self.make_simple_text()
keys, ann_keys = self.ann._get_needed_keys(self.fb_key)
self.assertEqual([self.fa_key, self.fb_key], sorted(keys))
self.assertEqual({self.fa_key: 1, self.fb_key: 1},
self.ann._num_needed_children)
self.assertEqual(set(), ann_keys)
def test_needed_keys_many(self):
self.make_many_way_common_merge_text()
keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
self.fd_key, self.fe_key, self.ff_key,
], sorted(keys))
self.assertEqual({self.fa_key: 3,
self.fb_key: 1,
self.fc_key: 1,
self.fd_key: 1,
self.fe_key: 1,
self.ff_key: 1,
}, self.ann._num_needed_children)
self.assertEqual(set(), ann_keys)
def test_needed_keys_with_special_text(self):
self.make_many_way_common_merge_text()
spec_key = ('f-id', revision.CURRENT_REVISION)
spec_text = 'simple\nnew content\nlocally modified\n'
self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
spec_text)
keys, ann_keys = self.ann._get_needed_keys(spec_key)
self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
self.fd_key, self.fe_key,
], sorted(keys))
self.assertEqual([spec_key], sorted(ann_keys))
def test_needed_keys_with_parent_texts(self):
self.make_many_way_common_merge_text()
# If 'D' and 'E' are already annotated, we don't need to extract all
# the texts
# D | 'simple|new content|'
# | |
# | E 'simple|new content|'
# | /
# F-' 'simple|new content|'
self.ann._parent_map[self.fd_key] = (self.fb_key, self.fc_key)
self.ann._text_cache[self.fd_key] = ['simple\n', 'new content\n']
self.ann._annotations_cache[self.fd_key] = [
(self.fa_key,),
(self.fb_key, self.fc_key),
]
self.ann._parent_map[self.fe_key] = (self.fa_key,)
self.ann._text_cache[self.fe_key] = ['simple\n', 'new content\n']
self.ann._annotations_cache[self.fe_key] = [
(self.fa_key,),
(self.fe_key,),
]
keys, ann_keys = self.ann._get_needed_keys(self.ff_key)
self.assertEqual([self.ff_key], sorted(keys))
self.assertEqual({self.fd_key: 1,
self.fe_key: 1,
self.ff_key: 1,
}, self.ann._num_needed_children)
self.assertEqual([], sorted(ann_keys))
def test_record_annotation_removes_texts(self):
self.make_many_way_common_merge_text()
# Populate the caches
for x in self.ann._get_needed_texts(self.ff_key):
continue
self.assertEqual({self.fa_key: 3,
self.fb_key: 1,
self.fc_key: 1,
self.fd_key: 1,
self.fe_key: 1,
self.ff_key: 1,
}, self.ann._num_needed_children)
self.assertEqual([self.fa_key, self.fb_key, self.fc_key,
self.fd_key, self.fe_key, self.ff_key,
], sorted(self.ann._text_cache.keys()))
self.ann._record_annotation(self.fa_key, [], [])
self.ann._record_annotation(self.fb_key, [self.fa_key], [])
self.assertEqual({self.fa_key: 2,
self.fb_key: 1,
self.fc_key: 1,
self.fd_key: 1,
self.fe_key: 1,
self.ff_key: 1,
}, self.ann._num_needed_children)
self.assertTrue(self.fa_key in self.ann._text_cache)
self.assertTrue(self.fa_key in self.ann._annotations_cache)
self.ann._record_annotation(self.fc_key, [self.fa_key], [])
self.ann._record_annotation(self.fd_key, [self.fb_key, self.fc_key], [])
self.assertEqual({self.fa_key: 1,
self.fb_key: 0,
self.fc_key: 0,
self.fd_key: 1,
self.fe_key: 1,
self.ff_key: 1,
}, self.ann._num_needed_children)
self.assertTrue(self.fa_key in self.ann._text_cache)
self.assertTrue(self.fa_key in self.ann._annotations_cache)
self.assertFalse(self.fb_key in self.ann._text_cache)
self.assertFalse(self.fb_key in self.ann._annotations_cache)
self.assertFalse(self.fc_key in self.ann._text_cache)
self.assertFalse(self.fc_key in self.ann._annotations_cache)
def test_annotate_special_text(self):
# Things like WT and PreviewTree want to annotate an arbitrary text
# ('current:') so we need a way to add that to the group of files to be
# annotated.
self.make_many_way_common_merge_text()
# A-. 'simple|content|'
# |\ \
# B | | 'simple|new content|'
# | | |
# | C | 'simple|new content|'
# |/ |
# D | 'simple|new content|'
# | |
# | E 'simple|new content|'
# | /
# SPEC 'simple|new content|locally modified|'
spec_key = ('f-id', revision.CURRENT_REVISION)
spec_text = 'simple\nnew content\nlocally modified\n'
self.ann.add_special_text(spec_key, [self.fd_key, self.fe_key],
spec_text)
self.assertAnnotateEqual([(self.fa_key,),
(self.fb_key, self.fc_key, self.fe_key),
(spec_key,),
], spec_key,
exp_text=spec_text)
def test_no_graph(self):
self.make_no_graph_texts()
self.assertAnnotateEqual([(self.fa_key,),
(self.fa_key,),
], self.fa_key)
self.assertAnnotateEqual([(self.fb_key,),
(self.fb_key,),
], self.fb_key)
|