test_cachedb.py :  » Network » Torrent-Swapper » swapper » test » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Network » Torrent Swapper 
Torrent Swapper » swapper » test » test_cachedb.py
import os
import tempfile
import unittest
import socket
import shutil

from copy import deepcopy
from sets import Set

import Swapper.CacheDB.cachedb as cachedb

class TestBasicDB(unittest.TestCase):
    """Tests for the low-level helpers of ``cachedb.BasicDB``.

    Each test runs against its own database, opened on a freshly created
    temporary directory that tearDown removes again.
    """

    def setUp(self):
        """Create a temporary directory and open a ``BasicDB`` on it."""
        self.dirname = tempfile.mkdtemp()
        self.d = cachedb.BasicDB(self.dirname)

    def tearDown(self):
        """Close the database if one is attached, then delete its directory."""
        # test_sync deletes self.d on purpose.  If re-opening the database
        # fails there, an unconditional self.d.close() would raise
        # AttributeError here and hide the real error.
        db = getattr(self, 'd', None)
        if db is not None:
            db.close()
        try:
            shutil.rmtree(self.dirname)
        except Exception:
            # Best effort only: a leftover directory is reported, not fatal.
            # sys.exc_info() keeps this handler valid on old and new
            # interpreters alike and reports the type that was really raised.
            import sys
            exc_type, exc_value = sys.exc_info()[:2]
            print("tearDown problem: %s %s %s not removed"
                  % (exc_type, exc_value, self.dirname))

    def test_basic(self):
        # Walks through _put/_get/_has_key/_size, _delete, _updateItem and
        # setDefaultItem; later steps rely on keys stored by earlier ones.
        # unittest assertion methods are used instead of bare asserts so the
        # checks still run under "python -O".

        # test _put, _get, _has_key, _size
        self.d._put('a', 123)
        self.assertTrue(self.d._has_key('a'))
        self.assertFalse(self.d._has_key('b'))
        self.assertEqual(self.d._get('a'), 123)
        self.d._put('c', {'1': 1, '2': 2})
        self.assertEqual(self.d._size(), 2)

        # test _delete
        self.d._put('b', 222)
        self.assertTrue(self.d._has_key('b'))
        self.d._delete('b')
        self.assertFalse(self.d._has_key('b'))

        # test _updateItem
        # 'a' already holds 123, so this update must leave the value as is.
        self.d._updateItem('a', 123)
        if not self.d._get('a') == 123:
            # Dump the stored items only when the check fails.
            self.fail(self.d._items())
        self.d._updateItem('a', 321)
        self.assertEqual(self.d._get('a'), 321)
        # Updating a dict value is expected to merge keys, not replace the dict.
        self.d._updateItem('c', {'2': 22, '3': 3})
        self.assertEqual(self.d._get('c'), {'1': 1, '2': 22, '3': 3})

        # test setDefaultItem
        # Compare against a snapshot: an alias of default_item would also
        # change if setDefaultItem updates that dict in place, and the
        # comparison could then never fail.
        expected = deepcopy(self.d.default_item)
        returned = self.d.setDefaultItem({})
        self.assertEqual(returned, expected)
        extra = {'a': 123, 'd': 3}
        returned = self.d.setDefaultItem(extra)
        expected.update(extra)
        self.assertEqual(returned, expected)

    def test_sync(self):            # write data from mem to disk
        self.d._put('k', 10)
        # No explicit close() here: per the original author's note, deleting
        # the object already closes the database.
        del self.d
        self.assertFalse(hasattr(self, 'd'))
        # A new instance on the same directory must see the stored value.
        self.d = cachedb.BasicDB(self.dirname)
        self.assertEqual(self.d._get('k'), 10)


class TestMyDB(unittest.TestCase):
    """Tests for ``cachedb.MyDB``: initial data, superpeers and friends.

    The database is obtained through ``MyDB.getInstance`` and lives in a
    fixed ``testdb`` directory under the system temporary directory.
    """

    def setUp(self):
        """Fetch the MyDB instance, empty it and load the test identity."""
        self.dirname = os.path.join(tempfile.gettempdir(), 'testdb')
        self.name = socket.gethostname()
        myinfo = {'permid': 'my_permid', 'ip': '1.2.3.4', 'name': self.name}
        self.d = cachedb.MyDB.getInstance(myinfo=myinfo, db_dir=self.dirname)
        # Start from an empty store, as the TorrentDB/PreferenceDB tests do:
        # the directory has a fixed path and is not removed by these tests,
        # so records from an earlier, interrupted run may still be there.
        self.d._clear()
        self.d.initData(myinfo)

    def tearDown(self):
        """Empty the database so later tests do not see this test's data."""
        self.d._clear()

    def test_initData(self):
        # The keys initData is expected to create; only the number of keys
        # is compared before the dict is reused as input for a second call.
        init = {
            'version': cachedb.curr_version,
            'permid': 'my_permid',
            'ip': '1.2.3.4',
            'port': 0,
            'name': self.name,
            'torrent_path': '',
            'prefxchg_queue': [],
            'bootstrapping': 1,
            'max_num_torrents': 100000,
            'max_num_my_preferences': 1000,
            'superpeers': Set(),
            'friends': Set(),
        }

        if not self.d._size() == len(init):
            # Dump the stored data only when the check fails.
            self.fail(self.d._data)

        # only ip, permid, torrent_path can be updated
        changes = {'ip': '1.2.3.4', 'permid': 'permid 1'}
        init.update(changes)
        init.update({'name': '1n;2_qfq2v@$'})
        self.d.initData(init)
        # NOTE(review): the "new" ip equals the one from setUp, so this line
        # cannot tell an updated ip from an untouched one.
        self.assertEqual(self.d._get('ip'), '1.2.3.4')
        self.assertEqual(self.d._get('permid'), 'permid 1')
        # The name must not have been overwritten by the second initData.
        self.assertNotEqual(self.d._get('name'), '1n;2_qfq2v@$')

    def test_superpeers(self):
        # test addSuperPeer, deleteSuperPeer, isSuperPeer, getSuperPeers
        self.d.addSuperPeer('sp1')
        self.d.addSuperPeer('sp2')
        self.d.addSuperPeer('sp1')      # duplicate must not add a second entry
        for permid in ('sp1', 'sp2'):
            if not self.d.isSuperPeer(permid):
                self.fail(self.d._data['superpeers'])
        superpeers = self.d.getSuperPeers()
        self.assertTrue(isinstance(superpeers, list))
        self.assertEqual(Set(superpeers), Set(['sp1', 'sp2']))
        self.d.deleteSuperPeer('sp2')
        self.assertFalse(self.d.isSuperPeer('sp2'))

    def test_friends(self):
        # test addFriend, isFriend, getFriends, deleteFriend
        self.d.addFriend('friend1')
        self.d.addFriend('friend2')
        self.d.addFriend('friend1')     # duplicate must not add a second entry
        for permid in ('friend1', 'friend2'):
            if not self.d.isFriend(permid):
                self.fail(self.d._data['friends'])
        friends = self.d.getFriends()
        self.assertTrue(isinstance(friends, list))
        self.assertEqual(Set(friends), Set(['friend1', 'friend2']))
        self.d.deleteFriend('friend2')
        self.assertFalse(self.d.isFriend('friend2'))


class TestPeerDB(unittest.TestCase):
    """Tests for ``cachedb.PeerDB``: updateItem, getItem and deleteItem."""

    def setUp(self):
        """Fetch the PeerDB instance for the shared test directory and empty it."""
        self.dirname = os.path.join(tempfile.gettempdir(), 'testdb')
        self.d = cachedb.PeerDB.getInstance(db_dir=self.dirname)
        # Same precaution as in the TorrentDB/PreferenceDB tests: the
        # directory has a fixed path, so a stale 'peer1' record from an
        # earlier, interrupted run would break the default-item check below.
        self.d._clear()

    def tearDown(self):
        """Empty the database so later tests do not see this test's data."""
        self.d._clear()

    def test_peerdb(self):
        # test updateItem, getItem, deleteItem
        # unittest assertion methods are used instead of bare asserts so the
        # checks still run under "python -O".

        # An update without values is expected to create the default item.
        self.d.updateItem('peer1', update_time=False)
        self.assertTrue(self.d.getItem('peer1'))
        self.assertEqual(self.d.getItem('peer1'), self.d.default_item)

        # Given fields override the defaults.
        first = {'ip': '1.2.3.4', 'port': 3}
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        self.d.updateItem('peer1', first, update_time=False)
        self.assertEqual(self.d.getItem('peer1'), expected)

        # A second update changes only the fields it names.
        second = {'port': 7}
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        expected.update(second)
        self.d.updateItem('peer1', second, update_time=False)
        self.assertEqual(self.d.getItem('peer1'), expected)

        self.d.deleteItem('peer1')
        self.assertFalse(self.d.getItem('peer1'))
        
        
class TestTorrentDB(unittest.TestCase):
    """Tests for ``cachedb.TorrentDB``: updateItem, getItem and deleteItem."""

    def setUp(self):
        """Fetch the TorrentDB instance for the shared test directory and empty it."""
        self.dirname = os.path.join(tempfile.gettempdir(), 'testdb')
        self.d = cachedb.TorrentDB.getInstance(db_dir=self.dirname)
        self.d._clear()

    def tearDown(self):
        """Empty the database so later tests do not see this test's data."""
        self.d._clear()

    def test_torrentdb(self):
        # test updateItem, getItem, deleteItem
        # unittest assertion methods are used instead of bare asserts so the
        # checks still run under "python -O".

        # An update without values is expected to create the default item.
        self.d.updateItem('torrent1')
        self.assertTrue(self.d.getItem('torrent1'))
        self.assertEqual(self.d.getItem('torrent1'), self.d.default_item)

        # Given fields override the defaults.
        first = {'name': 'torrent 1', 'relevance': 32}
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        self.d.updateItem('torrent1', first)
        self.assertEqual(self.d.getItem('torrent1'), expected)

        # A second update changes only the fields it names.
        second = {'relevance': 56}
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        expected.update(second)
        self.d.updateItem('torrent1', second)
        self.assertEqual(self.d.getItem('torrent1'), expected)

        self.d.deleteItem('torrent1')
        self.assertFalse(self.d.getItem('torrent1'))


class TestPreferenceDB(unittest.TestCase):
    """Tests for ``cachedb.PreferenceDB``: per-peer torrent preferences."""

    def setUp(self):
        """Fetch the PreferenceDB instance for the shared test directory and empty it."""
        self.dirname = os.path.join(tempfile.gettempdir(), 'testdb')
        self.d = cachedb.PreferenceDB.getInstance(db_dir=self.dirname)
        self.d._clear()

    def tearDown(self):
        """Empty the database so later tests do not see this test's data."""
        self.d._clear()

    def test_prefdb(self):
        # unittest assertion methods are used instead of bare asserts so the
        # checks still run under "python -O".

        # test addPreference, getPreference, hasPreference, getItem
        self.d.addPreference('peer1', 'torrent1')
        self.d.addPreference('peer1', 'torrent2')
        self.assertTrue(self.d.hasPreference('peer1', 'torrent2'))

        # The item of a peer is expected to be a dict with one entry per
        # torrent; the two conditions are checked separately so a failure
        # says which one was violated.
        peer1_item = self.d.getItem('peer1')
        self.assertTrue(isinstance(peer1_item, dict), peer1_item)
        self.assertEqual(len(peer1_item), 2, peer1_item)

        # A torrent that was never added for the peer yields None.
        self.d.addPreference('peer2', 'torrent3')
        pref = self.d.getPreference('peer2', 'torrent1')
        self.assertTrue(pref is None, pref)

        # A preference added without values equals the default item.
        self.d.addPreference('peer2', 'torrent5')
        pref = self.d.getPreference('peer2', 'torrent5')
        self.assertEqual(pref, self.d.default_item)

        pref = self.d.getPreference('peer2', 'torrent3')
        self.assertEqual(pref, self.d.default_item)

        # Adding an existing preference with values overrides the defaults.
        first = {'rank': 3, 'relevance': 5}
        self.d.addPreference('peer2', 'torrent3', first)
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        pref = self.d.getPreference('peer2', 'torrent3')
        self.assertEqual(pref, expected)

        # A further add changes only the fields it names.
        second = {'relevance': 7}
        self.d.addPreference('peer2', 'torrent3', second)
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        expected.update(second)
        pref = self.d.getPreference('peer2', 'torrent3')
        self.assertEqual(pref, expected)

        # test deletePreference, deleteItem
        self.d.deletePreference('peer1', 'torrent2')
        self.assertFalse(self.d.hasPreference('peer1', 'torrent2'))
        self.d.deleteItem('peer2')
        self.assertFalse(self.d.getItem('peer2'))
        

class TestMyPreferenceDB(unittest.TestCase):
    """Tests for ``cachedb.MyPreferenceDB``: updateItem, getItem and deleteItem."""

    def setUp(self):
        """Fetch the MyPreferenceDB instance for the shared test directory and empty it."""
        self.dirname = os.path.join(tempfile.gettempdir(), 'testdb')
        self.d = cachedb.MyPreferenceDB.getInstance(db_dir=self.dirname)
        self.d._clear()

    def tearDown(self):
        """Empty the database so later tests do not see this test's data."""
        self.d._clear()

    def test_myprefdb(self):
        # test updateItem, getItem, deleteItem
        # unittest assertion methods are used instead of bare asserts so the
        # checks still run under "python -O".

        # An update without values is expected to create the default item.
        self.d.updateItem('torrent1')
        self.assertTrue(self.d.getItem('torrent1'))
        self.assertEqual(self.d.getItem('torrent1'), self.d.default_item)

        # Only 'name' and 'rank' are compared below, not the whole item.
        first = {'name': 'torrent 1', 'rank': 3}
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        self.d.updateItem('torrent1', first)
        item = self.d.getItem('torrent1')
        self.assertEqual(item['name'], expected['name'], item)
        self.assertEqual(item['rank'], expected['rank'], item)

        # NOTE(review): the rank is a string here but an int above; the test
        # only requires that it is stored unchanged - confirm this is intended.
        second = {'rank': '5'}
        expected = deepcopy(self.d.default_item)
        expected.update(first)
        expected.update(second)
        self.d.updateItem('torrent1', second)
        item = self.d.getItem('torrent1')
        self.assertEqual(item['name'], expected['name'], item)
        self.assertEqual(item['rank'], expected['rank'], item)

        self.d.deleteItem('torrent1')
        self.assertFalse(self.d.getItem('torrent1'))

class TestOwnerDB(unittest.TestCase):
    """Tests for ``cachedb.OwnerDB``: which peers own which torrent."""

    def setUp(self):
        """Fetch the OwnerDB instance for the shared test directory and empty it."""
        self.dirname = os.path.join(tempfile.gettempdir(), 'testdb')
        self.d = cachedb.OwnerDB.getInstance(db_dir=self.dirname)
        # Same precaution as in the TorrentDB/PreferenceDB tests: the
        # directory has a fixed path, so owners left behind by an earlier,
        # interrupted run would break the length checks below.
        self.d._clear()

    def tearDown(self):
        """Empty the database so later tests do not see this test's data."""
        self.d._clear()

    def test_owner(self):
        # unittest assertion methods are used instead of bare asserts so the
        # checks still run under "python -O".

        # test addOwner, getOwner, isOwner, getItem
        # addOwner takes (torrent, peer) while isOwner is called with
        # (peer, torrent).
        self.d.addOwner('torrent1', 'peer1')
        self.d.addOwner('torrent1', 'peer2')
        self.assertTrue(self.d.isOwner('peer2', 'torrent1'))
        owners1 = self.d.getItem('torrent1')
        self.assertTrue(isinstance(owners1, list))
        self.assertEqual(len(owners1), 2)
        # Adding the same owner twice must yield a single entry.
        self.d.addOwner('torrent2', 'peer3')
        self.d.addOwner('torrent2', 'peer3')
        owners2 = self.d.getItem('torrent2')
        self.assertEqual(len(owners2), 1)

        # test deleteOwner, deleteItem
        self.d.deleteOwner('torrent2', 'peer3')
        self.assertFalse(self.d.isOwner('peer3', 'torrent2'))
        self.d.deleteItem('torrent1')
        self.assertFalse(self.d.getItem('torrent1'))
        

def test_suite():
    """Return a ``unittest.TestSuite`` with the tests of every case class here.

    The classes are added in the order listed below.
    """
    # TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which is
    # deprecated and no longer available in recent Python versions.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    case_classes = (
        TestBasicDB,
        TestMyDB,
        TestPeerDB,
        TestTorrentDB,
        TestPreferenceDB,
        TestMyPreferenceDB,
        TestOwnerDB,
    )
    for case_class in case_classes:
        suite.addTest(loader.loadTestsFromTestCase(case_class))
    return suite
        
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.