diff options
Diffstat (limited to 'tools/python/xen/xend/sxp.py')
-rw-r--r-- | tools/python/xen/xend/sxp.py | 580 |
1 files changed, 580 insertions, 0 deletions
diff --git a/tools/python/xen/xend/sxp.py b/tools/python/xen/xend/sxp.py new file mode 100644 index 0000000000..01654a2377 --- /dev/null +++ b/tools/python/xen/xend/sxp.py @@ -0,0 +1,580 @@ +#!/usr/bin/python2 +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> +""" +Input-driven parsing for s-expression (sxp) format. +Create a parser: pin = Parser(); +Then call pin.input(buf) with your input. +Call pin.input_eof() when done. +Use pin.read() to see if a value has been parsed, pin.get_val() +to get a parsed value. You can call ready and get_val at any time - +you don't have to wait until after calling input_eof. + +""" +from __future__ import generators + +import sys +import types +import errno +import string +from StringIO import StringIO + +__all__ = [ + "mime_type", + "ParseError", + "Parser", + "atomp", + "show", + "show_xml", + "elementp", + "name", + "attributes", + "attribute", + "children", + "child", + "child_at", + "child0", + "child1", + "child2", + "child3", + "child4", + "child_value", + "has_id", + "with_id", + "child_with_id", + "elements", + "parse", + ] + +mime_type = "application/sxp" + +escapes = { + 'a': '\a', + 'b': '\b', + 't': '\t', + 'n': '\n', + 'v': '\v', + 'f': '\f', + 'r': '\r', + '\\': '\\', + '\'': '\'', + '\"': '\"'} + +k_list_open = "(" +k_list_close = ")" +k_attr_open = "@" +k_eval = "!" + +escapes_rev = {} +for k in escapes: + escapes_rev[escapes[k]] = k + +class ParseError(StandardError): + + def __init__(self, parser, value): + self.parser = parser + self.value = value + + def __str__(self): + return self.value + +class ParserState: + + def __init__(self, fn, parent=None): + self.parent = parent + self.buf = '' + self.val = [] + self.delim = None + self.fn = fn + + def push(self, fn): + return ParserState(fn, parent=self) + +class Parser: + + def __init__(self): + self.error = sys.stderr + self.reset() + + def reset(self): + self.val = [] + self.eof = 0 + self.err = 0 + self.line_no = 0 + self.char_no = 0 + self.state = None + + def push_state(self, fn): + self.state = self.state.push(fn) + + def pop_state(self): + val = self.state + self.state = self.state.parent + if self.state and self.state.fn == self.state_start: + # Return to start state - produce the value. + self.val += self.state.val + self.state.val = [] + return val + + def in_class(self, c, s): + return s.find(c) >= 0 + + def in_space_class(self, c): + return self.in_class(c, ' \t\n\v\f\r') + + def is_separator(self, c): + return self.in_class(c, '{}()<>[]!;') + + def in_comment_class(self, c): + return self.in_class(c, '#') + + def in_string_quote_class(self, c): + return self.in_class(c, '"\'') + + def in_printable_class(self, c): + return self.in_class(c, string.printable) + + def set_error_stream(self, error): + self.error = error + + def has_error(self): + return self.err > 0 + + def at_eof(self): + return self.eof + + def input_eof(self): + self.eof = 1 + self.input_char(-1) + + def input(self, buf): + if not buf or len(buf) == 0: + self.input_eof() + else: + for c in buf: + self.input_char(c) + + def input_char(self, c): + if self.at_eof(): + pass + elif c == '\n': + self.line_no += 1 + self.char_no = 0 + else: + self.char_no += 1 + + if self.state is None: + self.begin_start(None) + self.state.fn(c) + + def ready(self): + return len(self.val) > 0 + + def get_val(self): + v = self.val[0] + self.val = self.val[1:] + return v + + def get_all(self): + return self.val + + def begin_start(self, c): + self.state = ParserState(self.state_start) + + def end_start(self): + self.val += self.state.val + self.pop_state() + + def state_start(self, c): + if self.at_eof(): + self.end_start() + elif self.in_space_class(c): + pass + elif self.in_comment_class(c): + self.begin_comment(c) + elif c == k_list_open: + self.begin_list(c) + elif c == k_list_close: + raise ParseError(self, "syntax error: "+c) + elif self.in_string_quote_class(c): + self.begin_string(c) + elif self.in_printable_class(c): + self.begin_atom(c) + elif c == chr(4): + # ctrl-D, EOT: end-of-text. + self.input_eof() + else: + raise ParseError(self, "invalid character: code %d" % ord(c)) + + def begin_comment(self, c): + self.push_state(self.state_comment) + self.state.buf += c + + def end_comment(self): + self.pop_state() + + def state_comment(self, c): + if c == '\n' or self.at_eof(): + self.end_comment() + else: + self.state.buf += c + + def begin_string(self, c): + self.push_state(self.state_string) + self.state.delim = c + + def end_string(self): + val = self.state.buf + self.state.parent.val.append(val) + self.pop_state() + + def state_string(self, c): + if self.at_eof(): + raise ParseError(self, "unexpected EOF") + elif c == self.state.delim: + self.end_string() + elif c == '\\': + self.push_state(self.state_escape) + else: + self.state.buf += c + + def state_escape(self, c): + if self.at_eof(): + raise ParseError(self, "unexpected EOF") + d = escapes.get(c) + if d: + self.state.parent.buf += d + self.pop_state() + elif c == 'x': + self.state.fn = self.state_hex + self.state.val = 0 + else: + self.state.fn = self.state_octal + self.state.val = 0 + self.input_char(c) + + def state_octal(self, c): + def octaldigit(c): + self.state.val *= 8 + self.state.val += ord(c) - ord('0') + self.state.buf += c + if self.state.val < 0 or self.state.val > 0xff: + raise ParseError(self, "invalid octal escape: out of range " + self.state.buf) + if len(self.state.buf) == 3: + octaldone() + + def octaldone(): + d = chr(self.state.val) + self.state.parent.buf += d + self.pop_state() + + if self.at_eof(): + raise ParseError(self, "unexpected EOF") + elif '0' <= c <= '7': + octaldigit(c) + elif len(self.buf): + octaldone() + self.input_char(c) + + def state_hex(self, c): + def hexdone(): + d = chr(self.state.val) + self.state.parent.buf += d + self.pop_state() + + def hexdigit(c, d): + self.state.val *= 16 + self.state.val += ord(c) - ord(d) + self.state.buf += c + if self.state.val < 0 or self.state.val > 0xff: + raise ParseError(self, "invalid hex escape: out of range " + self.state.buf) + if len(self.state.buf) == 2: + hexdone() + + if self.at_eof(): + raise ParseError(self, "unexpected EOF") + elif '0' <= c <= '9': + hexdigit(c, '0') + elif 'A' <= c <= 'F': + hexdigit(c, 'A') + elif 'a' <= c <= 'f': + hexdigit(c, 'a') + elif len(buf): + hexdone() + self.input_char(c) + + def begin_atom(self, c): + self.push_state(self.state_atom) + self.state.buf = c + + def end_atom(self): + val = self.state.buf + self.state.parent.val.append(val) + self.pop_state() + + def state_atom(self, c): + if self.at_eof(): + self.end_atom() + elif (self.is_separator(c) or + self.in_space_class(c) or + self.in_comment_class(c)): + self.end_atom() + self.input_char(c) + else: + self.state.buf += c + + def begin_list(self, c): + self.push_state(self.state_list) + + def end_list(self): + val = self.state.val + self.state.parent.val.append(val) + self.pop_state() + + def state_list(self, c): + if self.at_eof(): + raise ParseError(self, "unexpected EOF") + elif c == k_list_close: + self.end_list() + else: + self.state_start(c) + +def atomp(sxpr): + if sxpr.isalnum() or sxpr == '@': + return 1 + for c in sxpr: + if c in string.whitespace: return 0 + if c in '"\'\\(){}[]<>$#&%^': return 0 + if c in string.ascii_letters: continue + if c in string.digits: continue + if c in '.-_:/~': continue + return 0 + return 1 + +def show(sxpr, out=sys.stdout): + if isinstance(sxpr, types.ListType): + out.write(k_list_open) + i = 0 + for x in sxpr: + if i: out.write(' ') + show(x, out) + i += 1 + out.write(k_list_close) + elif isinstance(sxpr, types.StringType) and atomp(sxpr): + out.write(sxpr) + else: + #out.write("'" + str(sxpr) + "'") + out.write(repr(str(sxpr))) + +def show_xml(sxpr, out=sys.stdout): + if isinstance(sxpr, types.ListType): + element = name(sxpr) + out.write('<%s' % element) + for attr in attributes(sxpr): + out.write(' %s=%s' % (attr[0], attr[1])) + out.write('>') + i = 0 + for x in children(sxpr): + if i: out.write(' ') + show_xml(x, out) + i += 1 + out.write('</%s>' % element) + elif isinstance(sxpr, types.StringType) and atomp(sxpr): + out.write(sxpr) + else: + out.write(str(sxpr)) + +def elementp(sxpr, elt=None): + return (isinstance(sxpr, types.ListType) + and len(sxpr) + and (None == elt or sxpr[0] == elt)) + +def name(sxpr): + val = None + if isinstance(sxpr, types.StringType): + val = sxpr + elif isinstance(sxpr, types.ListType) and len(sxpr): + val = sxpr[0] + return val + +def attributes(sxpr): + val = [] + if isinstance(sxpr, types.ListType) and len(sxpr) > 1: + attr = sxpr[1] + if elementp(attr, k_attr_open): + val = attr[1:] + return val + +def attribute(sxpr, key, val=None): + for x in attributes(sxpr): + if x[0] == key: + val = x[1] + break + return val + +def children(sxpr, elt=None): + val = [] + if isinstance(sxpr, types.ListType) and len(sxpr) > 1: + i = 1 + x = sxpr[i] + if elementp(x, k_attr_open): + i += 1 + val = sxpr[i : ] + if elt: + def iselt(x): + return elementp(x, elt) + val = filter(iselt, val) + return val + +def child(sxpr, elt, val=None): + for x in children(sxpr): + if elementp(x, elt): + val = x + break + return val + +def child_at(sxpr, index, val=None): + kids = children(sxpr) + if len(kids) > index: + val = kids[index] + return val + +def child0(sxpr, val=None): + return child_at(sxpr, 0, val) + +def child1(sxpr, val=None): + return child_at(sxpr, 1, val) + +def child2(sxpr, val=None): + return child_at(sxpr, 2, val) + +def child3(sxpr, val=None): + return child_at(sxpr, 3, val) + +def child4(sxpr, val=None): + return child_at(sxpr, 4, val) + +def child_value(sxpr, elt, val=None): + kid = child(sxpr, elt) + if kid: + val = child_at(kid, 0, val) + return val + +def has_id(sxpr, id): + """Test if an s-expression has a given id. + """ + return attribute(sxpr, 'id') == id + +def with_id(sxpr, id, val=None): + """Find the first s-expression with a given id, at any depth. + + sxpr s-exp or list + id id + val value if not found (default None) + + return s-exp or val + """ + if isinstance(sxpr, types.ListType): + for n in sxpr: + if has_id(n, id): + val = n + break + v = with_id(n, id) + if v is None: continue + val = v + break + return val + +def child_with_id(sxpr, id, val=None): + """Find the first child with a given id. + + sxpr s-exp or list + id id + val value if not found (default None) + + return s-exp or val + """ + if isinstance(sxpr, types.ListType): + for n in sxpr: + if has_id(n, id): + val = n + break + return val + +def elements(sxpr, ctxt=None): + """Generate elements (at any depth). + Visit elements in pre-order. + Values generated are (node, context) + The context is None if there is no parent, otherwise + (index, parent, context) where index is the node's index w.r.t its parent, + and context is the parent's context. + + sxpr s-exp + + returns generator + """ + yield (sxpr, ctxt) + i = 0 + for n in children(sxpr): + if isinstance(n, types.ListType): + # Calling elements() recursively does not generate recursively, + # it just returns a generator object. So we must iterate over it. + for v in elements(n, (i, sxpr, ctxt)): + yield v + i += 1 + +def to_string(sxpr): + """Convert an sxpr to a string. + + sxpr sxpr + returns string + """ + io = StringIO() + show(sxpr, io) + io.seek(0) + val = io.getvalue() + io.close() + return val + +def from_string(str): + """Create an sxpr by parsing a string. + + str string + returns sxpr + """ + io = StringIO(str) + return parse(io) + +def parse(io): + """Completely parse all input from 'io'. + + io input file object + returns list of values, None if incomplete + raises ParseError on parse error + """ + pin = Parser() + while 1: + buf = io.readline() + pin.input(buf) + if len(buf) == 0: + break + if pin.ready(): + val = pin.get_all() + else: + val = None + return val + + +if __name__ == '__main__': + print ">main" + pin = Parser() + while 1: + buf = sys.stdin.read(1024) + #buf = sys.stdin.readline() + pin.input(buf) + while pin.ready(): + val = pin.get_val() + print + print '****** val=', val + if len(buf) == 0: + break + |