threads.py :  » Network » FtpCube » ftpcube-0.5.1 » libftpcube » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Network » FtpCube 
FtpCube » ftpcube 0.5.1 » libftpcube » threads.py
"""
FtpCube
Copyright (C) Michael Gilfix

This file is part of FtpCube.

You should have received a file COPYING containing license terms
along with this program; if not, write to Michael Gilfix
(mgilfix@eecs.tufts.edu) for a copy.

This version of FtpCube is open source; you can redistribute it and/or
modify it under the terms listed in the file COPYING.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
"""

import dispatcher
import events
from logger import Logger
import messages
import protocol
import transports.loader
import utils

import wx

import exceptions
import time
import threading
import os

class ThreadException(Exception):
    """Base class for thread-related exceptions.

    The optional constructor argument is the error description (a message
    string or another exception instance). It is stored through the standard
    exception machinery so that str() of the exception yields the message.
    """

    def __init__(self, args=None):
        """Creates the exception.

        'args' may be omitted (no message), a tuple (used as the exception
        argument tuple as-is), or any other single value (used as the sole
        exception argument)."""
        # Assigning a bare value to self.args is wrong: None is rejected by the
        # exception type and a string is split into a tuple of characters.
        if args is None:
            Exception.__init__(self)
        elif isinstance(args, tuple):
            Exception.__init__(self, *args)
        else:
            Exception.__init__(self, args)

class ThreadManager:
    """Transfer thread manager

    This class manages all the different file transfer threads. A distinguished thread is
    labeled as the main thread and is used for the main browsing window. Interactions with
    threads occur through posting of custom events. The main thread is always kept as the
    first entry in the thread list.

    Thread IDs are indexes into the thread list. A removed thread leaves a None entry
    behind so that the IDs of the remaining threads stay valid; the free slot is reused
    by the next transfer thread that is created."""

    def __init__(self):
        """Creates the thread manager instance with an empty list of threads."""
        # Slot 0 is reserved for the main thread
        self.threads = [ None ]

        # Listeners for thread manager events
        self.listeners = [ ]

    def getThreadList(self):
        """Returns the current list of threads being managed.

        The list itself is returned, not a copy. Entries may be None."""
        return self.threads

    def getThread(self, id):
        """Gets a thread with the specified ID.

        Returns None if the ID does not refer to a slot in the thread list. Negative
        IDs are treated as invalid rather than wrapping around to the end of the list."""
        try:
            if id < 0:
                return None
            return self.threads[id]
        except (IndexError, TypeError):
            return None

    def newMainThread(self, **kwargs):
        """Creates a new main browser thread and returns its thread ID.

        The keyword arguments are passed to the BrowseThread constructor. Listeners are
        notified with a creation event."""
        self.threads[0] = BrowseThread(**kwargs)
        self.threads[0].setId(0)
        event = events.ThreadEvent(0, events.ThreadEvent.EVT_CREATE)
        self.notifyListeners(event)
        return 0

    def getMainThread(self):
        """Returns the main thread instance."""
        return self.threads[0]

    def removeMainThread(self):
        """Removes the main thread instance."""
        self.removeThread(0)

    def newTransferThread(self, **kwargs):
        """Creates a new transfer thread and returns its thread ID.

        The keyword arguments are passed to the TransferThread constructor. The first
        free slot after the main thread is reused if there is one; otherwise the thread
        is appended to the list. Listeners are notified with a creation event."""
        thread = TransferThread(**kwargs)
        try:
            # See if there's an empty thread entry (slot 0 belongs to the main thread)
            id = self.threads.index(None, 1)
            self.threads[id] = thread
        except ValueError:
            id = len(self.threads)
            self.threads.append(thread)
        # Tell the thread its own ID, as newMainThread does, so that events posted by
        # the thread carry the right ID.
        thread.setId(id)
        if __debug__:
            print("Creating new transfer thread and setting id to: [%s]" % id)
        event = events.ThreadEvent(id, events.ThreadEvent.EVT_CREATE)
        self.notifyListeners(event)
        return id

    def removeTransferThread(self, id):
        """Removes a transfer thread from the managed threads."""
        self.removeThread(id)

    def removeThread(self, id):
        """Removes the thread with the specified ID by setting its value to None.

        A destroy event is posted to the listeners even if the ID was invalid."""
        try:
            if id >= 0:
                self.threads[id] = None
        except (IndexError, TypeError):
            # Do nothing if we were given a bad ID
            pass
        event = events.ThreadEvent(id, events.ThreadEvent.EVT_DESTROY)
        self.notifyListeners(event)

    def addListener(self, widget):
        """Adds a listener for all thread events."""
        self.listeners.append(widget)

    def removeListener(self, widget):
        """Removes a thread event listener.

        Raises ValueError if the widget was never registered."""
        self.listeners.remove(widget)

    def notifyListeners(self, event):
        """Notifies each listener for a thread-related event."""
        for each in self.listeners:
            wx.PostEvent(each, event)

class ConnectionThread(wx.EvtHandler):
    """File transfer site connection thread.

    This class contains the base functionality for all threads that maintain a connection
    with the remote file transfer site, and initiate interactions with the remote site. The
    transport protocol used depends on the selected transport type. The interactions over
    the transport follow the protocol interface and are transparent to the connection thread.
    While the connection thread class does not implement a thread interface, each transport
    instance is expected to have its own dispatcher implementation with its own thread.

    Each connection thread has thread state associated with it for display in the control
    window. Thread state includes information about the underlying connection's activities,
    such as idleness, transfer progress, etc. Such state is meant to reflect the underlying
    connection state only and not the state of which operations are executing."""

    REMOTE_SEP = '/'

    def __init__(self, **kwargs):
        """Creates a new connection thread instance.

        This creates a new transport instance for each connection thread. The following
        keyword arguments are defined:
            'name' : The name of the connection thread
            'opts' : The connection options dictionary. If missing or not a dictionary,
                     will result in a ThreadException
            'logger' : The logging instance to use for general logging
            'download_logger' : The logging instance to use for recording download activity
            'upload_logger' : The logging instance to use for recording upload activity
        """
        wx.EvtHandler.__init__(self)

        self.name = kwargs.get('name')

        if 'opts' not in kwargs:
            raise ThreadException(_("Missing connection options field"))
        self.opts = kwargs['opts']
        if not isinstance(self.opts, dict):
            raise ThreadException(_("Invalid connection option"))

        self.logger = kwargs.get('logger')
        self.download_logger = kwargs.get('download_logger')
        self.upload_logger = kwargs.get('upload_logger')

        # Initialize instance variables
        self.id = None
        self.progress = (0, 0)          # (current byte count, total byte count)
        self.start_time = None          # time.time() when the last transfer was started
        self.idle = 0                   # incremented by each updateIdleTime() call
        self.idle_enable = True
        self.done = threading.Event()
        self.connect = None             # protocol connection, set by initiateConnect()
        self.transfer_state = None      # last queue event handed to saveTransferState()

        # Register listeners for status events
        evt_registry = events.getEventRegistry()
        evt_registry.registerEventListener(events.EVT_THREAD_STATUS_TYPE, self)
        self.Bind(events.EVT_THREAD_STATUS, self.onThreadStatus)

    def setId(self, id):
        """Sets the connection thread ID."""
        self.id = id

    def getId(self):
        """Returns the ID for this connection thread."""
        return self.id

    def getName(self):
        """Returns the name of this connection thread."""
        return self.name

    def postEvent(self, kind, data=None):
        """Tells the thread manager to notify all listeners of this connection thread
        event."""
        event = events.ThreadEvent(self.id, kind, data=data)
        thread_mgr = utils.getAppThreadManager()
        thread_mgr.notifyListeners(event)

    def onThreadStatus(self, event):
        """Handles a thread status event.

        Dispatches on the event kind: destroys the connection, updates the transfer
        progress from a (current, total) pair, or processes a finished transfer whose
        queue event is carried in the event data. Other kinds are ignored."""
        if event.kind == events.ThreadStatusEvent.EVT_DESTROY_CONNECT:
            self.destroyConnection()
        elif event.kind == events.ThreadStatusEvent.EVT_UPDATE_PROGRESS:
            current, total = event.data
            self.updateProgress(current, total)
        elif event.kind == events.ThreadStatusEvent.EVT_FINISH_TRANSFER:
            self.finishTransferEvent(event.data)

    def initiateConnect(self):
        """Initiates an outbound connection to a remote file transfer site.

        Creates the protocol instance for the configured transport and issues the
        connect, welcome message and login commands, each with its own connection
        error handler. Raises ThreadException if the protocol instance cannot be
        created."""
        self.postEvent(events.ThreadEvent.EVT_CONNECT)

        transport = self.opts['transport']
        try:
            self.connect = transports.loader.createProtocolInstance(transport,
                host=self.opts['host'], port=self.opts['port'], logger=self.logger)
        except protocol.ProtocolException as strerror:
            raise ThreadException(strerror)

        self.connect.initiateConnect(err_handler=self.getConnectErrorHandler())
        self.connect.getWelcomeMessage(err_handler=self.getConnectErrorHandler())
        self.connect.login(self.opts['username'], self.opts['password'],
            err_handler=self.getConnectErrorHandler())

    def getConnectErrorHandler(self):
        """Returns a connection initiation error handler object."""
        handler = ConnectErrorHandler(self, self.logger)
        return handler

    def destroyConnection(self):
        """Destroys the current connection, if there is one."""
        if self.connect:
            self.connect.destroy()

    def finished(self):
        """Thread-safe: Indicates whether this connection thread is finished with its
        current task."""
        return self.done.isSet()

    def setDone(self):
        """Thread-safe: Indicates that this connection thread is done."""
        self.done.set()

    def isTransfering(self):
        """Indicates whether this thread is actively performing a file transfer.

        A transfer counts as active when the current byte count is neither zero nor
        equal to the total byte count."""
        cur, total = self.progress
        return cur != 0 and cur != total

    def getConnection(self):
        """Returns the underlying protocol connection object."""
        return self.connect

    def getOptions(self):
        """Returns a dictionary of connection options for this thread.

        The dictionary returned is a (shallow) copy of the connection thread dictionary."""
        return dict(self.opts)

    def getHost(self):
        """Gets the host to which the connection thread is connected."""
        return self.opts['host']

    def getPort(self):
        """Returns the port to which the connection thread is connected."""
        return self.opts['port']

    def getTransport(self):
        """Returns the transport to which this thread is connected."""
        return self.opts['transport']

    def getPassive(self):
        """Returns a boolean indicating whether this connection is in passive mode."""
        return self.opts['passive']

    def getRetries(self):
        """Returns the number of retries for connection attempts."""
        return self.opts['retries']

    def getProgress(self):
        """Returns a tuple indicating the current byte count as the first element and the
        total byte count as the second element."""
        return self.progress

    def isIdleEnabled(self):
        """Returns a boolean indicating whether maintaining idle connections is enabled."""
        return self.idle_enable

    def setIdleEnabled(self, bool):
        """Sets a boolean indicating whether maintaining idle connections is enabled."""
        self.idle_enable = bool

    def getIdleTime(self):
        """Returns the amount of time this connection has been idle."""
        return self.idle

    def updateIdleTime(self):
        """Updates the amount of idle time by a second.

        When the idle timeout period is reached, a no-op is sent to prevent disconnecting
        by the remote site. No keep-alive is sent if the timeout option is zero/empty or
        if there is no connection object yet."""
        self.idle = self.idle + 1
        timeout = self.opts['timeout']
        # The protocol no-op is sent directly (not through self.noop) so that the idle
        # counter keeps growing.
        if timeout and self.connect and (self.idle % timeout) == 0:
            self.connect.noop()

    def getTransferSpeed(self):
        """Returns the average transfer speed in KB per second.

        Returns 0 if no transfer has been started yet or if no time has elapsed."""
        if self.start_time is None:
            return 0
        elapsed = time.time() - self.start_time
        if elapsed > 0:
            return (float(self.progress[0]) / float(elapsed)) / 1024 # In KB
        return 0

    def updateProgress(self, current, max):
        """Updates transfer progress with the current byte count and the maximum byte
        count."""
        self.progress = (current, max)

    def saveTransferState(self, event):
        """Processes a transfer queue event and saves the state of the transfer into internal
        structures.

        Raises TypeError if the event is not a queue event."""
        if not isinstance(event, events.QueueEvent):
            raise TypeError(_("Expected Queue Event"))
        self.transfer_state = event

    def loadTransferState(self):
        """Loads the current state surrounding a transfer event."""
        return self.transfer_state

    def getLocalFileSize(self, path):
        """Returns the size of a local file at the specified path.

        If the size cannot be determined (because the path does not exist or is a
        directory, for example), then the None value is returned. Any OS related errors
        are reported in an error dialog and then raised through an OSError exception."""
        file_size = None
        if os.path.isfile(path):
            try:
                file_size = os.path.getsize(path)
            except OSError as strerror:
                messages.displayErrorDialog(utils.getMainWindow(),
                    _("Couldn't obtain file size: %(err)s") %{ 'err' : strerror })
                raise
        return file_size

    def initiateTransfer(self, event, err_handler=None):
        """Initiates a file transfer with the remote connection.

        The transfer may be an upload or a download. The appropriate transfer type (binary
        vs. ASCII) will be chosen. This method will perform resuming of partially
        transferred binary files.

        The transfer is silently skipped if the local file size cannot be read, or, for
        downloads, if a local file of exactly the expected size already exists. Raises
        TypeError if the event direction is neither download nor upload."""
        self.saveTransferState(event)
        self.cwd(event.remote_path)
        self.start_time = time.time()
        self.progress = (0, event.size)

        # For whether we need resuming
        rest = None
        file_size = None

        local_file = os.path.join(event.local_path, event.file)
        try:
            file_size = self.getLocalFileSize(local_file)
        except OSError:
            # The error has already been shown to the user by getLocalFileSize
            return

        if event.direction == protocol.ProtocolInterface.DOWNLOAD:
            # Skip the file if it's exactly the same size. This means that a user must
            # delete a rogue file if file size matches.
            if file_size == event.size:
                return

            if file_size is not None and file_size < event.size:
                rest = file_size
                self.progress = (file_size, event.size)

            status = TransferStatus(self, event, self.progress[0], self.progress[1])
            if event.method == protocol.ProtocolInterface.BINARY:
                self.connect.retrieveBinary(event.file, event.local_path, status=status,
                    rest=rest, passive=self.getPassive(), err_handler=err_handler)
            else:
                self.connect.retrieveAscii(event.file, event.local_path, status=status,
                    passive=self.getPassive(), err_handler=err_handler)

            self.logTransfer(protocol.ProtocolInterface.DOWNLOAD, event.username, event.host,
                self.REMOTE_SEP.join([ event.remote_path, event.file ]),
                os.path.join(event.local_path, event.file), event.size)
        elif event.direction == protocol.ProtocolInterface.UPLOAD:
            if file_size is not None and event.size < file_size:
                rest = event.size
                self.progress = (event.size, file_size)

            status = TransferStatus(self, event, self.progress[0], self.progress[1])
            if event.method == protocol.ProtocolInterface.BINARY:
                self.connect.uploadBinary(event.file, event.local_path, status=status,
                    rest=rest, err_handler=err_handler)
            else:
                self.connect.uploadAscii(event.file, event.local_path, status=status,
                    err_handler=err_handler)

            self.logTransfer(protocol.ProtocolInterface.UPLOAD, event.username, event.host,
                os.path.join(event.local_path, event.file),
                self.REMOTE_SEP.join([ event.remote_path, event.file ]),
                event.size)
        else:
            # The substitution key must match the %(dir)s placeholder, otherwise the
            # formatting itself fails with a KeyError.
            raise TypeError(_("Invalid direction for FTP transfer: [%(dir)s") \
                %{ 'dir' : event.direction })

    def finishTransferEvent(self, event):
        """Processes a finished transfer event.

        Unfinished transfers result in the generation of a failure event, which appears in
        the failure window. Successfully finished transfers result in the generation of
        a transfer event, followed by a refresh event for the local window (downloads,
        when the local window shows the download directory) or an update event for the
        remote window (uploads)."""
        evt_registry = events.getEventRegistry()
        if event.direction == protocol.ProtocolInterface.DOWNLOAD:
            # Grab the file size so we can check if our transfer was completed. If not, it'll
            # go into the failure window as an incomplete
            try:
                file_size = self.getLocalFileSize(os.path.join(event.local_path, event.file))
            except OSError:
                file_size = None

            if file_size != event.size:
                opts = event.toHash()
                # Explicitly set the attempts to max attempts so this will appear in the
                # failure window
                opts['attempt'] = opts['max_attempts']
                opts['error'] = _("Transfer incomplete - File sizes do not match")
                result_evt = events.FailureEvent(**opts)
            else:
                result_evt = events.TransferEvent(**event.toHash())
            evt_registry.postEvent(result_evt)

            # Refresh the local window if needed
            local_win = utils.getLocalWindow()
            if local_win.getDir() == event.local_path:
                # NOTE(review): the remote path is sent as the data of a local window
                # refresh - confirm whether the handler uses the data at all.
                win_evt = events.LocalWindowEvent(
                    events.LocalWindowEvent.EVT_LIST_REFRESH, event.remote_path)
                evt_registry.postEvent(win_evt)
        elif event.direction == protocol.ProtocolInterface.UPLOAD:
            result_evt = events.TransferEvent(**event.toHash())
            evt_registry.postEvent(result_evt)

            # Post the remote window event that was built here; previously the transfer
            # event was posted a second time and this event was dropped.
            # NOTE(review): elsewhere EVT_LIST_UPDATE carries the listing data for the
            # remote window, here it carries a path - confirm the handler accepts it.
            win_evt = events.RemoteWindowEvent(
                events.RemoteWindowEvent.EVT_LIST_UPDATE, event.remote_path)
            evt_registry.postEvent(win_evt)

    def logTransfer(self, direction, user, host, src, dest, size):
        """Writes the transfer to the appropriate logger.

        Direction corresponds to a protocol interface constant direction. Nothing is
        written if no logger was configured for that direction."""
        msg = _("%(user)s@%(host)s: %(src)s -> %(dest)s [%(size)d bytes]\n") \
            %{ 'user' : user, 'host' : host, 'src' : src, 'dest' : dest,
               'size' : size }
        if direction == protocol.ProtocolInterface.DOWNLOAD and self.download_logger:
            self.download_logger.log(msg)
        elif direction == protocol.ProtocolInterface.UPLOAD and self.upload_logger:
            self.upload_logger.log(msg)

    def recurseDirectory(self, event, err_handler=None):
        """Processes a directory recursion event.

        Recursion causes the directory to be entered and the list of entries in the directory
        to be added to the transfer queue. This applies to both uploading and downloading.
        This event should only be called to recurse into a directory structure.

        For downloads the local directory is created if needed and a remote listing is
        requested; for uploads the remote directory is created and one enqueue event is
        posted per local directory entry. Local OS errors are reported in an error dialog.
        Raises TypeError if the event direction is neither download nor upload. The
        'err_handler' argument is accepted for interface symmetry and is not used."""
        self.saveTransferState(event)
        self.postEvent(events.ThreadEvent.EVT_LIST)

        # Avoid a doubled separator when the parent is the remote root
        if event.remote_path == self.REMOTE_SEP:
            remote_path = event.remote_path + event.file
        else:
            remote_path = self.REMOTE_SEP.join([ event.remote_path, event.file ])

        local_path = os.path.join(event.local_path, event.file)
        if event.direction == protocol.ProtocolInterface.DOWNLOAD:
            if not os.path.exists(local_path):
                try:
                    os.makedirs(local_path)
                except OSError as strerror:
                    messages.displayErrorDialog(utils.getMainWindow(),
                        _("Error creating local directories: %(err)s") %{ 'err' : strerror })
                    return
            self.connect.cwd(remote_path)
            event_copy = events.EnqueueEvent(**event.toHash())
            event_copy.remote_path = remote_path
            event_copy.local_path = local_path
            status = DirectoryStatus(self, event_copy)
            self.connect.list(status=status, passive=self.getPassive())
        elif event.direction == protocol.ProtocolInterface.UPLOAD:
            self.connect.mkdir(remote_path)
            try:
                files = os.listdir(local_path)
            except OSError as strerror:
                messages.displayErrorDialog(utils.getMainWindow(),
                    _("Error reading directory: %(err)s") %{ 'err' : strerror })
                return
            evt_registry = events.getEventRegistry()
            for f in files:
                new_path = os.path.join(local_path, f)
                try:
                    size = os.path.getsize(new_path)
                except OSError as strerror:
                    messages.displayErrorDialog(utils.getMainWindow(),
                        _("Error obtaining file size for file %(file)s: %(err)s")
                        %{ 'file' : f, 'err' : strerror })
                    continue
                if os.path.isfile(new_path):
                    flag = 'f'
                elif os.path.isdir(new_path):
                    flag = 'd'
                elif os.path.islink(new_path):
                    flag = 'l'
                else:
                    flag = '-'
                new_event = events.EnqueueEvent(
                    file        = f,
                    host        = event.host,
                    port        = event.port,
                    username    = event.username,
                    password    = event.password,
                    size        = size,
                    flags       = flag,
                    remote_path = remote_path,
                    local_path  = local_path,
                    direction   = event.direction,
                    attempt     = 1,
                    max_attempts = event.max_attempts,
                    method      = event.method,
                    transport   = event.transport,
                )
                evt_registry.postEvent(new_event)
        else:
            raise TypeError(_("Invalid direction for transfer: [%(dir)s]") \
                %{ 'dir' : event.direction })

    def recurseLink(self, event):
        """Attempts to recurse on a remote link.

        It is unknown in advance whether links lead to remote directories or files. First an
        attempt is made to initiate a transfer. An error handler is provided to catch a
        directory exception, after which an attempt is made to recurse into the directory."""
        class ErrHandler(dispatcher.ErrorHandler):
            """Falls back to directory recursion when the file transfer fails."""
            def __init__(self, thread):
                dispatcher.ErrorHandler.__init__(self)
                self.thread = thread
            def handle(self, error, dispatcher):
                event = self.thread.loadTransferState()
                self.thread.recurseDirectory(event)

        err_handler = ErrHandler(self)
        self.initiateTransfer(event, err_handler=err_handler)

    def noop(self, err_handler=None):
        """Sends a no-op through the remote connection and resets the idle counter."""
        self.idle = 0
        self.connect.noop(err_handler=err_handler)

    def cwd(self, dir, err_handler=None):
        """Changes the current working directory."""
        self.postEvent(events.ThreadEvent.EVT_CWD)
        self.idle = 0
        self.connect.cwd(dir, err_handler=err_handler)

    def list(self, err_handler=None):
        """Retrieves a remote listing."""
        self.postEvent(events.ThreadEvent.EVT_LIST)
        self.idle = 0
        list_handler = ListStatus()
        self.connect.list(status=list_handler, passive=self.getPassive(),
            err_handler=err_handler)

    def pwd(self, status=None, err_handler=None):
        """Gets the current working directory."""
        self.idle = 0
        self.connect.pwd(status=status, err_handler=err_handler)

    def rename(self, old_name, new_name, err_handler=None):
        """Performs a remote renaming of a file or directory."""
        data = (old_name, new_name)
        # NOTE(review): a delete event kind is posted for a rename - looks like a
        # copy/paste slip; confirm whether a dedicated rename event kind exists.
        self.postEvent(events.ThreadEvent.EVT_DELETE, data=data)
        self.idle = 0
        self.connect.rename(old_name, new_name, err_handler=err_handler)

    def delete(self, name, err_handler=None):
        """Deletes a remote file."""
        self.postEvent(events.ThreadEvent.EVT_DELETE)
        self.idle = 0
        self.connect.delete(name, err_handler=err_handler)

    def mkdir(self, name, err_handler=None):
        """Creates a new remote directory."""
        self.postEvent(events.ThreadEvent.EVT_MKDIR)
        self.idle = 0
        self.connect.mkdir(name, err_handler=err_handler)

    def rmdir(self, name, err_handler=None):
        """Removes a remote directory."""
        self.postEvent(events.ThreadEvent.EVT_RMDIR)
        self.idle = 0
        self.connect.rmdir(name, err_handler=err_handler)

    def chmod(self, name, perm, err_handler=None):
        """Changes a remote file's permissions.

        Note that the protocol call receives the permission before the name."""
        self.postEvent(events.ThreadEvent.EVT_CHMOD)
        self.idle = 0
        self.connect.chmod(perm, name, err_handler=err_handler)

    def quit(self):
        """Quits the current file transfer session, if there is a connection."""
        if self.connect:
            self.connect.quit()

    def abort(self):
        """Aborts the currently executing command, if there is a connection."""
        if self.connect:
            self.connect.abort()

    def busy(self):
        """Returns true if a command is executing or there are queued commands for the
        protocol connection. Returns False when there is no connection."""
        if self.connect is None:
            return False
        return self.connect.busy()

class BrowseThread(ConnectionThread):
    """Remote site browser thread.

    The remote browser thread acts as the main browsing thread for the remote site. It can
    also be used to transfer files. There will only be one instance of the browsing thread
    for navigating the remote site. This thread augments the connection thread with some
    additional capability for updating main UI elements with regards to browsing events."""

    def __init__(self, **kwargs):
        """Creates a new browse thread instance.

        Accepts the same keyword arguments as the connection thread."""
        ConnectionThread.__init__(self, **kwargs)
        self.abort_connect = threading.Event()
        # Current connection attempt; defined up front so that the connect error
        # handler can be built before a connection has been initiated.
        self.attempt = 0

    def onThreadStatus(self, event):
        """Handles thread status event and extends the base method with browse specific
        event kinds.

        A retry-connect event carries the attempt number in its data; every other kind
        is passed to the base class."""
        if event.kind == events.ThreadStatusEvent.EVT_RETRY_CONNECT:
            attempt = event.data
            self.retryConnection(attempt)
        else:
            ConnectionThread.onThreadStatus(self, event)

    def initiateConnect(self):
        """Initiates a connection to the remote site.

        Clears the abort flag, resets the attempt counter and, once the connection
        commands are issued, requests the initial directory and listing."""
        self.abort_connect.clear()
        self.attempt = 0
        ConnectionThread.initiateConnect(self)
        self.completeConnection()

    def completeConnection(self):
        """Performs UI accounting and sends commands to get the current directory and listing
        after the connection is completed."""
        # Change to appropriate local directory
        if self.opts['localdir']:
            evt = events.LocalWindowEvent(events.LocalWindowEvent.EVT_LIST_UPDATE,
                self.opts['localdir'])
            evt_registry = events.getEventRegistry()
            evt_registry.postEvent(evt)

        # Change to appropriate remote directory
        if self.opts['remotedir']:
            self.connect.cwd(self.opts['remotedir'])

        pwd_status = PwdStatus()
        self.connect.pwd(status=pwd_status)
        list_handler = ListStatus()
        self.connect.list(status=list_handler, passive=self.getPassive())

    def retryConnection(self, attempt):
        """Retries a connection attempt if the prior connection has failed.

        'attempt' is recorded as the current attempt number before reconnecting."""
        self.attempt = attempt
        ConnectionThread.initiateConnect(self)
        # This will get wiped out of the queue if the handler is called.
        self.completeConnection()

    def abort(self):
        """Aborts the establishment of a connection or a current executing command.

        The first call only raises the abort flag; once the flag is set, further calls
        abort the command executing on the connection."""
        if self.abort_connect.isSet():
            ConnectionThread.abort(self)
        else:
            self.abort_connect.set()

    def shouldAbortConnection(self):
        """Thread-safe: Returns true if the connection is in an aborted state."""
        return self.abort_connect.isSet() or self.finished()

    def getConnectErrorHandler(self):
        """Gets the error handler for establishing the connection."""
        err_handler = BrowseConnectErrorHandler(self, self.attempt, self.logger)
        return err_handler

    def updateIdleTime(self):
        """Updates the idle time counter.

        When the idle counter reaches the 'main_idle' option, the main window is asked
        to disconnect."""
        ConnectionThread.updateIdleTime(self)

        # Check if it's time to give up and disconnect
        if self.idle == self.opts['main_idle']:
            main_win = utils.getMainWindow()
            main_win.onDisconnect(None)

    def initiateTransfer(self, event, err_handler=None):
        """Initiates a transfer using the browser thread.

        The directory shown in the remote window is restored afterwards. 'err_handler'
        is passed through to the base implementation; it must be accepted here because
        the inherited recurseLink supplies it by keyword."""
        remote_win = utils.getRemoteWindow()
        old_dir = remote_win.getDir()
        ConnectionThread.initiateTransfer(self, event, err_handler=err_handler)
        self.connect.cwd(old_dir)

    def recurseDirectory(self, event, err_handler=None):
        """Recurses through the current directory.

        The directory shown in the remote window is restored afterwards."""
        remote_win = utils.getRemoteWindow()
        old_dir = remote_win.getDir()
        ConnectionThread.recurseDirectory(self, event, err_handler=err_handler)
        self.connect.cwd(old_dir)

    def recurseLink(self, event):
        """Recurses through a link.

        The directory shown in the remote window is restored afterwards."""
        remote_win = utils.getRemoteWindow()
        old_dir = remote_win.getDir()
        ConnectionThread.recurseLink(self, event)
        self.connect.cwd(old_dir)

class TransferThread(ConnectionThread):
    """Transfer worker thread.

    This thread is not intended for main browsing, only for connecting and transferring.
    The thread is controlled entirely by the control window."""

    def initiateConnect(self):
        """Initiates a connection to the remote site.

        Any exception raised while initiating the connection is passed to a default
        error handler instead of propagating. If a connection object exists afterwards,
        the same handler is installed as the default error handler of its dispatcher."""
        err_handler = DefaultErrorHandler(self)
        try:
            ConnectionThread.initiateConnect(self)
        except Exception as strerror:
            err_handler.handle(strerror, None)
        # If the protocol instance could not be created there is no connection, and
        # therefore no dispatcher, to configure.
        if self.connect is None:
            return
        connect_dispatcher = self.connect.getDispatcher()
        connect_dispatcher.setDefaultErrorHandler(err_handler)

    def updateIdleTime(self):
        """Updates the idle time counter.

        This will mark the thread as finished with its task once the thread idle time
        ('thread_idle' option) has been reached. At this point, the thread may be
        cleaned up."""
        ConnectionThread.updateIdleTime(self)

        if self.idle == self.opts['thread_idle']:
            self.setDone()

class PwdStatus(dispatcher.DispatchStatus):
    """Status object for getting the current directory.

    When the command to retrieve the current directory has finished executing, it will call
    back to this status object and provide the current directory path in the optional data
    attribute."""

    def __init__(self):
        """Creates the status object."""
        # This constructor used to be misspelled with a single trailing underscore and
        # was therefore never invoked.
        dispatcher.DispatchStatus.__init__(self)

    def start(self):
        """Does nothing when the command starts."""
        pass

    def update(self, update):
        """Ignores progress updates."""
        pass

    def finished(self):
        """Gets the optional data containing the path and generates a UI event."""
        data = self.getOptionalData()
        event = events.RemoteWindowEvent(events.RemoteWindowEvent.EVT_CWD, data)
        evt_registry = events.getEventRegistry()
        evt_registry.postEvent(event)

class ListStatus(dispatcher.DispatchStatus):
    """Status object used when retrieving a directory listing.

    Once the listing command has finished executing, the listing information is available
    through the optional data attribute."""

    def __init__(self):
        """Creates a new status object and initializes the base dispatch status."""
        dispatcher.DispatchStatus.__init__(self)

    def start(self):
        """No action is needed when the command starts."""
        pass

    def update(self, update):
        """No action is needed for intermediate updates."""
        pass

    def finished(self):
        """Posts a remote window list update event carrying the optional data, which holds
        the retrieved listing."""
        listing = self.getOptionalData()
        update_event = events.RemoteWindowEvent(
            events.RemoteWindowEvent.EVT_LIST_UPDATE, listing)
        events.getEventRegistry().postEvent(update_event)

class DefaultErrorHandler(dispatcher.ErrorHandler):
    """Default error handler class."""

    def __init__(self, thread):
        """Creates a new error handler instance bound to the specified connection thread
        object."""
        dispatcher.ErrorHandler.__init__(self)
        self.thread = thread

    def handle(self, error, dispatcher):
        """Handles a command error.

        The thread's saved transfer state is loaded. If it is a transfer event, a failure
        event is built from that event's hash with the error added under the 'error' key,
        and is posted through the event registry. Any other state is ignored."""
        state = self.thread.loadTransferState()
        if not isinstance(state, events.TransferEvent):
            return

        failure_args = state.toHash()
        failure_args['error'] = error
        failure = events.FailureEvent(**failure_args)
        events.getEventRegistry().postEvent(failure)

class ConnectErrorHandler(DefaultErrorHandler):
    """Error handler for connection failures."""

    def __init__(self, thread, logger):
        """Creates a new connection error handler bound to the specified connection thread
        object. The logger may be a false value, in which case nothing is logged."""
        DefaultErrorHandler.__init__(self, thread)
        self.logger = logger

    def handle(self, error, dispatcher):
        """Processes a connection error and requests destruction of the current connection.

        The error is logged when a logger is available, the default handling is applied, and
        an EVT_DESTROY_CONNECT thread status event is then posted to the thread."""
        if self.logger:
            message = _("Thread connection error: %(err)s") %{ 'err' : error }
            self.logger.log(Logger.ERROR, message)
        DefaultErrorHandler.handle(self, error, dispatcher)
        destroy_event = events.ThreadStatusEvent(
            events.ThreadStatusEvent.EVT_DESTROY_CONNECT)
        wx.PostEvent(self.thread, destroy_event)

class BrowseConnectErrorHandler(dispatcher.ErrorHandler):
    """Handles connection errors for the browser thread."""

    def __init__(self, thread, attempt, logger):
        """Creates a new browser connection error handler instance and associates the error
        handler with the specified connection thread object.

        The thread's options are captured at construction time. 'attempt' is the number of
        the connection attempt this handler belongs to. 'logger' may be a false value, in
        which case nothing is logged."""
        dispatcher.ErrorHandler.__init__(self)
        self.thread = thread
        self.opts = self.thread.getOptions()
        self.attempt = attempt
        self.logger = logger

    def handle(self, error, dispatcher):
        """Handles a browser connection error.

        This performs a retry to establish a connection provided that the maximum number of
        retries ('retries' option) has not yet been met. Before retrying, the handler waits
        'delay' seconds, checking once per second whether the thread wants the connection
        aborted. The current connection is always cancelled; a retry event carrying the next
        attempt number is posted only if no abort was requested."""
        evt_registry = events.getEventRegistry()

        if self.attempt >= self.opts['retries']:
            if self.logger:
                self.logger.log(Logger.ERROR,
                    _("Maximum connection attempt reached. Aborting connection..."))
            # The abort must not depend on a logger being present; otherwise the retry
            # limit would never be enforced when logging is disabled.
            self.cancelConnect()
            return

        delay = self.opts['delay']
        if self.logger:
            self.logger.log(Logger.ERROR,
                _("Waiting %(delay)s secs before re-attempting connection...")
                %{ 'delay' : delay })

        # Sleep in one second slices so that an abort request is noticed promptly.
        slept = 0
        while slept < delay:
            time.sleep(1)
            slept = slept + 1
            if self.thread.shouldAbortConnection():
                # The connection is cancelled once below; cancelling here as well would
                # post the destroy event twice.
                break

        self.cancelConnect()
        if not self.thread.shouldAbortConnection():
            retry = self.attempt + 1
            event = events.ThreadStatusEvent(events.ThreadStatusEvent.EVT_RETRY_CONNECT,
                data=retry)
            evt_registry.postEvent(event)

    def cancelConnect(self):
        """Cancels the connection thread's current connection establishment by posting an
        EVT_DESTROY_CONNECT thread status event to the thread."""
        event = events.ThreadStatusEvent(events.ThreadStatusEvent.EVT_DESTROY_CONNECT)
        wx.PostEvent(self.thread, event)

    def destroyConnect(self):
        """Destroys the connection thread's current connection and marks the thread as
        done."""
        self.cancelConnect()
        self.thread.setDone()

class TransferStatus(dispatcher.DispatchStatus):
    """Status object that reports transfer progress.

    Progress is kept as a (current, maximum) tuple. It is posted when the transfer starts
    and again after every update. Completion of the transfer is reported through the
    finished method."""

    def __init__(self, thread, event, min, max):
        """Creates a new transfer status object bound to the specified connection thread.

        'event' is handed back to the thread when the transfer finishes. 'min' and 'max'
        form the initial progress tuple."""
        dispatcher.DispatchStatus.__init__(self)
        self.thread = thread
        self.event = event
        self.progress = (min, max)

    def start(self):
        """Posts the initial progress when the transfer starts."""
        self.postProgress(self.progress)

    def update(self, update):
        """Adds the update amount to the current progress and posts the new progress."""
        done, total = self.progress
        self.progress = (done + update, total)
        self.postProgress(self.progress)

    def finished(self):
        """Posts an EVT_FINISH_TRANSFER thread status event, carrying the event given at
        construction time, to the thread."""
        finish_event = events.ThreadStatusEvent(
            events.ThreadStatusEvent.EVT_FINISH_TRANSFER, data=self.event)
        wx.PostEvent(self.thread, finish_event)

    def postProgress(self, progress):
        """Posts an EVT_UPDATE_PROGRESS thread status event carrying the progress tuple to
        the thread."""
        progress_event = events.ThreadStatusEvent(
            events.ThreadStatusEvent.EVT_UPDATE_PROGRESS, data=progress)
        wx.PostEvent(self.thread, progress_event)

class DirectoryStatus(dispatcher.DispatchStatus):
    """Directory status object for getting status regarding directory listings.

    This status object is useful for transfer threads and recursion. It generates a queue
    event for each entry in the directory when the directory listing has completed."""

    def __init__(self, thread, event):
        """Creates a new directory status object and associates it with the specified
        connection thread.

        'event' supplies the connection and path attributes that are copied into every
        enqueue event generated for the listing."""
        dispatcher.DispatchStatus.__init__(self)
        self.thread = thread
        self.event = event

    def start(self):
        """No action is needed when the listing starts."""
        pass

    def update(self, update):
        """No action is needed for intermediate updates."""
        pass

    def finished(self):
        """Called when the directory status listing has been completely retrieved.

        The listing is stored in the optional data field. An event is generated for each
        entry in the listing, with the exception of the special '.' and '..' entries."""
        listing = self.getOptionalData()
        remote_window = utils.getRemoteWindow()
        evt_registry = events.getEventRegistry()
        for line in listing:
            entry = remote_window.parseListEntry(line)
            # entry[0] is used as the file name and entry[1] as the size below.
            if entry[0] in ('.', '..'):
                continue
            new_event = events.EnqueueEvent(
                file         = entry[0],
                host         = self.event.host,
                port         = self.event.port,
                username     = self.event.username,
                password     = self.event.password,
                size         = entry[1],
                # presumably the first character of the entry's flags field (the file
                # type) -- TODO confirm against parseListEntry
                flags        = entry[3][0],
                remote_path  = self.event.remote_path,
                local_path   = self.event.local_path,
                direction    = self.event.direction,
                # Each newly queued entry starts over at its first attempt.
                attempt      = 1,
                max_attempts = self.event.max_attempts,
                method       = self.event.method,
                transport    = self.event.transport,
            )
            evt_registry.postEvent(new_event)
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.