Middle tier between a Lexer and an ObjectGenerator. receives parse events from Lexer
and constructs a parse tree, then calls an ObjectGenerator to generate an object file from
the parse tree. the parse tree structure is language-neutral and contains no python code.
Compiler is currently stateful and is not thread-safe. the clone() method can be used
to create copies of this object for use in multiple threads.
from myghty.util import *
from myghty import exception
import myghty.lexer
import myghty.objgen as objgen
import myghty.args as args
import string, re, sys, warnings
# blocks that can only be in the top level
top_level_only_block = dict(map (lambda x:[x ,True], ['once', 'shared', 'threadonce']))
# valid flags in use
valid_comp_flag = dict(map (lambda x:[x ,True], ['inherit']))
# map of block names pointing to compile method names
'python' : 'python_block',
argscopes = {
'request' : args.RequestArg,
'subrequest' : args.SubRequestArg,
'dynamic' : args.DynamicArg,
'component' : args.LocalArg
class Compiler:
# compiled component modules must have this defined as their
# _MAGIC_NUMBER or an IncompatibleCompiler exception will be raised
class BlockFlyweight:
"""an object that represents
a construct in the original source file. maintains a reference to a list
of source file items and uses a context to act upon the list entry
in question, so that very few objects need to be created """
def __init__(self):pass
def get_line(self, context):
return context.entry[2]
def get_line_number(self, context):
return context.entry[1]
def append(self, list, line, linenumber):
list.append([self, linenumber, line])
def accept_visitor(self, visitor, context):raise NotImplementedError
class CodeBlock(BlockFlyweight):
"""represents a <%python> section"""
def get_lines(self, context):
block = context.entry[2]
lines = re.split(r"\n", block)
return lines
def accept_visitor(self, visitor, context):
visitor.visit_code_block(self, context)
class Substitution(BlockFlyweight):
"""represents a <% %> line """
def accept_visitor(self, visitor, context):
visitor.visit_substitution(self, context)
def append(self, list, line, escape, linenumber):
list.append([self, linenumber, line, escape])
def get_escapes(self, context):
return context.entry[3]
class TextLine(BlockFlyweight):
""" represents a line of plain text"""
def accept_visitor(self, visitor, context):
visitor.visit_text_line(self, context)
class CodeSingleLine(BlockFlyweight):
""" represents a % line """
def accept_visitor(self, visitor, context):
visitor.visit_code_single_line(self, context)
class ComponentCall(BlockFlyweight):
""" represents a component call without content <& &>"""
def append(self, list, component, args, linenumber):
list.append([self, linenumber, component, args])
def get_component(self, context):
return context.entry[2]
def get_args(self, context):
return context.entry[3]
def accept_visitor(self, visitor, context):
visitor.visit_component_call(self, context)
class ComponentContentCall(ComponentCall):
""" represents a component call with content <&| &>"""
def accept_visitor(self, visitor, context):
visitor.visit_component_content_call(self, context)
class ComponentContentCallEnd(ComponentCall):
""" represents a component call with content end tag </&>"""
def accept_visitor(self, visitor, context):
visitor.visit_component_content_call_end(self, context)
class Closure(BlockFlyweight):
def append(self, list, compiled, linenumber):
list.append([self, linenumber, compiled])
def get_compiled(self, context):
return context.entry[2]
def accept_visitor(self, visitor, context):
visitor.visit_closure_block(self, context)
class BlockFlyweightIterator:
"""an iterator that loops through an array of BlockFlyweight references and
their associated data,
and acts as the context to send to their get() methods"""
def __init__(self, list):
self.list = list
self.index = -1
self.entry = None
def __iter__(self):
return self
def next(self):
self.index += 1
if self.index >= len(self.list): raise StopIteration()
self.entry = self.list[self.index]
return self.entry[0]
# singleton instances of BlockFlyweights
codeblock = CodeBlock()
textline = TextLine()
codesingleline = CodeSingleLine()
substituteline = Substitution()
componentcall = ComponentCall()
componentcontentcall = ComponentContentCall()
componentcontentcallend = ComponentContentCallEnd()
closure = Closure()
def __init__(self,
python_pre_processor = None,
python_post_processor = None,
text_post_processor = None,
lexer = None,
generator = None,
default_escape_flags = [],
use_source_line_numbers = True,
allow_globals = [],
disable_unicode = False,
self.current_compile = None
if lexer:
self.lexer = lexer
self.lexer = myghty.lexer.Lexer(**params)
if generator:
self.generator = generator
self.generator = objgen.PythonGenerator()
self.default_escape_flags = default_escape_flags
self.python_pre_processor = python_pre_processor
self.python_post_processor = python_post_processor
self.text_post_processor = text_post_processor
self.use_source_line_numbers = use_source_line_numbers
self.allow_globals = allow_globals
self.disable_unicode = bool(disable_unicode)
def get_object_id(self):
"""identification string placed at the top of compiled files"""
return "%s|%s|%s" % (self.lexer.get_object_id(),
def get_magic_number(self):
return Compiler.MAGIC_NUMBER, {'disable_unicode': self.disable_unicode}
def clone(self, **params):
"""creates a clone of this Compiler. allow the Prototype pattern
to be used in creating compilers for use in other threads."""
params['lexer'] = self.lexer.clone()
params['generator'] = self.generator.clone()
clone = ConstructorClone(self, **params)
return clone.clone()
class Compiled:
"Stores information about the currently compiling block"
def __init__(self, compiler, fileid, name, in_main = True, parent = None, block_type = None, flags = None):
self.compiler = compiler
self.lexer = compiler.lexer
self.parent = parent
self.in_main = in_main
self.block_type = block_type
self.fileid = fileid
self.name = name
self.component_content_call_stack = []
self.in_block = None
self.args = OrderedDict()
if flags is not None:
self.flags = OrderedDict([flags])
self.flags = OrderedDict()
self.attr = OrderedDict()
self.named_blocks = {'def': {}, 'method': {}}
self.blocks = {
'cleanup' : [],
'filter' : [],
'init' : [],
'once' : [],
'threadonce' : [],
'shared' : [],
'body' : []
self.start_blocks = {
'args' : -1,
'flags' : -1,
'attr': - 1,
self.encoding = None
def accept_visitor(self, visitor):
if self.in_main:
elif self.block_type == 'def':
elif self.block_type == 'method':
def add_argument(self, name, default, linenumber, scope):
if self.args.has_key(name):
self.lexer.raise_syntax_error("component argument %s already defined" % name)
self.args[name] = argscopes[scope or 'component'](name, default = default, linenumber = linenumber)
except KeyError:
self.lexer.raise_syntax_error("Unknown %%args scope '%s'" % scope)
def has_named_block(self, block_type, name):
if block_type is None:
for value in self.named_blocks.values():
if value.has_key(name):
return True
return False
return self.named_blocks.has_key(block_type) and self.named_blocks[block_type].has_key(name)
def get_named_block(self, block_type, name):
if block_type is None:
for value in self.named_blocks.values():
if value.has_key(name):
return value[name]
raise KeyError(name)
return self.named_blocks[block_type][name]
def add_named_block(self, block_type, name, block):
self.named_blocks[block_type][name] = block
def get_named_blocks(self, block_type = None):
if block_type is not None:
return self.named_blocks[block_type].values()
list = []
for d in self.named_blocks.values():
list += d.values()
return list
def get_body(self):
return self.blocks['body']
def get_block_list(self, block_type):
return self.blocks[block_type]
def block_has_code(self, block_type):
return len(self.blocks[block_type]) > 0
def get_block_iterator(self, block_type):
block = self.get_block_list(block_type)
return Compiler.BlockFlyweightIterator(block)
def set_encoding(self, encoding):
if self.parent is not None:
self.parent.set_encoding(encoding, ignore_if_already_set)
elif self.encoding:
'multiple file encoding specifiers')
self.encoding = encoding
def get_encoding(self):
if self.parent is not None:
return self.parent.get_encoding()
return self.encoding
def __str__(self):
return self._to_string()
def _to_string(self, indent = ''):
"""dumps out the structure attempting to recreate the original source file,
more or less."""
list = []
if self.encoding is not None:
list.append("# -*- encoding: %s -*-" % self.encoding)
for arg in self.args.keys():
list.append(arg + " = " + repr(self.args[arg]))
for key in ('once', 'init', 'shared', 'body', 'cleanup'):
block = self.get_block_list(key)
if key != 'body':
list.append("<%%%s>" % key)
list.append("# body code begin")
iter = Compiler.BlockFlyweightIterator(block)
for code in iter:
if key != 'body':
list.append("</%%%s>\n" % key)
list.append("# body code end\n")
for blockname, blocks in self.named_blocks.iteritems():
for key, value in self.def_blocks.iteritems():
ind = indent + "\t"
list.append(ind + "<%%%s %s>" % (blockname, key))
list.append(value._to_string(ind + "\t"))
list.append(ind + "</%%%s %s>" % (blockname, key))
return indent + string.join(list, "\n" + indent)
def compile(self, source, name, file, input_file = None):
"""compiles a source file.
source - a string representing the source of the file
name - a name to give to the compiled object
file - a file object that the compiled output will be streamed to."""
self.current_compile = Compiler.Compiled(compiler = self, fileid = name, name = 'top_level')
# Preprocess the source. The preprocessor routine is handed a
# reference to the entire source.
if self.python_pre_processor:
source = self.python_pre_processor(source)
except Exception, e:
raise exception.Compiler(e.args)
self.lexer.lex(source, name, self, input_file)
def start_block(self, block_type, attributes = None):
"""called by Lexer to indicate a <%block> tag """
compile = self.current_compile
if top_level_only_block.has_key(block_type) and not compile.in_main:
self.lexer.raise_syntax_error("Cannot define a %s section inside a method or subcomponent" % block_type)
if compile.start_blocks.has_key(block_type):
compile.start_blocks[block_type] = self.lexer.line_number()
if compile.in_block:
self.lexer.raise_syntax_error("Cannot nest a %s inside a %s block" % (block_type, c.in_block))
compile.in_block = block_type
def end_block(self, block_type):
"""called by Lexer to indicate a </%block> tag"""
compile = self.current_compile
if compile.in_block != block_type:
self.lexer.raise_syntax_error("End of %s encountered while in %s block" % (block_type, compile.in_block))
compile.in_block = None
def text_block(self, block, **params):
Compiler.textline.append(self.current_compile.get_block_list('body'), block, self.lexer.line_number())
# comment - discard
def doc_block(self, **params):pass
def magic_encoding_comment(self, encoding, **params):
"""A magic -*- encoding: foo -*- style comment."""
assert encoding and type(encoding) is str
def post_process_text(self, code):
if self.text_post_processor:
return self.text_post_processor(code)
except Exception, e:
raise exception.Compiler(e.args)
return code
def post_process_python(self, code):
if self.python_post_processor:
return self.python_post_processor(code)
except Exception, e:
raise exception.Compiler(e.args)
return code
def raw_block(self, **params):
# see if we have a method corresponding to this block and call it if so
if BLOCKS.has_key(params['block_type']):
method = BLOCKS[params['block_type']]
# call method dynamically
return getattr(self, method)(**params)
Compiler.codeblock.append(self.current_compile.get_block_list(params['block_type']), params['block'], self.lexer.line_number())
def variable_declaration(self, block_type, name, default, scope):
"""Inserts a variable declaration from the C<< <%args> >> section into
the component."""
if block_type != 'args':
self.lexer.raise_syntax_error("Variable Declaration called inside a %s block" % block_type)
self.current_compile.add_argument(name, default, self.lexer.line_number(), scope)
def key_value_pair(self, block_type, key, value):
try :
dict = {"flags": self.current_compile.flags, "attr": self.current_compile.attr}[block_type]
if dict.has_key(key):
self.lexer.raise_syntax_error("%s %s already defined" % (key, block_type))
dict[key] = value
if block_type == "flags" and key == "encoding":
# "The 'encoding' flag has been deprecated, "
# "use an encoding magic comment instead.",
# category=DeprecationWarning)
self.current_compile.set_encoding(eval(value, {}))
except KeyError:
exception.Compiler("key_value_pair called inside a %s block" % block_type)
def python_line(self, line):
Compiler.codesingleline.append(self.current_compile.get_body(), line, self.lexer.line_number())
def python_block(self, block, **params):
Compiler.codeblock.append(self.current_compile.get_body(), block, self.lexer.line_number())
def substitution(self, line, escape):
if escape or len(self.default_escape_flags):
dict = OrderedDict()
if escape:
escapes = re.split(r"\s*,\s*", escape)
for esc in escapes:
dict[esc] = 1
if self.default_escape_flags and not dict.has_key('n'):
for esc in self.default_escape_flags:
dict[esc] = 1
if dict.has_key('n'): del dict['n']
escapes = dict.keys()
escapes = None
Compiler.substituteline.append(self.current_compile.get_body(), line, escapes, self.lexer.line_number())
def start_component(self):
def end_component(self):
compile = self.current_compile
if len(compile.component_content_call_stack) > 0:
self.lexer.raise_syntax_error("Not enough component-with-content ending tags found")
def compiled_component(self, file):
self.generator.generate(self, self.current_compile, file)
def start_named_block(self, block_type, name, attributes):
compile = self.current_compile
# Error if defining one def or method inside another
if block_type != 'closure' and not compile.in_main:
self.lexer.raise_syntax_error("Cannot define a %s block inside a method or subcomponent" % block_type)
if re.search(r"[^.\w-]", name):
self.lexer.raise_syntax_error("Invalid %s name: %s" % (block_type, name))
# error if we have a named block with this name already
if block_type != 'closure' and compile.has_named_block(None, name):
block = compile.get_named_block(None, name)
self.lexer.raise_syntax_error("%s block %s already exists" % (block.block_type, name))
# make a new compile object, set it to be our current
newcompile = Compiler.Compiled(compiler = self, parent = self.current_compile, in_main = False, fileid = self.current_compile.fileid, name = name, block_type = block_type, flags = attributes)
if block_type != 'closure':
compile.add_named_block(block_type, name, newcompile)
Compiler.closure.append(self.current_compile.get_body(), newcompile, self.lexer.line_number())
self.current_compile = newcompile
def end_named_block(self, block_type):
# reset current compile object to be its parent
if self.current_compile.parent:
self.current_compile = self.current_compile.parent
raise exception.Compiler("end_named_block found no parent block")
def _generic_component_call(self, call, iscontent = False, isclose = False):
if isclose:
call = self.current_compile.component_content_call_stack.pop()
except IndexError:
self.lexer.raise_syntax_error("found component with content ending tag but no beginning tag")
call = string.strip(call)
(component, args) = (re.split(r",", call, 1) + [''])[0:2]
component = string.strip(component)
args = string.strip(args)
if not iscontent:
Compiler.componentcall.append(self.current_compile.get_body(), component, args, self.lexer.line_number())
if not isclose:
Compiler.componentcontentcall.append(self.current_compile.get_body(), component, args, self.lexer.line_number())
Compiler.componentcontentcallend.append(self.current_compile.get_body(), component, args, self.lexer.line_number())
def component_call(self, call):
self._generic_component_call(call, False, False)
def component_content_call(self, call):
self._generic_component_call(call, True, False)
def component_content_call_end(self):
self._generic_component_call(None, True, True)
def get_encoding(self):
return self.current_compile.get_encoding()