trace_inspection.py :  » Math » Modular-toolkit-for-Data-Processing » MDP-2.6 » bimdp » inspection » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Math » Modular toolkit for Data Processing 
Modular toolkit for Data Processing » MDP 2.6 » bimdp » inspection » trace_inspection.py
"""
Module to trace and document the training and execution of a BiFlow.

The BiMDPTraceInspector class is used to decorate a flow for inspection, the
snapshots are created with the TraceBiMDPHTMLTranslator class.

This module also supports (Bi)HiNet structures. Monkey patching is used to
inject the tracing code into the Flow.
"""

import os
import new
import cPickle as pickle
import fnmatch
import copy
import traceback

import numpy as n

import mdp.hinet as hinet

from bimdp import BiNode
from bimdp import BiFlow
from bimdp.hinet import BiFlowNode,CloneBiLayer

from bihinet_translator import BiHTMLTranslator
from utils import robust_pickle

# TODO: wrap inner methods (e.g. _train) to document effective arguments?

PICKLE_EXT = ".pckl"
PICKLE_PROTO = -1
SNAPSHOT_FILENAME = "snapshot"

SLIDE_CSS_FILENAME = "inspect.css"
CLICKABLE_NODE_ID = "clickable_node_%d"

NODE_TRACE_METHOD_NAMES = ["execute", "train", "stop_training"]
BINODE_TRACE_METHOD_NAMES = ["stop_message"]
TRACING_WRAP_FLAG = "_insp_is_wrapped_for_tracing_"
ORIGINAL_METHOD_PREFIX = "_insp_original_"

# additions to the BINET_STYLE for marking the currently traced nodes
INSPECT_TRACE_STYLE = """
table.current_node {
    background-color: #D1FFC7;
}

table.training_node {
    background-color: #FFFFCC;
}

table.clickable {
    cursor: pointer;
}

#inspect_biflow_td {
    vertical-align: top;
    padding: 0 100 0 0;
}

#inspect_result_td {
    vertical-align: top;
}

#displayed {
    border-top: 1px solid #003399;
}

table.inspect_io_data {
    font-family: monospace;
}

table.inspect_io_data td {
    vertical-align: top;
}

table.inspect_io_data pre {
    font-weight: bold;
}

span.inactive_section {
    color: #0000EE;
    cursor: pointer;
    font-weight: bold;
}

div.error {
    color: #FF0000;
    text-align: left;
}

div.error h3 {
    font-size: large;
    color: #FF0000;
}

html {
    overflow-y : scroll;
}
"""


class TraceDebugException(Exception):
    """Exception for return the information when debug is True."""
    
    def __init__(self, result, exception):
        """Store the information necessary to finish the tracing.
        
        result -- The result that would otherwise be returned by the method.
        exception -- The original caufght exception.
        """
        self.result = result
        self.exception = exception


class TraceHTMLInspector(hinet.HiNetTranslator):
    """Class for inspecting a single pass through a provided flow.
    
    This class is based on a translator that decorates the flow elements with 
    tracing wrappers. It also provides a callback function for the tracers
    and stores everything else needed for the inspection.
    
    This class is already specialized for creating HTML slides in the callback
    function.
    
    Note that a flow decorated for tracing is not compatible with pickling
    or parallel training and execution. But normally the decorated flow is
    only used in trace_training or trace_execution anyway.
    """
    
    def __init__(self, trace_translator, css_filename=SLIDE_CSS_FILENAME):
        """Prepare for tracing and create the HTML translator.
        
        trace_translator -- TraceBiMDPHTMLTranslator instance, with a write
            method to create the status visualization on each slide.
        css_filename -- CSS file used for all the slides.
        """
        super(TraceHTMLInspector, self).__init__()
        self._css_filename = css_filename
        self._trace_path = None  # path for the current trace 
        self._trace_name = None  # name for the current trace 
        self._flow = None  # needed for the callback HTML translation
        self.trace_translator = trace_translator
        # step counter used in the callback, is reset automatically
        self._slide_index = None
        self._slide_filenames = None
        self._section_ids = None  # can be used during execution
        self._slide_node_ids = None  # active node for each slide index
        self._undecorate_mode = None  # if True undecorate nodes
        
    def _reset(self):
        """Reset the internal variables for a new tracing.
        
        Should be called before 'train', 'stop_training' or 'execute' is called
        on the flow.
        """
        self._slide_index = 0
        self._slide_filenames = []
        self._section_ids = []
        self._slide_node_ids = []
        self._undecorate_mode = False
        self.trace_translator.reset()
        
    def trace_training(self, path, flow, x, msg=None, stop_msg=None,
                       trace_name="training", debug=False, **kwargs):
        """Trace a single training phase and the stop_training.
        
        Return a tuple containing a list of the training slide filenames, the
        training node ids and the same for stop_training.
        
        path -- Path were the inspection files will be stored.
        trace_name -- Name prefix for this inspection (default is training).
        **kwargs -- Additional arguments for flow.train can be specified
            as keyword arguments.
        """
        self._reset()
        self._trace_path = path
        # train and stop filenames must be different
        self._trace_name = trace_name + "_t"
        self._flow = flow
        self._translate_flow(flow)
        biflownode = BiFlowNode(BiFlow(flow.flow))
        try:
            biflownode.train(x=x, msg=msg, **kwargs)
            # reset is important for the following stop_training
            biflownode.bi_reset()
        except Exception, exception:
            if debug:
                # insert the error slide and encapsulate the exception
                traceback.print_exc()
                self._write_error_frame()
                result = (self._slide_filenames, self._slide_node_ids,
                          None, None)
                raise TraceDebugException(result=result, exception=exception) 
            else:
                raise
        train_filenames = self._slide_filenames
        train_node_ids = self._slide_node_ids
        self._reset()
        self._trace_name = trace_name + "_s"
        try:
            biflownode.stop_training(stop_msg)
        except Exception, exception:
            if debug:
                # insert the error slide and encapsulate the exception
                traceback.print_exc()
                self._write_error_frame()
                result = (train_filenames, train_node_ids,
                          self._slide_filenames, self._slide_node_ids)
                raise TraceDebugException(result=result, exception=exception) 
            else:
                raise 
        stop_filenames = self._slide_filenames
        stop_node_ids = self._slide_node_ids
        # restore undecoreted flow
        self._undecorate_mode = True
        self._translate_flow(flow)
        return train_filenames, train_node_ids, stop_filenames, stop_node_ids
    
    def trace_execution(self, path, trace_name, flow, x, msg=None, target=None,
                        debug=False, **kwargs):
        """Trace a single execution.
        
        The return value is a tuple containing a list of the slide filenames,
        the node ids, the section_ids for a slideshow with sections
        (or None if no section_ids were used) and the execution output value.
        
        path -- Path were the inspection files will be stored.
        trace_name -- Name prefix for this inspection.
        **kwargs -- Additional arguments for flow.execute can be specified
            as keyword arguments.
        """
        self._reset()
        self._trace_path = path
        self._trace_name = trace_name
        self._flow = flow
        self._translate_flow(flow)
        if (not (isinstance(flow, BiFlow) or isinstance(flow, BiNode)) and
            (msg is not None)):
            # a msg would be interpreted as nodenr by a Flow, so check this
            err = "A msg was given for a normal Flow (need BiFlow)."
            raise Exception(err)
        try:
            if msg is None:
                y = self._flow.execute(x, **kwargs)
            else:
                y = self._flow.execute(x, msg, target, **kwargs)
        except Exception, exception:
            if debug:
                # insert the error slide and encapsulate the exception
                traceback.print_exc()
                self._write_error_frame()
                if not self._section_ids:
                    self._section_ids = None
                result = (self._slide_filenames, self._slide_node_ids,
                          self._section_ids)
                raise TraceDebugException(result=result, exception=exception) 
            else:
                raise
        self._undecorate_mode = True
        self._translate_flow(flow)
        if not self._section_ids:
            self._section_ids = None
        else:
            if len(self._section_ids) != len(self._slide_filenames):
                err = ("Mismatch between number of section_ids and number of "
                       "slides.")
                raise Exception(err)
        return (self._slide_filenames, self._slide_node_ids,
                self._section_ids, y)
    
    def _tracer_callback(self, node, method_name, method_result, method_args,
                         method_kwargs):
        """This method is called by the tracers.
        
        The calling tracer also provides this method with the needed state
        information and the method arguments.
        
        node -- The node from which the callback was initiated.
        method_name -- Name of the method from which the callback was initiated.
        result -- Return value of the method.
        args, kwargs -- The arguments of the method call.
        """
        ## write visualization to html_file
        try:
            html_file = self._begin_HTML_frame()
            section_id, node_id = self.trace_translator.write_flow_to_file(
                                            path=self._trace_path,
                                            html_file=html_file, 
                                            flow=self._flow,
                                            node=node,
                                            method_name=method_name, 
                                            method_result=method_result,
                                            method_args=method_args,
                                            method_kwargs=method_kwargs)
            self._slide_index += 1
            if section_id is not None:
                self._section_ids.append(section_id)
            self._slide_node_ids.append(node_id)
        finally:
            self._end_HTML_frame(html_file)
    
    ## HTML decoration ##
    
    def _begin_HTML_frame(self):
        """Return the HTML file for a trace frame including the header.
        
        The file should then be finished via _end_HTML_frame.
        """
        path = self._trace_path
        filename = self._trace_name + "_%d.html" % self._slide_index
        self._slide_filenames.append(filename)
        html_file = open(os.path.join(path, filename), "w")
        html_file = hinet.NewlineWriteFile(html_file)
        html_file.write('<html>\n<head>\n<title>Inspection Slide</title>')
        if self._css_filename:
            html_file.write('<style type="text/css" media="screen">')
            html_file.write('@import url("%s");' % self._css_filename)
            html_file.write('</style>\n</head>\n<body>')
        return html_file
    
    def _end_HTML_frame(self, html_file):
        """Complete and close the HTML file for a trace frame.
        
        The method should always be used after _begin_HTML_frame.
        """
        html_file.write('</body>\n</html>')
        html_file.close()
        
    def _write_error_frame(self):
        html_file = self._begin_HTML_frame()
        html_file.write('<div class="error">')
        html_file.write('<h3>Encountered Exception</h3>')
        traceback_html = traceback.format_exc().replace('\n', '<br>')
#        # get HTML traceback
#        import StringIO as stringio
#        import cgitb
#        import mdp
#        exception_type, exception, tb = sys.exc_info()
#        # Problem: only the text of the original exception is stored in
#        #     mdp.FlowExceptionCR, and the text is not even correctpy displayed.
##        if exception_type is mdp.FlowExceptionCR:
##            exception.args = tuple()
##            exception.message = None
#        buffer = stringio.StringIO()
#        handler = cgitb.Hook(file=buffer)
#        handler.handle((exception_type, exception, tb))
#        traceback_html = buffer.getvalue()
        html_file.write(traceback_html)
        html_file.write('</div>')
        self._end_HTML_frame(html_file)
        
    ## translation methods ##
    
    # note that via the _undecorate_mode flag the translation undecorates,
    # this is required for clonelayer, where only one instance is decorated
    
    def _translate_flow(self, flow):
        """Wrap the flow in place and return it."""
        super(TraceHTMLInspector, self)._translate_flow(flow)
        return flow
    
    # TODO: enable the use of a shallow copy to save memory,
    #    but this requires to implement __copy__ in Node etc. for recursive
    #    shallow copying
    
    def _translate_clonelayer(self, clonelayer):
        """Add a special wrapper for switch_to_copies."""
        if self._undecorate_mode:
            if isinstance(clonelayer, CloneBiLayer):
                # check that clonelayer is actually decorated
                if not hasattr(clonelayer, "_original_set_use_copies"):
                    return
                del clonelayer._set_use_copies
                del clonelayer._original_set_use_copies
                del clonelayer.__getstate__
                self._translate_node(clonelayer.nodes[0])
                if not clonelayer.use_copies:
                    clonelayer.nodes = ((clonelayer.node,) *
                                        len(clonelayer.nodes))
            else:
                self._translate_node(clonelayer.nodes[0])
                clonelayer.nodes = (clonelayer.node,) * len(clonelayer.nodes)
            # undecoration is complete
            return
        ## decorate clonelayer
        if ((not isinstance(clonelayer, CloneBiLayer)) or
            (not clonelayer.use_copies)):
            # use a decorated deep copy for the first node
            clonelayer.node = clonelayer.nodes[0].copy()
            clonelayer.nodes = (clonelayer.node,) + clonelayer.nodes[1:]
        # only decorate the first node
        self._translate_node(clonelayer.nodes[0])
        if isinstance(clonelayer, CloneBiLayer):
            # add a wrapper to _set_use_copies, 
            # otherwise all nodes in layer would get decorated
            clonelayer._original_set_use_copies = clonelayer._set_use_copies
            trace_inspector = self
            def wrapped_use_copies(self, use_copies):
                # undecorate internal nodes to allow copy operation
                trace_inspector._undecorate_mode = True
                trace_inspector._translate_node(clonelayer.nodes[0])
                trace_inspector._undecorate_mode = False
                if use_copies and not self.use_copies:
                    # switch to node copies, no problem
                    clonelayer._original_set_use_copies(use_copies)
                elif not use_copies and self.use_copies:
                    # switch to a single node instance
                    # but use a (decorated) deep copy for first node
                    clonelayer._original_set_use_copies(use_copies)
                    clonelayer.node = clonelayer.nodes[0].copy()
                    clonelayer.nodes = (clonelayer.node,) + clonelayer.nodes[1:]
                trace_inspector._translate_node(clonelayer.nodes[0])
            clonelayer._set_use_copies = new.instancemethod(wrapped_use_copies, 
                                                            clonelayer)
            # modify getstate to enable pickling (get rid of the instance methods)
            def wrapped_getstate(self):
                result = self.__dict__.copy()
                # delete instance methods
                del result["_original_set_use_copies"]
                del result["_set_use_copies"]
                del result["__getstate__"]
                return result
            clonelayer.__getstate__ = new.instancemethod(wrapped_getstate,
                                                         clonelayer)
        
    def _translate_standard_node(self, node):
        """Wrap the node."""
        if not self._undecorate_mode:
            self._standard_tracer_decorate(node)
        else:
            self._standard_tracer_undecorate(node)
        
    ## monkey patching methods ##
        
    def _standard_tracer_decorate(self, node):
        """Adds a tracer wrapper to the node via monkey patching."""
        # add a marker to show that this node is wrapped
        setattr(node, TRACING_WRAP_FLAG, True) 
        trace_method_names = list(NODE_TRACE_METHOD_NAMES)
        if isinstance(node, BiNode):
            trace_method_names += BINODE_TRACE_METHOD_NAMES
        for method_name in trace_method_names:
            new_method_name = ORIGINAL_METHOD_PREFIX + method_name
            # create a reference to the original method
            setattr(node, new_method_name, getattr(node, method_name))
            # use nested scopes  lexical closure to get proper wrapper
            def get_wrapper(_method_name, _inspector):
                _new_method_name = ORIGINAL_METHOD_PREFIX + method_name
                def wrapper(self, *args, **kwargs):
                    args_copy = copy.deepcopy(args)
                    kwargs_copy = copy.deepcopy(kwargs)
                    result = getattr(self, _new_method_name)(*args, **kwargs)
                    _inspector._tracer_callback(self, _method_name, result,
                                                args_copy, kwargs_copy)
                    return result
                return wrapper
            # hide the original method in this instance behind the wrapper
            setattr(node, method_name, 
                    new.instancemethod(get_wrapper(method_name, self), node))
        # modify getstate to enable pickling (get rid of the instance methods)
        def wrapped_getstate(self):
            result = self.__dict__.copy()
            if not hasattr(node, TRACING_WRAP_FLAG):
                return result
            del result[TRACING_WRAP_FLAG]
            # delete all instance methods
            trace_method_names = list(NODE_TRACE_METHOD_NAMES)
            if isinstance(self, BiNode):
                trace_method_names += BINODE_TRACE_METHOD_NAMES
            for method_name in trace_method_names:
                del result[method_name]
                old_method_name = ORIGINAL_METHOD_PREFIX + method_name
                del result[old_method_name]
            del result["__getstate__"]
            return result
        node.__getstate__ = new.instancemethod(wrapped_getstate, node)
    
    def _standard_tracer_undecorate(self, node):
        """Remove a tracer wrapper from the node."""
        if not hasattr(node, TRACING_WRAP_FLAG):
            return
        delattr(node, TRACING_WRAP_FLAG)
        trace_method_names = list(NODE_TRACE_METHOD_NAMES)
        if isinstance(node, BiNode):
            trace_method_names += BINODE_TRACE_METHOD_NAMES
        for method_name in trace_method_names:
            # delete the wrapped method in the instance to unhide the original
            delattr(node, method_name)
            # delete the no longer used reference to the original method
            old_method_name = ORIGINAL_METHOD_PREFIX + method_name
            delattr(node, old_method_name)
        # restore normal getstate
        delattr(node, "__getstate__")


class TraceHTMLTranslator(BiHTMLTranslator):
    """Class to visualize the state of a BiFlow during execution or training.
    
    The single snapshot is a beefed up version of the standard HTML view.
    Capturing the data to make this possible is not the responsibility of this
    class.
    """
    
    def __init__(self, show_size=False):
        """Initialize the internal variables."""
        super(TraceHTMLTranslator, self).__init__(show_size=show_size)
        self._current_node = None
        self._method_name = None
        self._result = None
        # this the HTML node id, not the Node attribute
        # this might change in the future
        self._current_node_id = None
        self._node_id_index = None  # counter for nodes to give them an id
        
    @staticmethod
    def _array_pretty_html(ar):
        """Return a nice HTML representation of the given numpy array."""
        ar_str = 'shape: %s<br>\n' % str(ar.shape)
        # TODO: use np.savetxt instead?
        ar_str += (str(ar).replace(' [', '<br>\n[').
                    replace(']\n ...', ']<br>\n...'))
        return ar_str
    
    @staticmethod
    def _dict_pretty_html(dic):
        """Return a nice HTML representation of the given numpy array."""
        # TODO: use an stringio buffer for efficency
        dic_str = '{'
        first = True
        for key, value in dic.items():
            if first:
                first = False
            else:
                dic_str += ',<br>\n'
            dic_str += repr(key) + ': '
            if isinstance(value, str):
                dic_str += repr(value)
            else:
                dic_str += str(value)
        dic_str += '}'
        return dic_str
    
    def reset(self):
        """Reset internal variables for a new trace.
        
        It is called (by TraceHTMLInspector) before calling 'train',
        'stop_training' or 'execute' on the flow.
        
        This method can be overridden by derived that need to keep track of the
        training or execution phase. 
        """
        pass
    
    def write_flow_to_file(self, path, html_file, flow, node, method_name, 
                           method_result, method_args, method_kwargs):
        """Write the HTML translation of the flow into the provided file.
        
        Return value is the section_id and the HTML/CSS id of the active node.
        The section id is ignored during training.
        
        path -- Path of the slide (e.h. to store additional images).
        html_file -- File of current slide, where the translation is written.
        flow -- The overall flow.
        node -- The node that was called last.
        method_name -- The method that was called on the last node.
        method_result -- The result from the last call.
        method_args -- args that were given to the method
        method_kwargs -- kwargs that were given to the method
        """
        self._html_file = hinet.NewlineWriteFile(html_file)
        f = self._html_file
        ## create table, left side for the flow, right side for data
        f.write('<br><br>')
        f.write('<table><tr><td id="inspect_biflow_td">')
        f.write("<h3>flow state</h3>")
        self._translate_flow(flow, node)
        # now the argument / result part of the table
        f.write('</td><td id="inspect_result_td">')
        section_id = self._write_right_side(
                               path=path, html_file=html_file, flow=flow,
                               node=node, method_name=method_name,
                               method_result=method_result,
                               method_args=method_args,
                               method_kwargs=method_kwargs)
        f.write('</table>')
        f.write('</td></tr>\n</table>')
        self._html_file = None
        return section_id, self._current_node_id
    
    def _write_right_side(self, path, html_file, flow, node, method_name, 
                          method_result, method_args, method_kwargs):
        """Write the result part of the translation.
        
        Return value can be a section_id or None. The section_id is ignored
        during training (since the slideshow sections are used for the
        training phases). 
        
        This method can be overriden for custom visualisations. Usually this
        original method should still be called via super.
        """
        f = self._html_file
        if not method_name == "stop_training":
            f.write('<h3>%s arguments</h3>' % method_name)
            f.write('<table class="inspect_io_data">')
            if (method_name in ["execute", "train"]) and method_args:
                # deal with x separately
                x = method_args[0]
                method_args = method_args[1:]
                if isinstance(x, n.ndarray):
                    f.write('<tr><td><pre>x = </pre></td>' +
                            '<td>' + self._array_pretty_html(x) + '</td></tr>')
                else:
                    f.write('<tr><td><pre>x = </pre></td><td>' + str(x) +
                            '</td></tr>')
            # remaining arg is message
            if method_args and method_args[0] is not None:
                f.write('<tr><td><pre>msg = </pre></td><td>' +
                        self._dict_pretty_html(method_args[0]) + '</td></tr>')
            # normally the kwargs should be empty
            for arg_key in method_kwargs:
                f.write('<tr><td><pre>' + arg_key + ' = </pre></td><td>' + 
                        str(method_kwargs[arg_key]) + '</td></tr>')
            f.write('</table>')
        ## print results
        f.write("<h3>%s result</h3>" % method_name)
        f.write('<table class="inspect_io_data">')
        if method_result is None:
            f.write('<tr><td><pre>None</pre></tr></td>')
        elif isinstance(method_result, n.ndarray):
            f.write('<tr><td><pre>x = </pre></td><td>' +
                    self._array_pretty_html(method_result) + '</td></tr>')
        elif isinstance(method_result, dict):
            f.write('<tr><td><pre>msg = </pre></td><td>' +
                    self._dict_pretty_html(method_result) + '</td></tr>')
        elif isinstance(method_result, tuple):
            # interpret the results depending on the method name
            if method_name == "execute" or method_name == "train":
                result_names = ["x", "msg", "target"]
            else:
                result_names = ["msg", "target"]
            for i_result_part, result_part in enumerate(method_result):
                f.write('<tr><td><pre>' + result_names[i_result_part] +
                        ' = </pre></td><td>')
                if isinstance(result_part, n.ndarray):
                    f.write(self._array_pretty_html(result_part) +
                            '</td></tr>')
                elif isinstance(result_part, dict):
                    f.write(self._dict_pretty_html(result_part) +
                            '</td></tr>')
                else:
                    f.write(str(result_part) + '</td></tr>')
        else:
            f.write('<tr><td><pre>unknown result type: </pre></td><td>' +
                    str(method_result) + '</td></tr>')
        
    # overwrite private methods
    
    def _translate_flow(self, flow, current_node=None):
        """Translate the flow into HTML and write it into the internal file.
        
        Use write_flow_to_file instead of calling this method directly.
        This method only takes care of the BiFlow structure graph (the left
        part of the slide).
        
        current_node -- The current_node that was called last.
            
        These arguments are stored as attributes and are then used in
        _open_node_env.
        """
        self._current_node = current_node
        self._node_id_index = 0
        self._current_node_id = None
        super(TraceHTMLTranslator, self)._translate_flow(flow)
        
    def _open_node_env(self, node, type_id="node"):
        """Open the HTML environment for the node internals.
        
        This special version highlights the nodes involved in the trace.
        
        node -- The node itself.
        type_id -- The id string as used in the CSS.
        """
        f = self._html_file
        html_line = '<table class="' 
        trace_class = None
        if node is self._current_node:
            trace_class = "current_node"
        elif type_id == "node" and node._train_phase_started:
            trace_class = "training_node"
        if trace_class:
            html_line += ' %s' % trace_class
        html_line += ' %s' % type_id
        # assign id only to nodes which trigger a slide creation,
        # i.e. only if the node can become active 
        if hasattr(node, TRACING_WRAP_FLAG): 
            node_id = CLICKABLE_NODE_ID % self._node_id_index
            if node is self._current_node:
                self._current_node_id = node_id
            self._node_id_index += 1
            html_line +=  ' clickable" id="%s">' % node_id
        else:
            html_line += '">'
        f.write(html_line)
        self._write_node_header(node, type_id)


## Functions to capture pickled biflow snapshots during training. ##

def prepare_training_inspection(flow, path):
    """Use hook in the BiFlow to store a snapshot in each training phase.
    
    path -- Path were the snapshots are stored.
    
    This is done by wrapping the _stop_training_hook method of biflow.
    Some attributes are added to the biflow which store all information needed
    for the pickling (like filename). To enable pickling we use the
    __getstate__ slot, since some attributes cannot be pickled.
    """
    # add attributes to biflow which are used in wrapper_method
    flow._snapshot_counter_ = 0
    flow._snapshot_path_ = path
    flow._snapshot_name_ = SNAPSHOT_FILENAME
    flow._snapshot_instance_methods_ = []
    ### wrap _stop_training_hook to store biflow snapshots ###
    def pickle_wrap_method(_flow, _method_name):
        new_method_name = ORIGINAL_METHOD_PREFIX + _method_name
        def wrapper(self, *args, **kwargs):
            result = getattr(self, new_method_name)(*args, **kwargs)
            # pickle biflow
            filename = (self._snapshot_name_ + "_%d" % self._snapshot_counter_ + 
                        PICKLE_EXT)
            robust_pickle(self._snapshot_path_, filename, self)
            self._snapshot_counter_ += 1
            return result
        # create a reference to the original method
        setattr(_flow, new_method_name, getattr(_flow, _method_name))
        # hide the original method in this instance behind the wrapper
        setattr(_flow, _method_name, new.instancemethod(wrapper, _flow))
        _flow._snapshot_instance_methods_.append(_method_name)
        _flow._snapshot_instance_methods_.append(new_method_name)
    pickle_wrap_method(flow, "_stop_training_hook")
    ### wrap __getstate__ to enable pickling ###
    # note that in the pickled flow no trace of the wrapping remains
    def wrapped_biflow_getstate(self):
        result = self.__dict__.copy()
        # delete all instancemethods
        for method_name in self._snapshot_instance_methods_:
            del result[method_name]
        # delete the special attributes which were inserted by the wrapper
        # (not really necessary)
        del result["_snapshot_counter_"]
        del result["_snapshot_path_"]
        del result["_snapshot_name_"]
        del result["_snapshot_instance_methods_"]
        # remove data attributes (generators cannot be pickled)
        # pop with default value also works when key is not present in dict
        result.pop("_train_data_iterables", None)
        result.pop("_train_data_iterator", None)
        result.pop("_train_msg_iterables", None)
        result.pop("_train_msg_iterator", None)
        result.pop("_stop_messages", None)
        result.pop("_exec_data_iterator", None)
        result.pop("_exec_msg_iterator", None)
        result.pop("_exec_target_iterator", None)
        return result
    flow.__getstate__ = new.instancemethod(wrapped_biflow_getstate, flow)
    flow._snapshot_instance_methods_.append("__getstate__")
    
def remove_inspection_residues(flow):
    """Remove all the changes made by prepare_training_inspection."""
    try:
        for method_name in flow._snapshot_instance_methods_:
            delattr(flow, method_name)
        del flow._snapshot_counter_
        del flow._snapshot_path_
        del flow._snapshot_name_
        del flow._snapshot_instance_methods_
    except:
        # probably the hooks were already removed, so do nothing
        pass
    
## Further helper functions ##

def _trace_biflow_training(snapshot_path, inspection_path, css_filename,
                           x_samples, msg_samples=None, stop_messages=None,
                           trace_inspector=None,
                           debug=False, show_size=False, verbose=True,
                           **kwargs):
    """Load flow snapshots and perform the inspection with the given data.
    
    The return value consists of the slide filenames, the slide node ids,
    and an index table (index of last slide of section indexed by node,
    phase, train and stop). If no snapshots were found the return value is
    None.
    
    snapshot_path -- Path were the flow training snapshots are stored.
    inspection_path -- Path were the slides are stored.
    css_filename -- Filename of the CSS file for the slides.
    x_samples, msg_samples -- Lists with the input data for the training trace.
    stop_messages -- The stop msg for the training trace.
    trace_inspector -- Instance of HTMLTraceInspector, can be None for
        default class.
    debug -- If True (default is False) then any exception will be
        caught and the gathered data up to that point is returned in the
        normal way. This is useful for bimdp debugging.
    show_size -- Show the approximate memory footprint of all nodes.
    verbose -- If True (default value) a status message is printed for each
        loaded snapshot.
    **kwargs -- Additional arguments for flow.train can be specified
        as keyword arguments.
    """
    if not trace_inspector:
        trace_translator = TraceHTMLTranslator(show_size=show_size)
        trace_inspector = TraceHTMLInspector(
                                trace_translator=trace_translator,
                                css_filename=css_filename)
    i_train_node = 0  # index of the training node
    i_snapshot = 0 # snapshot counter
    index_table = [[]]  # last slide indexed by [node, phase, train 0 or stop 1]
    slide_filenames = []
    slide_node_ids = []
    try:
        # search for the snapshot files
        for file_path, dirs, files in os.walk(os.path.abspath(snapshot_path)):
            dirs.sort()
            files = fnmatch.filter(files, SNAPSHOT_FILENAME + "*" + PICKLE_EXT)
            files.sort()
            for filename in files:
                filename = os.path.join(file_path, filename)
                # load the flow snapshot
                pickle_file = open(filename, "rb")
                biflow = None  # free memory
                try:
                    biflow = pickle.load(pickle_file)
                finally:
                    pickle_file.close()
                # determine which node is training and set the indices
                for node in biflow[i_train_node:]:
                    if node.get_remaining_train_phase() > 0:
                        break
                    else:
                        i_train_node += 1
                        index_table.append([])
                # inspect the training
                x = x_samples[i_train_node]
                if msg_samples:
                    msg = msg_samples[i_train_node]
                else:
                    msg = None
                if stop_messages:
                    stop_msg = stop_messages[i_train_node]
                else:
                    stop_msg = None
                trace_name = "%d_%d" % (i_snapshot, i_train_node)
                train_files, train_ids, stop_files, stop_ids = \
                    trace_inspector.trace_training(
                                            trace_name=trace_name,
                                            path=inspection_path,
                                            flow=biflow,
                                            x=x, msg=msg, stop_msg=stop_msg,
                                            debug=debug,
                                            **kwargs)
                slide_filenames += train_files
                train_index = len(slide_filenames) - 1
                slide_filenames += stop_files
                stop_index = len(slide_filenames) - 1
                index_table[i_train_node].append((train_index, stop_index))
                slide_node_ids += train_ids
                slide_node_ids += stop_ids
                if verbose:
                    print "got traces for snapshot %d" % (i_snapshot + 1)
                i_snapshot += 1
    except TraceDebugException, debug_exception:
        train_files, train_ids, stop_files, stop_ids = debug_exception.result
        slide_filenames += train_files
        train_index = len(slide_filenames) - 1
        if stop_files:
            slide_filenames += stop_files
        stop_index = len(slide_filenames) - 1
        index_table[i_train_node].append((train_index, stop_index))
        slide_node_ids += train_ids
        if stop_ids:
            slide_node_ids += stop_ids
        debug_exception.result = (slide_filenames, slide_node_ids, index_table)
        raise
    if i_snapshot == 0:
        return None  # no snapshots were found
    return slide_filenames, slide_node_ids, index_table

www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.