"""
FtpCube
Copyright (C) Michael Gilfix
This file is part of FtpCube.
You should have received a file COPYING containing license terms
along with this program; if not, write to Michael Gilfix
(mgilfix@eecs.tufts.edu) for a copy.
This version of FtpCube is open source; you can redistribute it and/or
modify it under the terms listed in the file COPYING.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
"""
import dispatcher
import events
from logger import Logger
import messages
import protocol
import transports.loader
import utils
import wx
import exceptions
import time
import threading
import os
class ThreadException(Exception):
    """Base class for thread-related exceptions.

    The builtin Exception is used as the base; in Python 2 it is the very same
    object as exceptions.Exception, so existing except clauses keep matching."""

    def __init__(self, args=None):
        """Creates the exception with an optional message.

        'args' may be None (no message), a tuple (used as the exception arguments
        as-is), or any other single value such as a message string or a wrapped
        exception, which becomes the sole exception argument."""
        # Assigning a plain string to self.args makes the exception machinery split
        # it into a tuple of characters (and None is rejected outright), so the
        # arguments are normalized and handed to the base initializer instead.
        if args is None:
            args = ()
        elif not isinstance(args, tuple):
            args = (args,)
        Exception.__init__(self, *args)
class ThreadManager:
    """Transfer thread manager

    This class manages all the different file transfer threads. A distinguished thread is
    labeled as the main thread and is used for the main browsing window. Interactions with
    threads occur through posting of custom events. The main thread is always kept as the
    first entry in the thread list."""

    def __init__(self):
        """Creates the thread manager instance.

        The thread list starts with a single empty slot (index 0), which is reserved
        for the main thread."""
        self.threads = [None]
        # Listeners for thread manager events
        self.listeners = []

    def getThreadList(self):
        """Returns the current list of threads being managed.

        The list itself is returned, not a copy; removed threads show up as None."""
        return self.threads

    def getThread(self, id):
        """Gets the thread with the specified ID, or None if the ID is out of range."""
        try:
            return self.threads[id]
        except IndexError:
            return None

    def newMainThread(self, **kwargs):
        """Creates a new main browser thread and returns its thread ID (always 0).

        The keyword arguments are passed to the BrowseThread constructor. Listeners
        are notified with an EVT_CREATE thread event."""
        self.threads[0] = BrowseThread(**kwargs)
        self.threads[0].setId(0)
        event = events.ThreadEvent(0, events.ThreadEvent.EVT_CREATE)
        self.notifyListeners(event)
        return 0

    def getMainThread(self):
        """Returns the main thread instance, or None if there is none."""
        return self.threads[0]

    def removeMainThread(self):
        """Removes the main thread instance."""
        self.removeThread(0)

    def newTransferThread(self, **kwargs):
        """Creates a new transfer thread and returns its thread ID.

        The keyword arguments are passed to the TransferThread constructor. The first
        free slot after the main thread entry is reused; if there is none, the list is
        extended. Listeners are notified with an EVT_CREATE thread event."""
        thread = TransferThread(**kwargs)
        try:
            # See if there's an empty thread entry; slot 0 belongs to the main thread
            id = self.threads.index(None, 1)
            self.threads[id] = thread
        except ValueError:
            id = len(self.threads)
            self.threads.append(thread)
        # Let the thread know its ID so that the events it posts identify it, as is
        # done for the main thread.
        thread.setId(id)
        if __debug__:
            print("Creating new transfer thread and setting id to: [%s]" % id)
        event = events.ThreadEvent(id, events.ThreadEvent.EVT_CREATE)
        self.notifyListeners(event)
        return id

    def removeTransferThread(self, id):
        """Removes a transfer thread from the managed threads."""
        self.removeThread(id)

    def removeThread(self, id):
        """Removes the thread with the specified ID by setting its value to None.

        An EVT_DESTROY thread event is posted to the listeners even if the ID did not
        refer to a valid slot."""
        try:
            self.threads[id] = None
        except (IndexError, TypeError):
            # Do nothing if we were given a bad ID
            pass
        event = events.ThreadEvent(id, events.ThreadEvent.EVT_DESTROY)
        self.notifyListeners(event)

    def addListener(self, widget):
        """Adds a listener for all thread events."""
        self.listeners.append(widget)

    def removeListener(self, widget):
        """Removes a thread event listener.

        Raises ValueError if the widget was never registered."""
        self.listeners.remove(widget)

    def notifyListeners(self, event):
        """Notifies each listener of a thread-related event via wx.PostEvent."""
        for listener in self.listeners:
            wx.PostEvent(listener, event)
class ConnectionThread(wx.EvtHandler):
    """File transfer site connection thread.

    This class contains the base functionality for all threads that maintain a connection
    with the remote file transfer site, and initiate interactions with the remote site. The
    transport protocol used depends on the selected transport type. The interactions over
    the transport follow the protocol interface and are transparent to the connection thread.
    While the connection thread class does not implement a thread interface, each transport
    instance is expected to have its own dispatcher implementation with its own thread.

    Each connection thread has thread state associated with it for display in the control
    window. Thread state includes information about the underlying connection's activities,
    such as idleness, transfer progress, etc. Such state is meant to reflect the underlying
    connection state only and not the state of which operations are executing."""

    # Separator used when composing remote paths
    REMOTE_SEP = '/'

    def __init__(self, **kwargs):
        """Creates a new connection thread instance.

        The transport instance itself is created later, by initiateConnect(). The
        following keyword arguments are defined:

            'name'   : The name of the connection thread
            'opts'   : The connection options dictionary. If missing or not a
                       dictionary, will result in a ThreadException
            'logger' : The logging instance to use for general logging
            'download_logger' : The logging instance to use for recording download activity
            'upload_logger' : The logging instance to use for recording upload activity
        """
        wx.EvtHandler.__init__(self)

        self.name = kwargs.get('name')

        if 'opts' not in kwargs:
            raise ThreadException(_("Missing connection options field"))
        self.opts = kwargs['opts']
        if not isinstance(self.opts, dict):
            raise ThreadException(_("Invalid connection option"))

        self.logger = kwargs.get('logger')
        self.download_logger = kwargs.get('download_logger')
        self.upload_logger = kwargs.get('upload_logger')

        # Initialize instance variables
        self.id = None
        self.progress = (0, 0)          # (current byte count, total byte count)
        self.start_time = None          # time.time() at the start of the last transfer
        self.idle = 0                   # idle counter, see updateIdleTime()
        self.idle_enable = True
        self.done = threading.Event()
        self.connect = None             # protocol connection, set by initiateConnect()
        self.transfer_state = None      # last queue event given to saveTransferState()

        # Register listeners for status events
        evt_registry = events.getEventRegistry()
        evt_registry.registerEventListener(events.EVT_THREAD_STATUS_TYPE, self)
        self.Bind(events.EVT_THREAD_STATUS, self.onThreadStatus)

    def setId(self, id):
        """Sets the connection thread ID."""
        self.id = id

    def getId(self):
        """Returns the ID for this connection thread."""
        return self.id

    def getName(self):
        """Returns the name of this connection thread."""
        return self.name

    def postEvent(self, kind, data=None):
        """Tells the thread manager to notify all listeners of this connection thread
        event."""
        event = events.ThreadEvent(self.id, kind, data=data)
        thread_mgr = utils.getAppThreadManager()
        thread_mgr.notifyListeners(event)

    def onThreadStatus(self, event):
        """Handles a thread status event.

        Recognized kinds are EVT_DESTROY_CONNECT, EVT_UPDATE_PROGRESS (data is a
        (current, total) pair) and EVT_FINISH_TRANSFER (data is the transfer event).
        Other kinds are ignored."""
        if event.kind == events.ThreadStatusEvent.EVT_DESTROY_CONNECT:
            self.destroyConnection()
        elif event.kind == events.ThreadStatusEvent.EVT_UPDATE_PROGRESS:
            current, total = event.data
            self.updateProgress(current, total)
        elif event.kind == events.ThreadStatusEvent.EVT_FINISH_TRANSFER:
            self.finishTransferEvent(event.data)

    def initiateConnect(self):
        """Initiates an outbound connection to a remote file transfer site.

        A protocol instance is created for the configured transport, then the connect,
        welcome message and login commands are issued, each with its own connection
        error handler. A ProtocolException raised while creating the protocol instance
        is re-raised as a ThreadException."""
        self.postEvent(events.ThreadEvent.EVT_CONNECT)
        transport = self.opts['transport']
        try:
            self.connect = transports.loader.createProtocolInstance(transport,
                host=self.opts['host'], port=self.opts['port'], logger=self.logger)
        except protocol.ProtocolException as strerror:
            raise ThreadException(strerror)

        self.connect.initiateConnect(err_handler=self.getConnectErrorHandler())
        self.connect.getWelcomeMessage(err_handler=self.getConnectErrorHandler())
        self.connect.login(self.opts['username'], self.opts['password'],
            err_handler=self.getConnectErrorHandler())

    def getConnectErrorHandler(self):
        """Returns a connection initiation error handler object."""
        return ConnectErrorHandler(self, self.logger)

    def destroyConnection(self):
        """Destroys the current connection, if there is one."""
        if self.connect:
            self.connect.destroy()

    def finished(self):
        """Thread-safe: Indicates whether this connection thread is finished with its
        current task."""
        return self.done.isSet()

    def setDone(self):
        """Thread-safe: Indicates that this connection thread is done."""
        self.done.set()

    def isTransfering(self):
        """Indicates whether this thread is actively performing a file transfer.

        A transfer is considered active when the current byte count is neither zero nor
        equal to the total byte count."""
        cur, total = self.progress
        return cur != 0 and cur != total

    def getConnection(self):
        """Returns the underlying protocol connection object."""
        return self.connect

    def getOptions(self):
        """Returns a dictionary of connection options for this thread.

        The dictionary returned is a (shallow) copy of the connection thread dictionary."""
        return dict(self.opts)

    def getHost(self):
        """Gets the host to which the connection thread is connected."""
        return self.opts['host']

    def getPort(self):
        """Returns the port to which the connection thread is connected."""
        return self.opts['port']

    def getTransport(self):
        """Returns the transport to which this thread is connected."""
        return self.opts['transport']

    def getPassive(self):
        """Returns a boolean indicating whether this connection is in passive mode."""
        return self.opts['passive']

    def getRetries(self):
        """Returns the number of retries for connection attempts."""
        return self.opts['retries']

    def getProgress(self):
        """Returns a tuple indicating the current byte count as the first element and the
        total byte count as the second element."""
        return self.progress

    def isIdleEnabled(self):
        """Returns a boolean indicating whether maintaining idle connections is enabled."""
        return self.idle_enable

    def setIdleEnabled(self, bool):
        """Sets a boolean indicating whether maintaining idle connections is enabled."""
        self.idle_enable = bool

    def getIdleTime(self):
        """Returns the amount of time this connection has been idle."""
        return self.idle

    def updateIdleTime(self):
        """Updates the amount of idle time by a second.

        Each time the idle counter reaches a multiple of the 'timeout' option, a no-op
        is sent to prevent disconnecting by the remote site. The no-op is sent directly
        through the connection, so the idle counter is not reset by it. Nothing is sent
        when the timeout is zero/unset or when there is no connection."""
        self.idle = self.idle + 1
        timeout = self.opts['timeout']
        # Guard against a zero timeout (modulo by zero) and a missing connection
        if timeout and self.connect and (self.idle % timeout) == 0:
            self.connect.noop()

    def getTransferSpeed(self):
        """Returns the average transfer speed in KB per second.

        The speed is the current byte count divided by the time elapsed since the last
        transfer was initiated. Returns 0 if no transfer has been started yet or no
        time has elapsed."""
        if self.start_time is None:
            return 0
        elapsed = time.time() - self.start_time
        if elapsed > 0:
            return (float(self.progress[0]) / float(elapsed)) / 1024 # In KB
        return 0

    def updateProgress(self, current, max):
        """Updates transfer progress with the current byte count and the maximum byte
        count."""
        self.progress = (current, max)

    def saveTransferState(self, event):
        """Saves the transfer queue event describing the transfer being processed.

        Raises TypeError if the event is not an events.QueueEvent."""
        if not isinstance(event, events.QueueEvent):
            raise TypeError(_("Expected Queue Event"))
        self.transfer_state = event

    def loadTransferState(self):
        """Returns the queue event last stored by saveTransferState(), or None."""
        return self.transfer_state

    def getLocalFileSize(self, path):
        """Returns the size of a local file at the specified path.

        If the size cannot be determined (because the path does not exist or is a
        directory, for example), then the None value is returned. Any OS related errors
        are shown in an error dialog and then raised through an OSError exception."""
        file_size = None
        # os.path.isfile() is already False for paths that do not exist
        if os.path.isfile(path):
            try:
                file_size = os.path.getsize(path)
            except OSError as strerror:
                messages.displayErrorDialog(utils.getMainWindow(),
                    _("Couldn't obtain file size: %(err)s") %{ 'err' : strerror })
                raise
        return file_size

    def initiateTransfer(self, event, err_handler=None):
        """Initiates a file transfer with the remote connection.

        The transfer may be an upload or a download. The appropriate transfer type (binary
        vs. ASCII) will be chosen. Binary transfers resume partially transferred files.
        The transfer is written to the matching transfer logger as soon as the command
        has been issued.

        'event' is the queue event describing the transfer and 'err_handler' is an
        optional error handler passed to the protocol commands. Raises TypeError if the
        event direction is neither download nor upload."""
        self.saveTransferState(event)
        self.cwd(event.remote_path)

        self.start_time = time.time()
        self.progress = (0, event.size)

        # For whether we need resuming
        rest = None
        local_file = os.path.join(event.local_path, event.file)
        try:
            file_size = self.getLocalFileSize(local_file)
        except OSError:
            # The error has already been reported to the user by getLocalFileSize()
            return
        remote_file = self.REMOTE_SEP.join([ event.remote_path, event.file ])

        if event.direction == protocol.ProtocolInterface.DOWNLOAD:
            # Skip the file if it's exactly the same size. This means that a user must
            # delete a rogue file if file size matches.
            if file_size == event.size:
                return
            if file_size is not None and file_size < event.size:
                rest = file_size
                self.progress = (file_size, event.size)

            status = TransferStatus(self, event, self.progress[0], self.progress[1])
            if event.method == protocol.ProtocolInterface.BINARY:
                self.connect.retrieveBinary(event.file, event.local_path, status=status,
                    rest=rest, passive=self.getPassive(), err_handler=err_handler)
            else:
                self.connect.retrieveAscii(event.file, event.local_path, status=status,
                    passive=self.getPassive(), err_handler=err_handler)
            self.logTransfer(protocol.ProtocolInterface.DOWNLOAD, event.username,
                event.host, remote_file, local_file, event.size)
        elif event.direction == protocol.ProtocolInterface.UPLOAD:
            if file_size is not None and event.size < file_size:
                rest = event.size
                self.progress = (event.size, file_size)

            status = TransferStatus(self, event, self.progress[0], self.progress[1])
            if event.method == protocol.ProtocolInterface.BINARY:
                self.connect.uploadBinary(event.file, event.local_path, status=status,
                    rest=rest, err_handler=err_handler)
            else:
                self.connect.uploadAscii(event.file, event.local_path, status=status,
                    err_handler=err_handler)
            self.logTransfer(protocol.ProtocolInterface.UPLOAD, event.username,
                event.host, local_file, remote_file, event.size)
        else:
            # The message text is kept verbatim so that existing translation catalogs
            # still match; the substitution key must be 'dir' to match the placeholder.
            raise TypeError(_("Invalid direction for FTP transfer: [%(dir)s")
                %{ 'dir' : event.direction })

    def finishTransferEvent(self, event):
        """Processes a finished transfer event.

        For downloads, the local file size is compared against the expected size. A
        mismatch results in the generation of a failure event, which appears in the
        failure window; otherwise a transfer event is generated. The local window is
        asked to refresh if it is showing the download directory. For uploads a
        transfer event is generated."""
        evt_registry = events.getEventRegistry()
        if event.direction == protocol.ProtocolInterface.DOWNLOAD:
            # Grab the file size so we can check if our transfer was completed. If not, it'll
            # go into the failure window as an incomplete
            try:
                file_size = self.getLocalFileSize(os.path.join(event.local_path, event.file))
            except OSError:
                file_size = None

            if file_size != event.size:
                opts = event.toHash()
                # Explicitly set the attempts to max attempts so this will appear in the
                # failure window
                opts['attempt'] = opts['max_attempts']
                opts['error'] = _("Transfer incomplete - File sizes do not match")
                event = events.FailureEvent(**opts)
            else:
                event = events.TransferEvent(**event.toHash())
            evt_registry.postEvent(event)

            # Refresh the local window if needed
            local_win = utils.getLocalWindow()
            if local_win.getDir() == event.local_path:
                # NOTE(review): the refresh event for the *local* window carries the
                # remote path - confirm whether the handler uses the event data.
                win_evt = events.LocalWindowEvent(
                    events.LocalWindowEvent.EVT_LIST_REFRESH, event.remote_path)
                evt_registry.postEvent(win_evt)
        elif event.direction == protocol.ProtocolInterface.UPLOAD:
            event = events.TransferEvent(**event.toHash())
            evt_registry.postEvent(event)
            # NOTE(review): win_evt is built but never posted, and the transfer event
            # is posted a second time instead. Presumably the remote window was meant
            # to be refreshed here; left as-is until the remote window's handling of
            # EVT_LIST_UPDATE data is confirmed.
            win_evt = events.RemoteWindowEvent(
                events.RemoteWindowEvent.EVT_LIST_UPDATE, event.remote_path)
            evt_registry.postEvent(event)

    def logTransfer(self, direction, user, host, src, dest, size):
        """Writes the transfer to the appropriate logger.

        Direction corresponds to a protocol interface constant direction. Nothing is
        written if no logger was configured for that direction."""
        msg = _("%(user)s@%(host)s: %(src)s -> %(dest)s [%(size)d bytes]\n") \
            %{ 'user' : user, 'host' : host, 'src' : src, 'dest' : dest,
               'size' : size }
        if direction == protocol.ProtocolInterface.DOWNLOAD and self.download_logger:
            self.download_logger.log(msg)
        elif direction == protocol.ProtocolInterface.UPLOAD and self.upload_logger:
            self.upload_logger.log(msg)

    def recurseDirectory(self, event, err_handler=None):
        """Processes a directory recursion event.

        Recursion causes the directory to be entered and the list of entries in the directory
        to be added to the transfer queue. This applies to both uploading and downloading.
        This event should only be called to recurse into a directory structure.

        For downloads the local directory is created if needed and a remote listing is
        requested; the entries are enqueued when the listing completes. For uploads the
        remote directory is created and every local entry is enqueued immediately.
        Raises TypeError if the event direction is neither download nor upload."""
        self.saveTransferState(event)
        self.postEvent(events.ThreadEvent.EVT_LIST)

        # Avoid a doubled separator when the parent is the remote root
        if event.remote_path == self.REMOTE_SEP:
            remote_path = event.remote_path + event.file
        else:
            remote_path = self.REMOTE_SEP.join([ event.remote_path, event.file ])
        local_path = os.path.join(event.local_path, event.file)

        if event.direction == protocol.ProtocolInterface.DOWNLOAD:
            if not os.path.exists(local_path):
                try:
                    os.makedirs(local_path)
                except OSError as strerror:
                    messages.displayErrorDialog(utils.getMainWindow(),
                        _("Error creating local directories: %(err)s") %{ 'err' : strerror })
                    return

            self.connect.cwd(remote_path)
            # The copy carries the paths of the directory being entered so that the
            # directory status object can build events for its entries
            event_copy = events.EnqueueEvent(**event.toHash())
            event_copy.remote_path = remote_path
            event_copy.local_path = local_path
            status = DirectoryStatus(self, event_copy)
            self.connect.list(status=status, passive=self.getPassive())
        elif event.direction == protocol.ProtocolInterface.UPLOAD:
            self.connect.mkdir(remote_path)
            try:
                files = os.listdir(local_path)
            except OSError as strerror:
                messages.displayErrorDialog(utils.getMainWindow(),
                    _("Error reading directory: %(err)s") %{ 'err' : strerror })
                return

            evt_registry = events.getEventRegistry()
            for f in files:
                new_path = os.path.join(local_path, f)
                try:
                    size = os.path.getsize(new_path)
                except OSError as strerror:
                    messages.displayErrorDialog(utils.getMainWindow(),
                        _("Error obtaining file size for file %(file)s: %(err)s")
                        %{ 'file' : f, 'err' : strerror })
                    continue

                if os.path.isfile(new_path):
                    flag = 'f'
                elif os.path.isdir(new_path):
                    flag = 'd'
                elif os.path.islink(new_path):
                    flag = 'l'
                else:
                    flag = '-'

                new_event = events.EnqueueEvent(
                    file         = f,
                    host         = event.host,
                    port         = event.port,
                    username     = event.username,
                    password     = event.password,
                    size         = size,
                    flags        = flag,
                    remote_path  = remote_path,
                    local_path   = local_path,
                    direction    = event.direction,
                    attempt      = 1,
                    max_attempts = event.max_attempts,
                    method       = event.method,
                    transport    = event.transport,
                )
                evt_registry.postEvent(new_event)
        else:
            raise TypeError(_("Invalid direction for transfer: [%(dir)s]")
                %{ 'dir' : event.direction })

    def recurseLink(self, event):
        """Attempts to recurse on a remote link.

        It is unknown in advance whether links lead to remote directories or files. First an
        attempt is made to initiate a transfer. An error handler is provided to catch a
        directory error, after which an attempt is made to recurse into the directory."""
        class ErrHandler(dispatcher.ErrorHandler):
            """Falls back to directory recursion when the transfer attempt fails."""
            def __init__(self, thread):
                dispatcher.ErrorHandler.__init__(self)
                self.thread = thread
            def handle(self, error, dispatcher):
                event = self.thread.loadTransferState()
                self.thread.recurseDirectory(event)

        err_handler = ErrHandler(self)
        self.initiateTransfer(event, err_handler=err_handler)

    def noop(self, err_handler=None):
        """Sends a no-op through the remote connection and resets the idle counter."""
        self.idle = 0
        self.connect.noop(err_handler=err_handler)

    def cwd(self, dir, err_handler=None):
        """Changes the current working directory."""
        self.postEvent(events.ThreadEvent.EVT_CWD)
        self.idle = 0
        self.connect.cwd(dir, err_handler=err_handler)

    def list(self, err_handler=None):
        """Retrieves a remote listing.

        The listing is delivered to the remote window through a ListStatus object."""
        self.postEvent(events.ThreadEvent.EVT_LIST)
        self.idle = 0
        list_handler = ListStatus()
        self.connect.list(status=list_handler, passive=self.getPassive(),
            err_handler=err_handler)

    def pwd(self, status=None, err_handler=None):
        """Gets the current working directory."""
        self.idle = 0
        self.connect.pwd(status=status, err_handler=err_handler)

    def rename(self, old_name, new_name, err_handler=None):
        """Performs a remote renaming of a file or directory."""
        data = (old_name, new_name)
        # NOTE(review): a rename is announced with EVT_DELETE - confirm whether a
        # dedicated rename event kind exists and should be used here.
        self.postEvent(events.ThreadEvent.EVT_DELETE, data=data)
        self.idle = 0
        self.connect.rename(old_name, new_name, err_handler=err_handler)

    def delete(self, name, err_handler=None):
        """Deletes a remote file."""
        self.postEvent(events.ThreadEvent.EVT_DELETE)
        self.idle = 0
        self.connect.delete(name, err_handler=err_handler)

    def mkdir(self, name, err_handler=None):
        """Creates a new remote directory."""
        self.postEvent(events.ThreadEvent.EVT_MKDIR)
        self.idle = 0
        self.connect.mkdir(name, err_handler=err_handler)

    def rmdir(self, name, err_handler=None):
        """Removes a remote directory."""
        self.postEvent(events.ThreadEvent.EVT_RMDIR)
        self.idle = 0
        self.connect.rmdir(name, err_handler=err_handler)

    def chmod(self, name, perm, err_handler=None):
        """Changes a remote file's permissions.

        Note that the connection is called with the permission first and the name
        second."""
        self.postEvent(events.ThreadEvent.EVT_CHMOD)
        self.idle = 0
        self.connect.chmod(perm, name, err_handler=err_handler)

    def quit(self):
        """Quits the current file transfer session, if a connection exists."""
        if self.connect:
            self.connect.quit()

    def abort(self):
        """Aborts the currently executing command, if a connection exists."""
        if self.connect:
            self.connect.abort()

    def busy(self):
        """Returns true if a command is executing or there are queued commands for the
        protocol connection. Returns False when there is no connection."""
        if self.connect is None:
            return False
        return self.connect.busy()
class BrowseThread(ConnectionThread):
    """Remote site browser thread.

    The remote browser thread acts as the main browsing thread for the remote site. It can
    also be used to transfer files. There will only be one instance of the browsing thread
    for navigating the remote site. This thread augments the connection thread with some
    additional capability for updating main UI elements with regards to browsing events."""

    def __init__(self, **kwargs):
        """Creates a new browse thread instance.

        Accepts the same keyword arguments as ConnectionThread."""
        ConnectionThread.__init__(self, **kwargs)
        self.abort_connect = threading.Event()
        # Current connection attempt; defined up front so that the connect error
        # handler can always be created
        self.attempt = 0

    def onThreadStatus(self, event):
        """Handles thread status event and extends the base method with browse specific
        event kinds.

        EVT_RETRY_CONNECT carries the attempt number as its data; every other kind is
        passed on to the base class."""
        if event.kind == events.ThreadStatusEvent.EVT_RETRY_CONNECT:
            attempt = event.data
            self.retryConnection(attempt)
        else:
            ConnectionThread.onThreadStatus(self, event)

    def initiateConnect(self):
        """Initiates a connection to the remote site.

        Clears the abort flag, resets the attempt counter, connects and then queues the
        post-connection commands."""
        self.abort_connect.clear()
        self.attempt = 0
        ConnectionThread.initiateConnect(self)
        self.completeConnection()

    def completeConnection(self):
        """Performs UI accounting and sends commands to get the current directory and listing
        after the connection is completed."""
        # Change to appropriate local directory
        if self.opts['localdir']:
            evt = events.LocalWindowEvent(events.LocalWindowEvent.EVT_LIST_UPDATE,
                self.opts['localdir'])
            evt_registry = events.getEventRegistry()
            evt_registry.postEvent(evt)

        # Change to appropriate remote directory
        if self.opts['remotedir']:
            self.connect.cwd(self.opts['remotedir'])

        pwd_status = PwdStatus()
        self.connect.pwd(status=pwd_status)
        list_handler = ListStatus()
        self.connect.list(status=list_handler, passive=self.getPassive())

    def retryConnection(self, attempt):
        """Retries a connection attempt if the prior connection has failed.

        'attempt' is the number of the attempt being made."""
        self.attempt = attempt
        ConnectionThread.initiateConnect(self)
        # This will get wiped out of the queue if the handler is called.
        self.completeConnection()

    def abort(self):
        """Aborts the establishment of a connection or a current executing command.

        The first call since the last initiateConnect() only raises the abort flag;
        once the flag is set, further calls abort the executing command."""
        if self.abort_connect.isSet():
            ConnectionThread.abort(self)
        else:
            self.abort_connect.set()

    def shouldAbortConnection(self):
        """Thread-safe: Returns true if the connection is in an aborted state."""
        return self.abort_connect.isSet() or self.finished()

    def getConnectErrorHandler(self):
        """Gets the error handler for establishing the connection.

        The handler is given the current attempt number so that it can decide whether
        to retry."""
        return BrowseConnectErrorHandler(self, self.attempt, self.logger)

    def updateIdleTime(self):
        """Updates the idle time counter.

        When the counter reaches the 'main_idle' option, the main window is asked to
        disconnect."""
        ConnectionThread.updateIdleTime(self)

        # Check if it's time to give up and disconnect
        if self.idle == self.opts['main_idle']:
            main_win = utils.getMainWindow()
            main_win.onDisconnect(None)

    def initiateTransfer(self, event, err_handler=None):
        """Initiates a transfer using the browser thread.

        Afterwards a command is queued to change back to the directory shown in the
        remote window. 'err_handler' is accepted and passed on so that the base class
        recurseLink(), which supplies one, also works for the browse thread."""
        remote_win = utils.getRemoteWindow()
        old_dir = remote_win.getDir()
        ConnectionThread.initiateTransfer(self, event, err_handler=err_handler)
        self.connect.cwd(old_dir)

    def recurseDirectory(self, event, err_handler=None):
        """Recurses through the directory described by the event, then changes back to
        the directory shown in the remote window."""
        remote_win = utils.getRemoteWindow()
        old_dir = remote_win.getDir()
        ConnectionThread.recurseDirectory(self, event, err_handler=err_handler)
        self.connect.cwd(old_dir)

    def recurseLink(self, event):
        """Recurses through a link, then changes back to the directory shown in the
        remote window."""
        remote_win = utils.getRemoteWindow()
        old_dir = remote_win.getDir()
        ConnectionThread.recurseLink(self, event)
        self.connect.cwd(old_dir)
class TransferThread(ConnectionThread):
    """Transfer worker thread.

    This thread is not intended for main browsing, only for connecting and transferring.
    The thread is controlled entirely by the control window."""

    def initiateConnect(self):
        """Initiates a connection to the remote site.

        Any exception raised while setting up the connection is passed to a default
        error handler rather than propagated. If a connection object exists afterwards,
        that handler is also installed as the default error handler of the connection's
        dispatcher."""
        err_handler = DefaultErrorHandler(self)
        try:
            ConnectionThread.initiateConnect(self)
        except Exception as strerror:
            err_handler.handle(strerror, None)

        # The protocol instance may never have been created if connecting failed early
        if self.connect is None:
            return
        cmd_dispatcher = self.connect.getDispatcher()
        cmd_dispatcher.setDefaultErrorHandler(err_handler)

    def updateIdleTime(self):
        """Updates the idle time counter.

        This will mark the thread as finished with its task once the idle counter reaches
        the 'thread_idle' option. At this point, the thread may be cleaned up."""
        ConnectionThread.updateIdleTime(self)
        if self.idle == self.opts['thread_idle']:
            self.setDone()
class PwdStatus(dispatcher.DispatchStatus):
    """Status object for getting the current directory.

    When the command to retrieve the current directory has finished executing, it will call
    back to this status object and provide the current directory path in the optional data
    attribute."""

    def __init__(self):
        """Creates a new status object for a current directory request."""
        dispatcher.DispatchStatus.__init__(self)

    def start(self):
        """Called when the command starts; nothing needs to be done."""
        pass

    def update(self, update):
        """Called on progress updates; nothing needs to be done."""
        pass

    def finished(self):
        """Gets the optional data containing the path and generates a UI event.

        The path is posted to the event registry as an EVT_CWD remote window event."""
        data = self.getOptionalData()
        event = events.RemoteWindowEvent(events.RemoteWindowEvent.EVT_CWD, data)
        evt_registry = events.getEventRegistry()
        evt_registry.postEvent(event)
class ListStatus(dispatcher.DispatchStatus):
    """Status object for directory listing requests.

    Once the listing command has finished, the listing is available through the optional
    data attribute and is forwarded to the remote window."""

    def __init__(self):
        """Creates a new status object for a listing request."""
        dispatcher.DispatchStatus.__init__(self)

    def start(self):
        """Called when the command starts; nothing needs to be done."""
        pass

    def update(self, update):
        """Called on progress updates; nothing needs to be done."""
        pass

    def finished(self):
        """Posts the retrieved listing as an EVT_LIST_UPDATE remote window event."""
        listing = self.getOptionalData()
        list_update = events.RemoteWindowEvent(
            events.RemoteWindowEvent.EVT_LIST_UPDATE, listing)
        events.getEventRegistry().postEvent(list_update)
class DefaultErrorHandler(dispatcher.ErrorHandler):
    """Default error handler for commands issued by a connection thread."""

    def __init__(self, thread):
        """Creates the handler and ties it to the given connection thread object."""
        dispatcher.ErrorHandler.__init__(self)
        self.thread = thread

    def handle(self, error, dispatcher):
        """Handles a command error.

        If the thread's saved transfer state is a transfer event, a failure event
        carrying the error is posted to the event registry. Otherwise nothing happens."""
        pending = self.thread.loadTransferState()
        if not isinstance(pending, events.TransferEvent):
            return
        failure_opts = pending.toHash()
        failure_opts['error'] = error
        failure = events.FailureEvent(**failure_opts)
        events.getEventRegistry().postEvent(failure)
class ConnectErrorHandler(DefaultErrorHandler):
    """Error handler used while a connection is being established."""

    def __init__(self, thread, logger):
        """Creates the handler for the given connection thread object.

        'logger' may be None, in which case errors are not logged."""
        DefaultErrorHandler.__init__(self, thread)
        self.logger = logger

    def handle(self, error, dispatcher):
        """Logs the connection error, runs the default handling and then asks the
        connection thread to destroy its connection."""
        if self.logger:
            text = _("Thread connection error: %(err)s") %{ 'err' : error }
            self.logger.log(Logger.ERROR, text)
        DefaultErrorHandler.handle(self, error, dispatcher)
        # The thread tears the connection down when it processes this status event
        destroy_evt = events.ThreadStatusEvent(
            events.ThreadStatusEvent.EVT_DESTROY_CONNECT)
        wx.PostEvent(self.thread, destroy_evt)
class BrowseConnectErrorHandler(dispatcher.ErrorHandler):
    """Handles connection errors for the browser thread."""

    def __init__(self, thread, attempt, logger):
        """Creates a new browser connection error handler instance and associates the error
        handler with the specified connection thread object.

        'attempt' is the number of the connection attempt this handler guards and
        'logger' may be None to disable logging. A copy of the thread's options is
        taken at construction time."""
        dispatcher.ErrorHandler.__init__(self)
        self.thread = thread
        self.opts = self.thread.getOptions()
        self.attempt = attempt
        self.logger = logger

    def handle(self, error, dispatcher):
        """Handles a browser connection error.

        This performs a retry to establish a connection provided that the maximum number of
        retries has not yet been met. Before retrying, the handler waits for the
        configured 'delay' in one second steps, checking after each step whether the
        connection attempt has been aborted. The failed connection is cancelled exactly
        once in every case."""
        if self.attempt >= self.opts['retries']:
            if self.logger:
                self.logger.log(Logger.ERROR,
                    _("Maximum connection attempt reached. Aborting connection..."))
            self.cancelConnect()
            return

        delay = self.opts['delay']
        if self.logger:
            self.logger.log(Logger.ERROR,
                _("Waiting %(delay)s secs before re-attempting connection...")
                %{ 'delay' : delay })

        # Sleep in short steps so that an abort request is noticed promptly
        slept = 0
        while slept < delay:
            time.sleep(1)
            slept = slept + 1
            if self.thread.shouldAbortConnection():
                break

        # Tear down the failed connection; this happens whether or not we retry, so it
        # is done once here rather than also inside the wait loop.
        self.cancelConnect()

        if not self.thread.shouldAbortConnection():
            retry = self.attempt + 1
            event = events.ThreadStatusEvent(events.ThreadStatusEvent.EVT_RETRY_CONNECT,
                data=retry)
            evt_registry = events.getEventRegistry()
            evt_registry.postEvent(event)

    def cancelConnect(self):
        """Cancels the connection thread's current connection establishment by posting
        an EVT_DESTROY_CONNECT status event to the thread."""
        event = events.ThreadStatusEvent(events.ThreadStatusEvent.EVT_DESTROY_CONNECT)
        wx.PostEvent(self.thread, event)

    def destroyConnect(self):
        """Destroys the connection thread's current connection and marks the thread as
        done."""
        self.cancelConnect()
        self.thread.setDone()
class TransferStatus(dispatcher.DispatchStatus):
    """Status object that reports transfer progress to a connection thread.

    It is called at the start of the transfer to publish the initial progress tuple, on
    every progress update to accumulate the byte count, and once more when the transfer
    has completed."""

    def __init__(self, thread, event, min, max):
        """Creates the status object for the given connection thread and transfer event.

        'min' is the starting byte count and 'max' the total byte count."""
        dispatcher.DispatchStatus.__init__(self)
        self.thread = thread
        self.event = event
        self.progress = (min, max)

    def start(self):
        """Publishes the initial progress when the transfer starts."""
        self.postProgress(self.progress)

    def update(self, update):
        """Adds 'update' to the current byte count and publishes the new progress."""
        transferred, total = self.progress
        self.progress = (transferred + update, total)
        self.postProgress(self.progress)

    def finished(self):
        """Tells the connection thread that the transfer has finished, handing it the
        transfer event as the status event data."""
        done_evt = events.ThreadStatusEvent(
            events.ThreadStatusEvent.EVT_FINISH_TRANSFER, data=self.event)
        wx.PostEvent(self.thread, done_evt)

    def postProgress(self, progress):
        """Posts a progress update event to the connection thread."""
        progress_evt = events.ThreadStatusEvent(
            events.ThreadStatusEvent.EVT_UPDATE_PROGRESS, data=progress)
        wx.PostEvent(self.thread, progress_evt)
class DirectoryStatus(dispatcher.DispatchStatus):
    """Directory status object for getting status regarding directory listings.

    This status object is useful for transfer threads and recursion. It generates a queue
    event for every entry in the directory when the directory listing has completed."""

    def __init__(self, thread, event):
        """Creates a new directory status object and associates it with the specified
        connection thread.

        'event' is the queue event describing the directory being listed; its paths,
        connection details and transfer settings are copied into the events generated
        for the directory entries."""
        dispatcher.DispatchStatus.__init__(self)
        self.thread = thread
        self.event = event

    def start(self):
        """Called when the command starts; nothing needs to be done."""
        pass

    def update(self, update):
        """Called on progress updates; nothing needs to be done."""
        pass

    def finished(self):
        """Called when the directory status listing has been completely retrieved.

        The listing is stored in the optional data field. An event is generated for each
        entry in the listing, with the exception of the special '.' and '..' entries.
        Nothing is generated if the listing is missing or empty."""
        listing = self.getOptionalData()
        if not listing:
            return

        remote_window = utils.getRemoteWindow()
        evt_registry = events.getEventRegistry()
        for line in listing:
            entry = remote_window.parseListEntry(line)
            if entry[0] in ('.', '..'):
                continue
            # The parsed entry supplies the name at index 0 and the size at index 1;
            # presumably index 3 is the permission string whose first character is
            # the entry type flag - TODO confirm against parseListEntry
            new_event = events.EnqueueEvent(
                file         = entry[0],
                host         = self.event.host,
                port         = self.event.port,
                username     = self.event.username,
                password     = self.event.password,
                size         = entry[1],
                flags        = entry[3][0],
                remote_path  = self.event.remote_path,
                local_path   = self.event.local_path,
                direction    = self.event.direction,
                attempt      = 1,
                max_attempts = self.event.max_attempts,
                method       = self.event.method,
                transport    = self.event.transport,
            )
            evt_registry.postEvent(new_event)