diff options
Diffstat (limited to 'mitmproxy/optmanager.py')
-rw-r--r-- | mitmproxy/optmanager.py | 308 |
1 files changed, 246 insertions, 62 deletions
diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py index f95ce836..8661aece 100644 --- a/mitmproxy/optmanager.py +++ b/mitmproxy/optmanager.py @@ -1,37 +1,84 @@ import contextlib import blinker import pprint -import inspect import copy import functools import weakref import os +import typing +import textwrap import ruamel.yaml from mitmproxy import exceptions from mitmproxy.utils import typecheck - """ The base implementation for Options. """ +unset = object() + + +class _Option: + __slots__ = ("name", "typespec", "value", "_default", "choices", "help") + + def __init__( + self, + name: str, + typespec: type, + default: typing.Any, + help: str, + choices: typing.Optional[typing.Sequence[str]] + ) -> None: + typecheck.check_type(name, default, typespec) + self.name = name + self.typespec = typespec + self._default = default + self.value = unset + self.help = help + self.choices = choices + + def __repr__(self): + return "{value} [{type}]".format(value=self.current(), type=self.typespec) + + @property + def default(self): + return copy.deepcopy(self._default) + + def current(self) -> typing.Any: + if self.value is unset: + v = self.default + else: + v = self.value + return copy.deepcopy(v) + + def set(self, value: typing.Any) -> None: + typecheck.check_type(self.name, value, self.typespec) + self.value = value -class _DefaultsMeta(type): - def __new__(cls, name, bases, namespace, **kwds): - ret = type.__new__(cls, name, bases, dict(namespace)) - defaults = {} - for klass in reversed(inspect.getmro(ret)): - for p in inspect.signature(klass.__init__).parameters.values(): - if p.kind in (p.KEYWORD_ONLY, p.POSITIONAL_OR_KEYWORD): - if not p.default == p.empty: - defaults[p.name] = p.default - ret._defaults = defaults - return ret + def reset(self) -> None: + self.value = unset + def has_changed(self) -> bool: + return self.value is not unset -class OptManager(metaclass=_DefaultsMeta): + def __eq__(self, other) -> bool: + for i in self.__slots__: + if getattr(self, i) != getattr(other, i): + return False + return True + + def __deepcopy__(self, _): + o = _Option( + self.name, self.typespec, self.default, self.help, self.choices + ) + if self.has_changed(): + o.value = self.current() + return o + + +class OptManager: """ OptManager is the base class from which Options objects are derived. Note that the __init__ method of all child classes must force all @@ -45,33 +92,37 @@ class OptManager(metaclass=_DefaultsMeta): Optmanager always returns a deep copy of options to ensure that mutation doesn't change the option state inadvertently. """ - _initialized = False - attributes = [] - - def __new__(cls, *args, **kwargs): - # Initialize instance._opts before __init__ is called. - # This allows us to call super().__init__() last, which then sets - # ._initialized = True as the final operation. - instance = super().__new__(cls) - instance.__dict__["_opts"] = {} - return instance - def __init__(self): + self.__dict__["_options"] = {} self.__dict__["changed"] = blinker.Signal() self.__dict__["errored"] = blinker.Signal() - self.__dict__["_initialized"] = True + self.__dict__["_processed"] = {} + + def add_option( + self, + name: str, + typespec: type, + default: typing.Any, + help: str, + choices: typing.Optional[typing.Sequence[str]] = None + ) -> None: + if name in self._options: + raise ValueError("Option %s already exists" % name) + self._options[name] = _Option(name, typespec, default, help, choices) @contextlib.contextmanager - def rollback(self, updated): - old = self._opts.copy() + def rollback(self, updated, reraise=False): + old = copy.deepcopy(self._options) try: yield except exceptions.OptionsError as e: # Notify error handlers self.errored.send(self, exc=e) # Rollback - self.__dict__["_opts"] = old + self.__dict__["_options"] = old self.changed.send(self, updated=updated) + if reraise: + raise e def subscribe(self, func, opts): """ @@ -95,61 +146,51 @@ class OptManager(metaclass=_DefaultsMeta): self.changed.connect(_call, weak=False) def __eq__(self, other): - return self._opts == other._opts + return self._options == other._options def __copy__(self): - return self.__class__(**self._opts) + o = OptManager() + o.__dict__["_options"] = copy.deepcopy(self._options) + return o def __getattr__(self, attr): - if attr in self._opts: - return copy.deepcopy(self._opts[attr]) + if attr in self._options: + return self._options[attr].current() else: raise AttributeError("No such option: %s" % attr) def __setattr__(self, attr, value): - if not self._initialized: - self._typecheck(attr, value) - self._opts[attr] = value - return self.update(**{attr: value}) - def _typecheck(self, attr, value): - expected_type = typecheck.get_arg_type_from_constructor_annotation( - type(self), attr - ) - if expected_type is None: - return # no type info :( - typecheck.check_type(attr, value, expected_type) - def keys(self): - return set(self._opts.keys()) + return set(self._options.keys()) + + def __contains__(self, k): + return k in self._options def reset(self): """ Restore defaults for all options. """ - self.update(**self._defaults) - - @classmethod - def default(klass, opt): - return copy.deepcopy(klass._defaults[opt]) + for o in self._options.values(): + o.reset() def update(self, **kwargs): updated = set(kwargs.keys()) - for k, v in kwargs.items(): - if k not in self._opts: - raise KeyError("No such option: %s" % k) - self._typecheck(k, v) with self.rollback(updated): - self._opts.update(kwargs) + for k, v in kwargs.items(): + if k not in self._options: + raise KeyError("No such option: %s" % k) + self._options[k].set(v) self.changed.send(self, updated=updated) + return self def setter(self, attr): """ Generate a setter for a given attribute. This returns a callable taking a single argument. """ - if attr not in self._opts: + if attr not in self._options: raise KeyError("No such option: %s" % attr) def setter(x): @@ -161,19 +202,24 @@ class OptManager(metaclass=_DefaultsMeta): Generate a toggler for a boolean attribute. This returns a callable that takes no arguments. """ - if attr not in self._opts: + if attr not in self._options: raise KeyError("No such option: %s" % attr) + o = self._options[attr] + if o.typespec != bool: + raise ValueError("Toggler can only be used with boolean options") def toggle(): setattr(self, attr, not getattr(self, attr)) return toggle + def default(self, option: str) -> typing.Any: + return self._options[option].default + def has_changed(self, option): """ Has the option changed from the default? """ - if getattr(self, option) != self._defaults[option]: - return True + return self._options[option].has_changed() def save(self, path, defaults=False): """ @@ -204,7 +250,7 @@ class OptManager(metaclass=_DefaultsMeta): if defaults or self.has_changed(k): data[k] = getattr(self, k) for k in list(data.keys()): - if k not in self._opts: + if k not in self._options: del data[k] return ruamel.yaml.round_trip_dump(data) @@ -268,7 +314,7 @@ class OptManager(metaclass=_DefaultsMeta): self.update(**toset) def __repr__(self): - options = pprint.pformat(self._opts, indent=4).strip(" {}") + options = pprint.pformat(self._options, indent=4).strip(" {}") if "\n" in options: options = "\n " + options + "\n" return "{mod}.{cls}({{{options}}})".format( @@ -276,3 +322,141 @@ class OptManager(metaclass=_DefaultsMeta): cls=type(self).__name__, options=options ) + + def set(self, spec): + parts = spec.split("=", maxsplit=1) + if len(parts) == 1: + optname, optval = parts[0], None + else: + optname, optval = parts[0], parts[1] + o = self._options[optname] + + if o.typespec in (str, typing.Optional[str]): + setattr(self, optname, optval) + elif o.typespec in (int, typing.Optional[int]): + if optval: + try: + optval = int(optval) + except ValueError: + raise exceptions.OptionsError("Not an integer: %s" % optval) + setattr(self, optname, optval) + elif o.typespec == bool: + if not optval or optval == "true": + v = True + elif optval == "false": + v = False + else: + raise exceptions.OptionsError( + "Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")." + ) + setattr(self, optname, v) + elif o.typespec == typing.Sequence[str]: + if not optval: + setattr(self, optname, []) + else: + setattr( + self, + optname, + getattr(self, optname) + [optval] + ) + else: # pragma: no cover + raise NotImplementedError("Unsupported option type: %s", o.typespec) + + def make_parser(self, parser, optname, metavar=None, short=None): + o = self._options[optname] + + def mkf(l, s): + l = l.replace("_", "-") + f = ["--%s" % l] + if s: + f.append("-" + s) + return f + + flags = mkf(optname, short) + + if o.typespec == bool: + g = parser.add_mutually_exclusive_group(required=False) + onf = mkf(optname, None) + offf = mkf("no-" + optname, None) + # The short option for a bool goes to whatever is NOT the default + if short: + if o.default: + offf = mkf("no-" + optname, short) + else: + onf = mkf(optname, short) + g.add_argument( + *offf, + action="store_false", + dest=optname, + ) + g.add_argument( + *onf, + action="store_true", + dest=optname, + help=o.help + ) + parser.set_defaults(**{optname: None}) + elif o.typespec in (int, typing.Optional[int]): + parser.add_argument( + *flags, + action="store", + type=int, + dest=optname, + help=o.help, + metavar=metavar, + ) + elif o.typespec in (str, typing.Optional[str]): + parser.add_argument( + *flags, + action="store", + type=str, + dest=optname, + help=o.help, + metavar=metavar, + choices=o.choices + ) + elif o.typespec == typing.Sequence[str]: + parser.add_argument( + *flags, + action="append", + type=str, + dest=optname, + help=o.help + " May be passed multiple times.", + metavar=metavar, + choices=o.choices, + ) + else: + raise ValueError("Unsupported option type: %s", o.typespec) + + +def dump(opts): + """ + Dumps an annotated file with all options. + """ + # Sort data + s = ruamel.yaml.comments.CommentedMap() + for k in sorted(opts.keys()): + o = opts._options[k] + s[k] = o.default + txt = o.help.strip() + + if o.choices: + txt += " Valid values are %s." % ", ".join(repr(c) for c in o.choices) + else: + if o.typespec in (str, int, bool): + t = o.typespec.__name__ + elif o.typespec == typing.Optional[str]: + t = "optional str" + elif o.typespec == typing.Sequence[str]: + t = "sequence of str" + else: # pragma: no cover + raise NotImplementedError + txt += " Type %s." % t + + txt = "\n".join( + textwrap.wrap( + textwrap.dedent(txt) + ) + ) + s.yaml_set_comment_before_after_key(k, before = "\n" + txt) + return ruamel.yaml.round_trip_dump(s) |