test_push.py :  » Development » Bazaar » bzr-2.2b3 » bzrlib » tests » per_branch » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Development » Bazaar 
Bazaar » bzr 2.2b3 » bzrlib » tests » per_branch » test_push.py
# Copyright (C) 2007-2010 Canonical Ltd
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

"""Tests for branch.push behaviour."""

from cStringIO import StringIO
import os

from bzrlib import (
    branch,
    builtins,
    bzrdir,
    check,
    debug,
    errors,
    memorytree,
    push,
    repository,
    revision,
    tests,
    transport,
    )
from bzrlib.smart import (
    client,
    server,
    repository as _mod_smart_repo,
    )
from bzrlib.tests import (
    per_branch,
    test_server,
    )


class TestPush(per_branch.TestCaseWithBranch):

    def test_push_convergence_simple(self):
        # when revisions are pushed, the left-most accessible parents must
        # become the revision-history.
        mine = self.make_branch_and_tree('mine')
        mine.commit('1st post', rev_id='P1', allow_pointless=True)
        other = mine.bzrdir.sprout('other').open_workingtree()
        other.commit('my change', rev_id='M1', allow_pointless=True)
        mine.merge_from_branch(other.branch)
        mine.commit('merge my change', rev_id='P2')
        result = mine.branch.push(other.branch)
        self.assertEqual(['P1', 'P2'], other.branch.revision_history())
        # result object contains some structured data
        self.assertEqual(result.old_revid, 'M1')
        self.assertEqual(result.new_revid, 'P2')
        # and it can be treated as an integer for compatibility
        self.assertEqual(int(result), 0)

    def test_push_merged_indirect(self):
        # it should be possible to do a push from one branch into another
        # when the tip of the target was merged into the source branch
        # via a third branch - so its buried in the ancestry and is not
        # directly accessible.
        mine = self.make_branch_and_tree('mine')
        mine.commit('1st post', rev_id='P1', allow_pointless=True)
        target = mine.bzrdir.sprout('target').open_workingtree()
        target.commit('my change', rev_id='M1', allow_pointless=True)
        other = mine.bzrdir.sprout('other').open_workingtree()
        other.merge_from_branch(target.branch)
        other.commit('merge my change', rev_id='O2')
        mine.merge_from_branch(other.branch)
        mine.commit('merge other', rev_id='P2')
        mine.branch.push(target.branch)
        self.assertEqual(['P1', 'P2'], target.branch.revision_history())

    def test_push_to_checkout_updates_master(self):
        """Pushing into a checkout updates the checkout and the master branch"""
        master_tree = self.make_branch_and_tree('master')
        checkout = self.make_branch_and_tree('checkout')
        try:
            checkout.branch.bind(master_tree.branch)
        except errors.UpgradeRequired:
            # cant bind this format, the test is irrelevant.
            return
        rev1 = checkout.commit('master')

        other = master_tree.branch.bzrdir.sprout('other').open_workingtree()
        rev2 = other.commit('other commit')
        # now push, which should update both checkout and master.
        other.branch.push(checkout.branch)
        self.assertEqual([rev1, rev2], checkout.branch.revision_history())
        self.assertEqual([rev1, rev2], master_tree.branch.revision_history())

    def test_push_raises_specific_error_on_master_connection_error(self):
        master_tree = self.make_branch_and_tree('master')
        checkout = self.make_branch_and_tree('checkout')
        try:
            checkout.branch.bind(master_tree.branch)
        except errors.UpgradeRequired:
            # cant bind this format, the test is irrelevant.
            return
        other = master_tree.branch.bzrdir.sprout('other').open_workingtree()
        # move the branch out of the way on disk to cause a connection
        # error.
        os.rename('master', 'master_gone')
        # try to push, which should raise a BoundBranchConnectionFailure.
        self.assertRaises(errors.BoundBranchConnectionFailure,
                other.branch.push, checkout.branch)

    def test_push_uses_read_lock(self):
        """Push should only need a read lock on the source side."""
        source = self.make_branch_and_tree('source')
        target = self.make_branch('target')

        self.build_tree(['source/a'])
        source.add(['a'])
        source.commit('a')

        source.branch.lock_read()
        try:
            target.lock_write()
            try:
                source.branch.push(target, stop_revision=source.last_revision())
            finally:
                target.unlock()
        finally:
            source.branch.unlock()

    def test_push_within_repository(self):
        """Push from one branch to another inside the same repository."""
        try:
            repo = self.make_repository('repo', shared=True)
        except (errors.IncompatibleFormat, errors.UninitializableFormat):
            # This Branch format cannot create shared repositories
            return
        # This is a little bit trickier because make_branch_and_tree will not
        # re-use a shared repository.
        a_bzrdir = self.make_bzrdir('repo/tree')
        try:
            a_branch = self.branch_format.initialize(a_bzrdir)
        except (errors.UninitializableFormat):
            # Cannot create these branches
            return
        try:
            tree = a_branch.bzrdir.create_workingtree()
        except errors.NotLocalUrl:
            if self.vfs_transport_factory is test_server.LocalURLServer:
                # the branch is colocated on disk, we cannot create a checkout.
                # hopefully callers will expect this.
                local_controldir= bzrdir.BzrDir.open(
                    self.get_vfs_only_url('repo/tree'))
                tree = local_controldir.create_workingtree()
            else:
                tree = a_branch.create_checkout('repo/tree', lightweight=True)
        self.build_tree(['repo/tree/a'])
        tree.add(['a'])
        tree.commit('a')

        to_bzrdir = self.make_bzrdir('repo/branch')
        to_branch = self.branch_format.initialize(to_bzrdir)
        tree.branch.push(to_branch)

        self.assertEqual(tree.branch.last_revision(),
                         to_branch.last_revision())

    def test_push_overwrite_of_non_tip_with_stop_revision(self):
        """Combining the stop_revision and overwrite options works.

        This was <https://bugs.launchpad.net/bzr/+bug/234229>.
        """
        source = self.make_branch_and_tree('source')
        target = self.make_branch('target')

        source.commit('1st commit')
        source.branch.push(target)
        source.commit('2nd commit', rev_id='rev-2')
        source.commit('3rd commit')

        source.branch.push(target, stop_revision='rev-2', overwrite=True)
        self.assertEqual('rev-2', target.last_revision())

    def test_push_with_default_stacking_does_not_create_broken_branch(self):
        """Pushing a new standalone branch works even when there's a default
        stacking policy at the destination.

        The new branch will preserve the repo format (even if it isn't the
        default for the branch), and will be stacked when the repo format
        allows (which means that the branch format isn't necessarly preserved).
        """
        if isinstance(self.branch_format, branch.BzrBranchFormat4):
            raise tests.TestNotApplicable('Not a metadir format.')
        if isinstance(self.branch_format, branch.BranchReferenceFormat):
            # This test could in principle apply to BranchReferenceFormat, but
            # make_branch_builder doesn't support it.
            raise tests.TestSkipped(
                "BranchBuilder can't make reference branches.")
        # Make a branch called "local" in a stackable repository
        # The branch has 3 revisions:
        #   - rev-1, adds a file
        #   - rev-2, no changes
        #   - rev-3, modifies the file.
        repo = self.make_repository('repo', shared=True, format='1.6')
        builder = self.make_branch_builder('repo/local')
        builder.start_series()
        builder.build_snapshot('rev-1', None, [
            ('add', ('', 'root-id', 'directory', '')),
            ('add', ('filename', 'f-id', 'file', 'content\n'))])
        builder.build_snapshot('rev-2', ['rev-1'], [])
        builder.build_snapshot('rev-3', ['rev-2'],
            [('modify', ('f-id', 'new-content\n'))])
        builder.finish_series()
        trunk = builder.get_branch()
        # Sprout rev-1 to "trunk", so that we can stack on it.
        trunk.bzrdir.sprout(self.get_url('trunk'), revision_id='rev-1')
        # Set a default stacking policy so that new branches will automatically
        # stack on trunk.
        self.make_bzrdir('.').get_config().set_default_stack_on('trunk')
        # Push rev-2 to a new branch "remote".  It will be stacked on "trunk".
        output = StringIO()
        push._show_push_branch(trunk, 'rev-2', self.get_url('remote'), output)
        # Push rev-3 onto "remote".  If "remote" not stacked and is missing the
        # fulltext record for f-id @ rev-1, then this will fail.
        remote_branch = branch.Branch.open(self.get_url('remote'))
        trunk.push(remote_branch)
        check.check_dwim(remote_branch.base, False, True, True)

    def test_no_get_parent_map_after_insert_stream(self):
        # Effort test for bug 331823
        self.setup_smart_server_with_call_log()
        # Make a local branch with four revisions.  Four revisions because:
        # one to push, one there for _walk_to_common_revisions to find, one we
        # don't want to access, one for luck :)
        if isinstance(self.branch_format, branch.BranchReferenceFormat):
            # This test could in principle apply to BranchReferenceFormat, but
            # make_branch_builder doesn't support it.
            raise tests.TestSkipped(
                "BranchBuilder can't make reference branches.")
        try:
            builder = self.make_branch_builder('local')
        except (errors.TransportNotPossible, errors.UninitializableFormat):
            raise tests.TestNotApplicable('format not directly constructable')
        builder.start_series()
        builder.build_snapshot('first', None, [
            ('add', ('', 'root-id', 'directory', ''))])
        builder.build_snapshot('second', ['first'], [])
        builder.build_snapshot('third', ['second'], [])
        builder.build_snapshot('fourth', ['third'], [])
        builder.finish_series()
        local = builder.get_branch()
        local = branch.Branch.open(self.get_vfs_only_url('local'))
        # Initial push of three revisions
        remote_bzrdir = local.bzrdir.sprout(
            self.get_url('remote'), revision_id='third')
        remote = remote_bzrdir.open_branch()
        # Push fourth revision
        self.reset_smart_call_log()
        self.disableOptimisticGetParentMap()
        self.assertFalse(local.is_locked())
        local.push(remote)
        hpss_call_names = [item.call.method for item in self.hpss_calls]
        self.assertTrue('Repository.insert_stream_1.19' in hpss_call_names)
        insert_stream_idx = hpss_call_names.index(
            'Repository.insert_stream_1.19')
        calls_after_insert_stream = hpss_call_names[insert_stream_idx:]
        # After inserting the stream the client has no reason to query the
        # remote graph any further.
        self.assertEqual(
            ['Repository.insert_stream_1.19', 'Repository.insert_stream_1.19',
             'get', 'Branch.set_last_revision_info', 'Branch.unlock'],
            calls_after_insert_stream)

    def disableOptimisticGetParentMap(self):
        # Tweak some class variables to stop remote get_parent_map calls asking
        # for or receiving more data than the caller asked for.
        self.overrideAttr(repository.InterRepository,
                          '_walk_to_common_revisions_batch_size', 1)
        self.overrideAttr(_mod_smart_repo.SmartServerRepositoryGetParentMap,
                          'no_extra_results', True)


class TestPushHook(per_branch.TestCaseWithBranch):

    def setUp(self):
        self.hook_calls = []
        super(TestPushHook, self).setUp()

    def capture_post_push_hook(self, result):
        """Capture post push hook calls to self.hook_calls.

        The call is logged, as is some state of the two branches.
        """
        if result.local_branch:
            local_locked = result.local_branch.is_locked()
            local_base = result.local_branch.base
        else:
            local_locked = None
            local_base = None
        self.hook_calls.append(
            ('post_push', result.source_branch, local_base,
             result.master_branch.base,
             result.old_revno, result.old_revid,
             result.new_revno, result.new_revid,
             result.source_branch.is_locked(), local_locked,
             result.master_branch.is_locked()))

    def test_post_push_empty_history(self):
        target = self.make_branch('target')
        source = self.make_branch('source')
        branch.Branch.hooks.install_named_hook(
            'post_push', self.capture_post_push_hook, None)
        source.push(target)
        # with nothing there we should still get a notification, and
        # have both branches locked at the notification time.
        self.assertEqual([
            ('post_push', source, None, target.base, 0, revision.NULL_REVISION,
             0, revision.NULL_REVISION, True, None, True)
            ],
            self.hook_calls)

    def test_post_push_bound_branch(self):
        # pushing to a bound branch should pass in the master branch to the
        # hook, allowing the correct number of emails to be sent, while still
        # allowing hooks that want to modify the target to do so to both
        # instances.
        target = self.make_branch('target')
        local = self.make_branch('local')
        try:
            local.bind(target)
        except errors.UpgradeRequired:
            # We can't bind this format to itself- typically it is the local
            # branch that doesn't support binding.  As of May 2007
            # remotebranches can't be bound.  Let's instead make a new local
            # branch of the default type, which does allow binding.
            # See https://bugs.launchpad.net/bzr/+bug/112020
            local = bzrdir.BzrDir.create_branch_convenience('local2')
            local.bind(target)
        source = self.make_branch('source')
        branch.Branch.hooks.install_named_hook(
            'post_push', self.capture_post_push_hook, None)
        source.push(local)
        # with nothing there we should still get a notification, and
        # have both branches locked at the notification time.
        self.assertEqual([
            ('post_push', source, local.base, target.base, 0,
             revision.NULL_REVISION, 0, revision.NULL_REVISION,
             True, True, True)
            ],
            self.hook_calls)

    def test_post_push_nonempty_history(self):
        target = self.make_branch_and_memory_tree('target')
        target.lock_write()
        target.add('')
        rev1 = target.commit('rev 1')
        target.unlock()
        sourcedir = target.bzrdir.clone(self.get_url('source'))
        source = memorytree.MemoryTree.create_on_branch(sourcedir.open_branch())
        rev2 = source.commit('rev 2')
        branch.Branch.hooks.install_named_hook(
            'post_push', self.capture_post_push_hook, None)
        source.branch.push(target.branch)
        # with nothing there we should still get a notification, and
        # have both branches locked at the notification time.
        self.assertEqual([
            ('post_push', source.branch, None, target.branch.base, 1, rev1,
             2, rev2, True, None, True)
            ],
            self.hook_calls)


class EmptyPushSmartEffortTests(per_branch.TestCaseWithBranch):
    """Tests that a push of 0 revisions should make a limited number of smart
    protocol RPCs.
    """

    def setUp(self):
        # Skip some scenarios that don't apply to these tests.
        if (self.transport_server is not None and
            issubclass(self.transport_server, server.SmartTCPServer)):
            raise tests.TestNotApplicable(
                'Does not apply when remote backing branch is also '
                'a smart branch')
        if isinstance(self.branch_format, branch.BzrBranchFormat4):
            raise tests.TestNotApplicable(
                'Branch format 4 is not usable via HPSS.')
        super(EmptyPushSmartEffortTests, self).setUp()
        # Create a smart server that publishes whatever the backing VFS server
        # does.
        self.smart_server = test_server.SmartTCPServer_for_testing()
        self.start_server(self.smart_server, self.get_server())
        # Make two empty branches, 'empty' and 'target'.
        self.empty_branch = self.make_branch('empty')
        self.make_branch('target')
        # Log all HPSS calls into self.hpss_calls.
        client._SmartClient.hooks.install_named_hook(
            'call', self.capture_hpss_call, None)
        self.hpss_calls = []

    def capture_hpss_call(self, params):
        self.hpss_calls.append(params.method)

    def test_empty_branch_api(self):
        """The branch_obj.push API should make a limited number of HPSS calls.
        """
        t = transport.get_transport(self.smart_server.get_url()).clone('target')
        target = branch.Branch.open_from_transport(t)
        self.empty_branch.push(target)
        self.assertEqual(
            ['BzrDir.open_2.1',
             'BzrDir.open_branchV3',
             'BzrDir.find_repositoryV3',
             'Branch.get_stacked_on_url',
             'Branch.lock_write',
             'Branch.last_revision_info',
             'Branch.unlock'],
            self.hpss_calls)

    def test_empty_branch_command(self):
        """The 'bzr push' command should make a limited number of HPSS calls.
        """
        cmd = builtins.cmd_push()
        cmd.outf = tests.StringIOWrapper()
        cmd.run(
            directory=self.get_url('empty'),
            location=self.smart_server.get_url() + 'target')
        # HPSS calls as of 2008/09/22:
        # [BzrDir.open, BzrDir.open_branch, BzrDir.find_repositoryV2,
        # Branch.get_stacked_on_url, get, get, Branch.lock_write,
        # Branch.last_revision_info, Branch.unlock]
        self.assertTrue(len(self.hpss_calls) <= 9, self.hpss_calls)


class TestLossyPush(per_branch.TestCaseWithBranch):

    def setUp(self):
        self.hook_calls = []
        super(TestLossyPush, self).setUp()

    def test_lossy_push_raises_same_vcs(self):
        target = self.make_branch('target')
        source = self.make_branch('source')
        self.assertRaises(errors.LossyPushToSameVCS, source.lossy_push, target)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.