#!/usr/bin/python -OOOO
# vim: set fileencoding=utf8 shiftwidth=4 tabstop=4 textwidth=80 foldmethod=marker :
# Copyright (c) 2010, Kou Man Tong. All rights reserved.
# For licensing, see LICENSE file included in the package.
"""
Base codec functions for bson.
"""
import struct
import cStringIO
import calendar
import pytz
from datetime import datetime
import warnings
import inspect
from abc import ABCMeta, abstractmethod

# {{{ Error Classes
class MissingClassDefinition(ValueError):
    """
    Raised by decode_object() when a document names a class that has not been
    registered in the module-level ``classes`` table.
    """
    def __init__(self, class_name):
        super(MissingClassDefinition, self).__init__(
            "No class definition for class %s" % (class_name,))
# }}}
# {{{ Warning Classes
class MissingTimezoneWarning(RuntimeWarning):
    """
    Warning category for datetime objects that carry no tzinfo.

    When constructed without arguments, a default message stating that UTC is
    assumed is supplied.
    """
    def __init__(self, *args):
        args = list(args)
        if len(args) < 1:
            args.append("Input datetime object has no tzinfo, assuming UTC.")
        super(MissingTimezoneWarning, self).__init__(*args)
# }}}
# {{{ Traversal Step
class TraversalStep(object):
    """
    One entry of the traversal stack maintained while encoding a document.

    parent -- the container whose keys are being walked
    key -- the key of ``parent`` currently being encoded
    """
    def __init__(self, parent, key):
        self.parent = parent
        self.key = key
# }}}
# {{{ Custom Object Codec

class BSONCoding(object):
    """
    Abstract interface for objects that can be stored as BSON documents.

    Subclasses must be registered with import_class() (or one of its
    siblings) before documents naming them can be decoded.
    """
    __metaclass__ = ABCMeta

    @abstractmethod
    def bson_encode(self):
        """
        Return a mapping of the values to serialize.

        encode_object() adds a "$$__CLASS_NAME__$$" entry to the returned
        mapping in place, so do not return a mapping whose contents must stay
        untouched.
        """
        pass

    @abstractmethod
    def bson_init(self, raw_values):
        """
        Populate this instance from ``raw_values``.

        decode_object() calls this on an instance that was created without
        running ``__init__``; ``raw_values`` still contains the
        "$$__CLASS_NAME__$$" entry.
        """
        pass

# Registry used by decode_object(): class name (cls.__name__) -> class.
classes = {}

def import_class(cls):
    """
    Register ``cls`` for decoding under its unqualified ``__name__``.

    Classes that are not subclasses of BSONCoding are ignored. Registering
    two classes with the same ``__name__`` keeps only the later one.
    """
    if not issubclass(cls, BSONCoding):
        return
    classes[cls.__name__] = cls

def import_classes(*args):
    """Register every class passed as a positional argument."""
    for cls in args:
        import_class(cls)

def import_classes_from_modules(*args):
    """
    Register every BSONCoding subclass found at the top level of the given
    modules.
    """
    for module in args:
        # Walk the module's *values*: iterating the dict itself only yields
        # the attribute names (strings), which can never be classes.
        for item in vars(module).values():
            # issubclass() in import_class() raises TypeError on non-classes
            # (functions, sub-modules, constants), so filter them out here.
            if inspect.isclass(item):
                import_class(item)

def encode_object(obj, traversal_stack, generator_func):
    """
    Encode a BSONCoding instance as a BSON document.

    The mapping returned by ``obj.bson_encode()`` is tagged in place with the
    object's class name under "$$__CLASS_NAME__$$" and handed to
    encode_document(), with ``obj`` as the traversal parent.
    """
    values = obj.bson_encode()
    class_name = obj.__class__.__name__
    values["$$__CLASS_NAME__$$"] = class_name
    return encode_document(values, traversal_stack, obj, generator_func)

def encode_object_element(name, value, traversal_stack, generator_func):
    """
    Encode ``value`` as a named element: type byte 0x03, the element name as
    a cstring, then the document produced by encode_object().
    """
    return "\x03" + encode_cstring(name) + \
            encode_object(value, traversal_stack,
                    generator_func = generator_func)

class _EmptyClass(object):
    """Blank placeholder instantiated by decode_object() before re-classing."""
    pass

# decode_object: rebuild a registered BSONCoding instance from a decoded
# mapping that carries a "$$__CLASS_NAME__$$" entry.
def decode_object(raw_values):
    global classes
class_name = raw_values["$$__CLASS_NAME__$$"] cls = None try: cls = classes[class_name] except KeyError, e: raise MissingClassDefinition(class_name) retval = _EmptyClass() retval.__class__ = cls retval.bson_init(raw_values) return retval # }}} # {{{ Codec Logic def encode_string(value): value = value.encode("utf8") length = len(value) return struct.pack(" 0x7fffffff: buf.write(encode_int64_element(name, value)) else: buf.write(encode_int32_element(name, value)) elif isinstance(value, long): buf.write(encode_int64_element(name, value)) def encode_document(obj, traversal_stack, traversal_parent = None, generator_func = None): buf = cStringIO.StringIO() key_iter = obj.iterkeys() if generator_func is not None: key_iter = generator_func(obj, traversal_stack) for name in key_iter: value = obj[name] traversal_stack.append(TraversalStep(traversal_parent or obj, name)) encode_value(name, value, buf, traversal_stack, generator_func) traversal_stack.pop() e_list = buf.getvalue() e_list_length = len(e_list) return struct.pack("