Python Wikipedia Robot Framework » pywikipedia » catlib.py
#!/usr/bin/python
# -*- coding: utf-8  -*-
"""
Library to work with category pages on Wikipedia
"""
#
# (C) Rob W.W. Hooft, Andre Engels, 2004-2007
# (C) Daniel Herding, 2004-2007
# (C) Russell Blau, 2005
# (C) Cyde Weys, 2005-2007
# (C) Leonardo Gregianin, 2005-2007
#
# Distributed under the terms of the MIT license.
#
__version__ = '$Id: catlib.py 8175 2010-05-14 21:59:02Z amir $'
#
import re, time, urllib, query
import wikipedia
try:
    set # introduced in Python 2.4: faster and future
except NameError:
    # fallback solution for Python 2.3
    from sets import Set as set

msg_created_for_renaming = {
    'ar':u':   %s. : %s',
    'de':u'Bot: Verschoben von %s. Autoren: %s',
    'en':u'Robot: Moved from %s. Authors: %s',
    'es':u'Bot: Traslado desde %s. Autores: %s',
    'fa':u':  %s.:%s',
    'fi':u'Botti siirsi luokan %s. Muokkaajat: %s',
    'fr':u'Robot : déplacé depuis %s. Auteurs: %s',
    'he':u':   %s. : %s',
    'ia':u'Robot: Transferite de %s. Autores: %s',
    'id':u'Bot: Memindahkan dari %s. Kontributor: %s',
    'it':u'Bot: Voce spostata da %s. Autori: %s',
    'ja': u': %s  %s',
    'ksh':u'Bot: hjeholldt von %s. Schriiver: %s',
    'nds':u'Kat-Bot: herschaven von %s. Schriever: %s',
    'nl':u'Bot: hernoemd van %s. Auteurs: %s',
    'nn':u'robot: flytta frå %s. Bidragsytarar: %s',
    'pl':u'Robot przenosi z %s. Autorzy: %s',
    'pt':u'Bot: Movido de %s. Autor: %s',
    'zh':u':  %s  %s',
    }
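
# Example (hypothetical sketch): copyTo() below feeds this table through
# wikipedia.translate(), filling the two %s slots with the old title and the
# contributing authors, e.g. for English:
#
#   msg_created_for_renaming['en'] % (u'Category:Old name', u'A, B')
#   # -> u'Robot: Moved from Category:Old name. Authors: A, B'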

# some constants that are used internally
ARTICLE = 0
SUBCATEGORY = 1

def isCatTitle(title, site):
    return ':' in title and title[:title.index(':')] in site.category_namespaces()
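
# Example (hypothetical sketch, assuming `site` is an English-language Site
# object): the check is purely textual, based on the namespace prefix.
#
#   isCatTitle(u'Category:Software', site)    # -> True
#   isCatTitle(u'Software', site)             # -> False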

def unique(l):
    """Given a list of hashable object, return an alphabetized unique list.
    """
    l=dict.fromkeys(l).keys()
    l.sort()
    return l
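
# Example (hypothetical sketch): dict.fromkeys() drops duplicates and the
# original ordering, and the survivors are then sorted.
#
#   unique(['b', 'a', 'b', 'c'])    # -> ['a', 'b', 'c']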


class Category(wikipedia.Page):
    """Subclass of Page that has some special tricks that only work for
       category: pages"""

    def __init__(self, site, title = None, insite = None, sortKey = None):
        wikipedia.Page.__init__(self, site = site, title = title, insite = insite, defaultNamespace = 14)
        self.sortKey = sortKey
        if self.namespace() != 14:
            raise ValueError(u'BUG: %s is not in the category namespace!' % title)
        self.completelyCached = False
        self.articleCache = []
        self.subcatCache = []

    def aslink(self, forceInterwiki=False, textlink=False, noInterwiki=False):
        """A string representation in the form of a link.

        This method is different from Page.aslink() as the sortkey may have
        to be included.

        """
        if self.sortKey:
            titleWithSortKey = '%s|%s' % (self.title(savetitle=True), self.sortKey)
        else:
            titleWithSortKey = self.title(savetitle=True)
        if not noInterwiki and (forceInterwiki
                                or self.site() != wikipedia.getSite()):
            if self.site().family != wikipedia.getSite().family \
                    and self.site().family.name != self.site().lang:
                return '[[%s:%s:%s]]' % (self.site().family.name,
                                         self.site().lang, self.title(savetitle=True))
            else:
                return '[[%s:%s]]' % (self.site().lang, self.title(savetitle=True))
        elif textlink:
            return '[[:%s]]' % self.title(savetitle=True)
        else:
            return '[[%s]]' % titleWithSortKey
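
    # Example (hypothetical sketch, assuming the working site is the English
    # Wikipedia): the sort key is only emitted for the plain link form.
    #
    #   cat = Category(wikipedia.getSite(), 'Category:Software', sortKey=u'S')
    #   cat.aslink()                  # -> u'[[Category:Software|S]]'
    #   cat.aslink(textlink=True)     # -> u'[[:Category:Software]]'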

    def _getAndCacheContents(self, recurse=False, purge=False, startFrom=None, cache=None):
        """
        Cache results of _parseCategory for a second call.

        If recurse is a bool, and value is True, then recursively retrieves
        contents of all subcategories without limit. If recurse is an int,
        recursively retrieves contents of subcategories to that depth only.

        Other parameters are analogous to _parseCategory(). If purge is True,
        cached results will be discarded. If startFrom is used, nothing
        will be cached.

        This should not be used outside of this module.
        """
        if cache is None:
            cache = []
        if purge:
            self.completelyCached = False
        if recurse:
            if type(recurse) is int:
                newrecurse = recurse - 1
            else:
                newrecurse = recurse
        if self.completelyCached:
            for article in self.articleCache:
                if article not in cache:
                    cache.append(article)
                    yield ARTICLE, article
            for subcat in self.subcatCache:
                if subcat not in cache:
                    cache.append(subcat)
                    yield SUBCATEGORY, subcat
                    if recurse:
                        # contents of subcategory are cached by calling
                        # this method recursively; therefore, do not cache
                        # them again
                        for item in subcat._getAndCacheContents(newrecurse, purge, cache=cache):
                            yield item
        else:
            for tag, page in self._parseCategory(purge, startFrom):
                if tag == ARTICLE:
                    self.articleCache.append(page)
                    if not page in cache:
                        cache.append(page)
                        yield ARTICLE, page
                elif tag == SUBCATEGORY:
                    self.subcatCache.append(page)
                    if not page in cache:
                        cache.append(page)
                        yield SUBCATEGORY, page
                        if recurse:
                            # contents of subcategory are cached by calling
                            # this method recursively; therefore, do not cache
                            # them again
                            for item in page._getAndCacheContents(newrecurse, purge, cache=cache):
                                yield item
            if not startFrom:
                self.completelyCached = True

    def _getContentsNaive(self, recurse=False, startFrom=None):
        """
        Simple category content yielder. Naive: does not attempt to cache
        anything.
        """
        for tag, page in self._parseCategory(startFrom=startFrom):
            yield tag, page
            if tag == SUBCATEGORY and recurse:
                for item in page._getContentsNaive(recurse=True):
                    yield item

    def _parseCategory(self, purge=False, startFrom=None):
        """
        Yields all articles and subcategories that are in this category by API.

        Set startFrom to a string which is the title of the page to start from.

        Yielded results are tuples in the form (tag, page) where tag is one
        of the constants ARTICLE and SUBCATEGORY, and page is the Page or
        Category object.

        Note that results of this method need not be unique.

        This should not be used outside of this module.
        """
        if not self.site().has_api() or self.site().versionnumber() < 11:
            for tag, page in self._oldParseCategory(purge, startFrom):
                yield tag, page
            return
        
        currentPageOffset = None
        params = {
            'action': 'query',
            'list': 'categorymembers',
            'cmtitle': self.title(),
            'cmprop': ['title', 'ids', 'sortkey', 'timestamp'],
            #'': '',
        }
        while True:
            if wikipedia.config.special_page_limit > 500:
                params['cmlimit'] = 500
            else:
                params['cmlimit'] = wikipedia.config.special_page_limit

            if currentPageOffset:
                params['cmcontinue'] = currentPageOffset
                wikipedia.output('Getting [[%s]] list from %s...'
                                 % (self.title(), currentPageOffset[:-1])) # cmcontinue last key is '|'
            elif startFrom:
                params['cmstartsortkey'] = startFrom
                wikipedia.output('Getting [[%s]] list starting at %s...'
                                 % (self.title(), startFrom))
            else:
                wikipedia.output('Getting [[%s]]...' % self.title())

            wikipedia.get_throttle()
            data = query.GetData(params, self.site())
            if 'error' in data:
                raise RuntimeError("%s" % data['error'])
            count = 0

            for memb in data['query']['categorymembers']:
                count += 1
                # For MediaWiki versions where subcats look like articles
                if memb['ns'] == 14:
                    yield SUBCATEGORY, Category(self.site(), memb['title'], sortKey=memb['sortkey'])
                elif memb['ns'] == 6:
                    yield ARTICLE, wikipedia.ImagePage(self.site(), memb['title'])
                else:
                    yield ARTICLE, wikipedia.Page(self.site(), memb['title'], defaultNamespace=memb['ns'])
                if count >= params['cmlimit']:
                    break
            # try to find a link to the next list page
            if 'query-continue' in data and count < params['cmlimit']:
                currentPageOffset = data['query-continue']['categorymembers']['cmcontinue']
            else:
                break
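
    # Example (hypothetical sketch): the request built above corresponds to an
    # API call of roughly this shape, with cmcontinue added for the follow-up
    # batches:
    #
    #   api.php?action=query&list=categorymembers&cmtitle=Category:X
    #          &cmprop=title|ids|sortkey|timestamp&cmlimit=500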

    def _oldParseCategory(self, purge=False, startFrom=None):
        """
        Yields all articles and subcategories that are in this category.

        Set purge to True to instruct MediaWiki not to serve a cached version.

        Set startFrom to a string which is the title of the page to start from.

        Yielded results are tuples in the form (tag, page) where tag is one
        of the constants ARTICLE and SUBCATEGORY, and page is the Page or
        Category object.

        Note that results of this method need not be unique.

        This should not be used outside of this module.
        """
        if self.site().versionnumber() < 4:
            Rtitle = re.compile('title\s?=\s?\"([^\"]*)\"')
        elif self.site().versionnumber() < 8:
            # FIXME seems to parse all links
            Rtitle = re.compile('/\S*(?: title\s?=\s?)?\"([^\"]*)\"')
        else:
            Rtitle = re.compile(
            '<li>(?:<span.*?>)?<a href=\".*?\"\s?title\s?=\s?\"([^\"]*)\"\>\+?[^\<\+]')
        if self.site().versionnumber() < 8:
            Rsubcat = None
            Rimage = None
        else:
            Rsubcat = re.compile(
                'CategoryTreeLabelCategory\"\s?href=\".+?\">(.+?)</a>')
            Rimage = re.compile(
                '<div class\s?=\s?\"thumb\"\sstyle=\"[^\"]*\">(?:<div style=\"[^\"]*\">)?<a href=\".*?\"(?:\sclass="image")?\stitle\s?=\s?\"([^\"]*)\"')
        ns = self.site().category_namespaces()
        # regular expression matching the "(next 200)" link
        RLinkToNextPage = re.compile('&amp;from=(.*?)" title="')

        if startFrom:
            currentPageOffset = urllib.quote(startFrom.encode(self.site().encoding()))
        else:
            currentPageOffset = None
        while True:
            path = self.site().get_address(self.urlname())
            if purge:
                path += '&action=purge'
            if currentPageOffset:
                path += '&from=' + currentPageOffset
                wikipedia.output('Getting [[%s]] starting at %s...'
                                 % (self.title(), wikipedia.url2link(currentPageOffset, self.site(), self.site())))
            else:
                wikipedia.output('Getting [[%s]]...' % self.title())
            wikipedia.get_throttle()
            txt = self.site().getUrl(path)
            # index where subcategory listing begins
            if self.site().versionnumber() >= 9:
                # These IDs were introduced in 1.9
                if '<div id="mw-subcategories">' in txt:
                    ibegin = txt.index('<div id="mw-subcategories">')
                elif '<div id="mw-pages">' in txt:
                    ibegin = txt.index('<div id="mw-pages">')
                elif '<div id="mw-category-media">' in txt:
                    ibegin = txt.index('<div id="mw-category-media">')
                else:
                    # No pages
                    return
            else:
                ibegin = txt.index('<!-- start content -->') # does not work for cats without text
                # TODO: This parses category text and may think they are
                # pages in category! Check for versions before 1.9
            # index where article listing ends
            if '<div class="printfooter">' in txt:
                iend = txt.index('<div class="printfooter">')
            elif '<div class="catlinks">' in txt:
                iend = txt.index('<div class="catlinks">')
            else:
                iend = txt.index('<!-- end content -->')
            txt = txt[ibegin:iend]
            for title in Rtitle.findall(txt):
                if title == self.title():
                    # This is only a link to "previous 200" or "next 200".
                    # Ignore it.
                    pass
                # For MediaWiki versions where subcats look like articles
                elif isCatTitle(title, self.site()):
                    yield SUBCATEGORY, Category(self.site(), title)
                else:
                    yield ARTICLE, wikipedia.Page(self.site(), title)
            if Rsubcat:
                # For MediaWiki versions where subcats look differently
                for titleWithoutNamespace in Rsubcat.findall(txt):
                    title = 'Category:%s' % titleWithoutNamespace
                    yield SUBCATEGORY, Category(self.site(), title)
            if Rimage:
                # For MediaWiki versions where images work through galleries
                for title in Rimage.findall(txt):
                    # In some MediaWiki versions, the titles contain the namespace,
                    # but they don't in other (newer) versions. Use the ImagePage's
                    # defaultNamespace feature to get everything correctly.
                    yield ARTICLE, wikipedia.ImagePage(self.site(), title)
            # try to find a link to the next list page
            matchObj = RLinkToNextPage.search(txt)
            if matchObj:
                currentPageOffset = matchObj.group(1)
            else:
                break

    def subcategories(self, recurse=False, startFrom=None, cacheResults=False):
        """
        Yields all subcategories of the current category.

        If recurse is True, also yields subcategories of the subcategories.
        If recurse is a number, also yields subcategories of subcategories,
        but only at most that number of levels deep (that is, recurse = 0 is
        equivalent to recurse = False, recurse = 1 gives first-level
        subcategories of subcategories but no deeper, etcetera).

        cacheResults - cache the category contents: useful if you need to
        do several passes over the category members list. The simple cache
        system is *not* meant to be memory- or CPU-efficient for large
        categories.

        Results are sorted (as sorted by MediaWiki), but need not be unique.
        """
        if cacheResults:
            gen = self._getAndCacheContents
        else:
            gen = self._getContentsNaive
        for tag, subcat in gen(recurse=recurse, startFrom=startFrom):
            if tag == SUBCATEGORY:
                yield subcat
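
    # Example (hypothetical sketch): walk the immediate subcategories, then
    # the same tree one level further down with the result cache switched on.
    #
    #   cat = Category(wikipedia.getSite(), 'Category:Software')
    #   for subcat in cat.subcategories():            # direct subcategories
    #       wikipedia.output(subcat.title())
    #   for subcat in cat.subcategories(recurse=1,    # one extra level down
    #                                   cacheResults=True):
    #       wikipedia.output(subcat.title())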

    def subcategoriesList(self, recurse=False):
        """
        Creates a list of all subcategories of the current category.

        If recurse is True, also return subcategories of the subcategories.
        Recurse can also be a number, as explained above.

        The elements of the returned list are sorted and unique.
        """
        subcats = []
        for cat in self.subcategories(recurse):
            subcats.append(cat)
        return unique(subcats)

    def articles(self, recurse=False, startFrom=None, cacheResults=False):
        """
        Yields all articles of the current category.

        If recurse is True, also yields articles of the subcategories.
        Recurse can be a number to restrict the depth at which subcategories
        are included.

        cacheResults - cache the category contents: useful if you need to
        do several passes over the category members list. The simple cache
        system is *not* meant to be memory- or CPU-efficient for large
        categories.

        Results are unsorted (except as sorted by MediaWiki), and need not
        be unique.
        """
        if cacheResults:
            gen = self._getAndCacheContents
        else:
            gen = self._getContentsNaive
        for tag, page in gen(recurse=recurse, startFrom=startFrom):
            if tag == ARTICLE:
                yield page
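
    # Example (hypothetical sketch, assuming `cat` is a Category instance as
    # above): pages filed directly in the category and one level of
    # subcategories, then a listing resumed from a given sort key.
    #
    #   for page in cat.articles(recurse=1):
    #       wikipedia.output(page.title())
    #   for page in cat.articles(startFrom=u'M'):
    #       wikipedia.output(page.title())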

    def articlesList(self, recurse=False):
        """
        Creates a list of all articles of the current category.

        If recurse is True, also return articles of the subcategories.
        Recurse can be a number to restrict the depth at which subcategories
        are included.

        The elements of the returned list are sorted and unique.
        """
        articles = []
        for article in self.articles(recurse):
            articles.append(article)
        return unique(articles)

    def supercategories(self):
        """
        Yields all supercategories of the current category.

        Results are stored in the order in which they were entered, and need
        not be unique.
        """
        for supercat in self.categories():
            yield supercat

    def supercategoriesList(self):
        """
        Creates a list of all supercategories of the current category.

        The elements of the returned list are sorted and unique.
        """
        supercats = []
        for cat in self.supercategories():
            supercats.append(cat)
        return unique(supercats)

    def isEmpty(self):
        # TODO: rename; naming conflict with Page.isEmpty
        for tag, title in self._parseCategory():
            return False
        return True
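
    # Example (hypothetical sketch, assuming `cat` is a Category instance):
    # isEmpty() returns as soon as the first member is yielded, so the check
    # stays cheap even for very large categories.
    #
    #   if cat.isEmpty():
    #       wikipedia.output(u'%s has no members.' % cat.title())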

    def copyTo(self, catname):
        """
        Returns true if copying was successful, false if target page already
        existed.
        """
        catname = self.site().category_namespace() + ':' + catname
        targetCat = wikipedia.Page(self.site(), catname)
        if targetCat.exists():
            wikipedia.output('Target page %s already exists!' % targetCat.title())
            return False
        else:
            wikipedia.output('Moving text from %s to %s.' % (self.title(), targetCat.title()))
            authors = ', '.join(self.contributingUsers())
            creationSummary = wikipedia.translate(wikipedia.getSite(), msg_created_for_renaming) % (self.title(), authors)
            targetCat.put(self.get(), creationSummary)
            return True

    #Like copyTo above, except this removes a list of templates (like deletion templates) that appear in
    #the old category text.  It also removes all text between the two HTML comments BEGIN CFD TEMPLATE
    #and END CFD TEMPLATE. (This is to deal with CFD templates that are substituted.)
    def copyAndKeep(self, catname, cfdTemplates):
        """
        Returns true if copying was successful, false if target page already
        existed.
        """
        targetCat = wikipedia.Page(self.site(), catname, defaultNamespace=14)
        if targetCat.exists():
            wikipedia.output('Target page %s already exists!' % targetCat.title())
            return False
        else:
            wikipedia.output('Moving text from %s to %s.' % (self.title(), targetCat.title()))
            authors = ', '.join(self.contributingUsers())
            creationSummary = wikipedia.translate(wikipedia.getSite(), msg_created_for_renaming) % (self.title(), authors)
            newtext = self.get()
        for regexName in cfdTemplates:
            matchcfd = re.compile(r"{{%s.*?}}" % regexName, re.IGNORECASE)
            newtext = matchcfd.sub('',newtext)
            matchcomment = re.compile(r"<!--BEGIN CFD TEMPLATE-->.*<!--END CFD TEMPLATE-->", re.IGNORECASE | re.MULTILINE | re.DOTALL)
            newtext = matchcomment.sub('',newtext)
            pos = 0
            while (newtext[pos:pos+1] == "\n"):
                pos = pos + 1
            newtext = newtext[pos:]
        targetCat.put(newtext, creationSummary)
        return True
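
    # Example (hypothetical sketch; the category and template names are
    # placeholders): copy the category text to a new title, then do the same
    # while stripping substituted CFD markup.
    #
    #   cat = Category(wikipedia.getSite(), 'Category:Old name')
    #   cat.copyTo(u'New name')                         # plain copy
    #   cat.copyAndKeep(u'Category:New name',
    #                   cfdTemplates=[u'cfd', u'cfr'])  # copy minus CFD tags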

#def Category(code, name):
#    """Factory method to create category link objects from the category name"""
#    # Standardized namespace
#    ns = wikipedia.getSite().category_namespaces()[0]
#    # Prepend it
#    return Category(code, "%s:%s" % (ns, name))

def change_category(article, oldCat, newCat, comment=None, sortKey=None, inPlace=False):
    """
    Given an article which is in category oldCat, moves it to
    category newCat. Moves subcategories of oldCat as well.
    oldCat and newCat should be Category objects.
    If newCat is None, the category will be removed.
    """
    cats = article.categories(get_redirect=True)
    site = article.site()
    changesMade = False

    if not article.canBeEdited():
        wikipedia.output("Can't edit %s, skipping it..." % article.aslink())
        return False
    if inPlace == True:
        oldtext = article.get(get_redirect=True)
        newtext = wikipedia.replaceCategoryInPlace(oldtext, oldCat, newCat)
        if newtext == oldtext:
            wikipedia.output(
                u'No changes made in page %s.' % article.aslink())
            return False
        try:
            article.put(newtext, comment)
            return True
        except wikipedia.EditConflict:
            wikipedia.output(
                u'Skipping %s because of edit conflict' % article.aslink())
        except wikipedia.LockedPage:
            wikipedia.output(u'Skipping locked page %s' % article.aslink())
        except wikipedia.SpamfilterError, error:
            wikipedia.output(
                u'Changing page %s blocked by spam filter (URL=%s)'
                             % (article.aslink(), error.url))
        except wikipedia.NoUsername:
            wikipedia.output(
                u"Page %s not saved; sysop privileges required."
                             % article.aslink())
        except wikipedia.PageNotSaved, error:
            wikipedia.output(u"Saving page %s failed: %s"
                             % (article.aslink(), error.message))
        return False

    # This loop will replace all occurrences of the category to be changed,
    # and remove duplicates.
    newCatList = []
    newCatSet = set()
    for i in range(len(cats)):
        cat = cats[i]
        if cat == oldCat:
            changesMade = True
            if not sortKey:
                sortKey = cat.sortKey
            if newCat:
                if newCat.title() not in newCatSet:
                    newCategory = Category(site, newCat.title(),
                                           sortKey=sortKey)
                    newCatSet.add(newCat.title())
                    newCatList.append(newCategory)
        elif cat.title() not in newCatSet:
            newCatSet.add(cat.title())
            newCatList.append(cat)

    if not changesMade:
        wikipedia.output(u'ERROR: %s is not in category %s!' % (article.aslink(), oldCat.title()))
    else:
        text = article.get(get_redirect=True)
        try:
            text = wikipedia.replaceCategoryLinks(text, newCatList)
        except ValueError:
            # Make sure that the only way replaceCategoryLinks() can return
            # a ValueError is in the case of interwiki links to self.
            wikipedia.output(
                    u'Skipping %s because of interwiki link to self' % article)
        try:
            article.put(text, comment)
        except wikipedia.EditConflict:
            wikipedia.output(
                    u'Skipping %s because of edit conflict' % article.title())
        except wikipedia.SpamfilterError, e:
            wikipedia.output(
                    u'Skipping %s because of blacklist entry %s'
                    % (article.title(), e.url))
        except wikipedia.LockedPage:
            wikipedia.output(
                    u'Skipping %s because page is locked' % article.title())
        except wikipedia.PageNotSaved, error:
            wikipedia.output(u"Saving page %s failed: %s"
                             % (article.aslink(), error.message))
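
# Example (hypothetical sketch; the page and category names are placeholders):
# recategorise a single article, editing the old category link in place.
#
#   site = wikipedia.getSite()
#   oldCat = Category(site, 'Category:Example old name')
#   newCat = Category(site, 'Category:Example new name')
#   page = wikipedia.Page(site, u'Some article')
#   change_category(page, oldCat, newCat,
#                   comment=u'Robot: recategorising', inPlace=True)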

def categoryAllElementsAPI(CatName, cmlimit = 5000, categories_parsed = [], site = None):
    #action=query&list=categorymembers&cmlimit=500&cmtitle=Category:License_tags
    """
    Load all the elements of a category using the API. Limit: 5000 elements.
    """
    wikipedia.output("Loading %s..." % CatName)
    
    params = {
        'action'    :'query',
        'list'      :'categorymembers',
        'cmlimit'   :cmlimit,
        'cmtitle'   :CatName,
        }

    data = query.GetData(params, site, encodeTitle = False)
    categories_parsed.append(CatName)
    try:
        members = data['query']['categorymembers']
    except KeyError:
        if int(cmlimit) != 500:
            wikipedia.output(u'An error occurred, trying to reload the category.')
            return categoryAllElementsAPI(CatName, cmlimit = 500)
        else:
            raise wikipedia.Error(data)
    if len(members) == int(cmlimit):
        raise wikipedia.Error(u'The category selected has >= %s elements, limit reached.' % cmlimit)
    allmembers = members
    results = list()
    for subcat in members:
        ns = subcat['ns']
        pageid = subcat['pageid']
        title = subcat['title']
        if ns == 14:
            if title not in categories_parsed:
                categories_parsed.append(title)
                (results_part, categories_parsed) = categoryAllElementsAPI(title, 5000, categories_parsed)
                allmembers.extend(results_part)
    for member in allmembers:
        ns = member['ns']
        pageid = member['pageid']
        title = member['title']
        results.append(member)
    return (results, categories_parsed)

def categoryAllPageObjectsAPI(CatName, cmlimit = 5000, categories_parsed = [], site = None):
    """
    From a list of dictionaries, return a list of page objects.
    """
    final = list()
    if site is None:
        site = wikipedia.getSite()
    for element in categoryAllElementsAPI(CatName, cmlimit, categories_parsed, site)[0]:
        final.append(wikipedia.Page(site, element['title']))
    return final
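
# Example (hypothetical sketch): fetch every member of a category (recursing
# into subcategories) as Page objects in one go. Note that categories_parsed
# has a mutable default, so pass a fresh list explicitly when calling this
# more than once in the same run.
#
#   pages = categoryAllPageObjectsAPI(u'Category:License tags',
#                                     categories_parsed=[],
#                                     site=wikipedia.getSite())
#   for page in pages:
#       wikipedia.output(page.title())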

def test():
    site = wikipedia.getSite()

    cat = Category(site, 'Category:Software')

    wikipedia.output(u'SUBCATEGORIES:')
    for subcat in cat.subcategories():
        wikipedia.output(subcat.title())
    wikipedia.output(u'\nARTICLES:')
    for article in cat.articles():
        wikipedia.output(article.title())

if __name__=="__main__":
    import sys
    for arg in sys.argv[1:]:
        wikipedia.output(u'Ignored argument: %s' % arg)
    test()