aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/optmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/optmanager.py')
-rw-r--r--mitmproxy/optmanager.py308
1 files changed, 246 insertions, 62 deletions
diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py
index f95ce836..8661aece 100644
--- a/mitmproxy/optmanager.py
+++ b/mitmproxy/optmanager.py
@@ -1,37 +1,84 @@
import contextlib
import blinker
import pprint
-import inspect
import copy
import functools
import weakref
import os
+import typing
+import textwrap
import ruamel.yaml
from mitmproxy import exceptions
from mitmproxy.utils import typecheck
-
"""
The base implementation for Options.
"""
+unset = object()
+
+
+class _Option:
+ __slots__ = ("name", "typespec", "value", "_default", "choices", "help")
+
+ def __init__(
+ self,
+ name: str,
+ typespec: type,
+ default: typing.Any,
+ help: str,
+ choices: typing.Optional[typing.Sequence[str]]
+ ) -> None:
+ typecheck.check_type(name, default, typespec)
+ self.name = name
+ self.typespec = typespec
+ self._default = default
+ self.value = unset
+ self.help = help
+ self.choices = choices
+
+ def __repr__(self):
+ return "{value} [{type}]".format(value=self.current(), type=self.typespec)
+
+ @property
+ def default(self):
+ return copy.deepcopy(self._default)
+
+ def current(self) -> typing.Any:
+ if self.value is unset:
+ v = self.default
+ else:
+ v = self.value
+ return copy.deepcopy(v)
+
+ def set(self, value: typing.Any) -> None:
+ typecheck.check_type(self.name, value, self.typespec)
+ self.value = value
-class _DefaultsMeta(type):
- def __new__(cls, name, bases, namespace, **kwds):
- ret = type.__new__(cls, name, bases, dict(namespace))
- defaults = {}
- for klass in reversed(inspect.getmro(ret)):
- for p in inspect.signature(klass.__init__).parameters.values():
- if p.kind in (p.KEYWORD_ONLY, p.POSITIONAL_OR_KEYWORD):
- if not p.default == p.empty:
- defaults[p.name] = p.default
- ret._defaults = defaults
- return ret
+ def reset(self) -> None:
+ self.value = unset
+ def has_changed(self) -> bool:
+ return self.value is not unset
-class OptManager(metaclass=_DefaultsMeta):
+ def __eq__(self, other) -> bool:
+ for i in self.__slots__:
+ if getattr(self, i) != getattr(other, i):
+ return False
+ return True
+
+ def __deepcopy__(self, _):
+ o = _Option(
+ self.name, self.typespec, self.default, self.help, self.choices
+ )
+ if self.has_changed():
+ o.value = self.current()
+ return o
+
+
+class OptManager:
"""
OptManager is the base class from which Options objects are derived.
Note that the __init__ method of all child classes must force all
@@ -45,33 +92,37 @@ class OptManager(metaclass=_DefaultsMeta):
Optmanager always returns a deep copy of options to ensure that
mutation doesn't change the option state inadvertently.
"""
- _initialized = False
- attributes = []
-
- def __new__(cls, *args, **kwargs):
- # Initialize instance._opts before __init__ is called.
- # This allows us to call super().__init__() last, which then sets
- # ._initialized = True as the final operation.
- instance = super().__new__(cls)
- instance.__dict__["_opts"] = {}
- return instance
-
def __init__(self):
+ self.__dict__["_options"] = {}
self.__dict__["changed"] = blinker.Signal()
self.__dict__["errored"] = blinker.Signal()
- self.__dict__["_initialized"] = True
+ self.__dict__["_processed"] = {}
+
+ def add_option(
+ self,
+ name: str,
+ typespec: type,
+ default: typing.Any,
+ help: str,
+ choices: typing.Optional[typing.Sequence[str]] = None
+ ) -> None:
+ if name in self._options:
+ raise ValueError("Option %s already exists" % name)
+ self._options[name] = _Option(name, typespec, default, help, choices)
@contextlib.contextmanager
- def rollback(self, updated):
- old = self._opts.copy()
+ def rollback(self, updated, reraise=False):
+ old = copy.deepcopy(self._options)
try:
yield
except exceptions.OptionsError as e:
# Notify error handlers
self.errored.send(self, exc=e)
# Rollback
- self.__dict__["_opts"] = old
+ self.__dict__["_options"] = old
self.changed.send(self, updated=updated)
+ if reraise:
+ raise e
def subscribe(self, func, opts):
"""
@@ -95,61 +146,51 @@ class OptManager(metaclass=_DefaultsMeta):
self.changed.connect(_call, weak=False)
def __eq__(self, other):
- return self._opts == other._opts
+ return self._options == other._options
def __copy__(self):
- return self.__class__(**self._opts)
+ o = OptManager()
+ o.__dict__["_options"] = copy.deepcopy(self._options)
+ return o
def __getattr__(self, attr):
- if attr in self._opts:
- return copy.deepcopy(self._opts[attr])
+ if attr in self._options:
+ return self._options[attr].current()
else:
raise AttributeError("No such option: %s" % attr)
def __setattr__(self, attr, value):
- if not self._initialized:
- self._typecheck(attr, value)
- self._opts[attr] = value
- return
self.update(**{attr: value})
- def _typecheck(self, attr, value):
- expected_type = typecheck.get_arg_type_from_constructor_annotation(
- type(self), attr
- )
- if expected_type is None:
- return # no type info :(
- typecheck.check_type(attr, value, expected_type)
-
def keys(self):
- return set(self._opts.keys())
+ return set(self._options.keys())
+
+ def __contains__(self, k):
+ return k in self._options
def reset(self):
"""
Restore defaults for all options.
"""
- self.update(**self._defaults)
-
- @classmethod
- def default(klass, opt):
- return copy.deepcopy(klass._defaults[opt])
+ for o in self._options.values():
+ o.reset()
def update(self, **kwargs):
updated = set(kwargs.keys())
- for k, v in kwargs.items():
- if k not in self._opts:
- raise KeyError("No such option: %s" % k)
- self._typecheck(k, v)
with self.rollback(updated):
- self._opts.update(kwargs)
+ for k, v in kwargs.items():
+ if k not in self._options:
+ raise KeyError("No such option: %s" % k)
+ self._options[k].set(v)
self.changed.send(self, updated=updated)
+ return self
def setter(self, attr):
"""
Generate a setter for a given attribute. This returns a callable
taking a single argument.
"""
- if attr not in self._opts:
+ if attr not in self._options:
raise KeyError("No such option: %s" % attr)
def setter(x):
@@ -161,19 +202,24 @@ class OptManager(metaclass=_DefaultsMeta):
Generate a toggler for a boolean attribute. This returns a callable
that takes no arguments.
"""
- if attr not in self._opts:
+ if attr not in self._options:
raise KeyError("No such option: %s" % attr)
+ o = self._options[attr]
+ if o.typespec != bool:
+ raise ValueError("Toggler can only be used with boolean options")
def toggle():
setattr(self, attr, not getattr(self, attr))
return toggle
+ def default(self, option: str) -> typing.Any:
+ return self._options[option].default
+
def has_changed(self, option):
"""
Has the option changed from the default?
"""
- if getattr(self, option) != self._defaults[option]:
- return True
+ return self._options[option].has_changed()
def save(self, path, defaults=False):
"""
@@ -204,7 +250,7 @@ class OptManager(metaclass=_DefaultsMeta):
if defaults or self.has_changed(k):
data[k] = getattr(self, k)
for k in list(data.keys()):
- if k not in self._opts:
+ if k not in self._options:
del data[k]
return ruamel.yaml.round_trip_dump(data)
@@ -268,7 +314,7 @@ class OptManager(metaclass=_DefaultsMeta):
self.update(**toset)
def __repr__(self):
- options = pprint.pformat(self._opts, indent=4).strip(" {}")
+ options = pprint.pformat(self._options, indent=4).strip(" {}")
if "\n" in options:
options = "\n " + options + "\n"
return "{mod}.{cls}({{{options}}})".format(
@@ -276,3 +322,141 @@ class OptManager(metaclass=_DefaultsMeta):
cls=type(self).__name__,
options=options
)
+
+ def set(self, spec):
+ parts = spec.split("=", maxsplit=1)
+ if len(parts) == 1:
+ optname, optval = parts[0], None
+ else:
+ optname, optval = parts[0], parts[1]
+ o = self._options[optname]
+
+ if o.typespec in (str, typing.Optional[str]):
+ setattr(self, optname, optval)
+ elif o.typespec in (int, typing.Optional[int]):
+ if optval:
+ try:
+ optval = int(optval)
+ except ValueError:
+ raise exceptions.OptionsError("Not an integer: %s" % optval)
+ setattr(self, optname, optval)
+ elif o.typespec == bool:
+ if not optval or optval == "true":
+ v = True
+ elif optval == "false":
+ v = False
+ else:
+ raise exceptions.OptionsError(
+ "Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")."
+ )
+ setattr(self, optname, v)
+ elif o.typespec == typing.Sequence[str]:
+ if not optval:
+ setattr(self, optname, [])
+ else:
+ setattr(
+ self,
+ optname,
+ getattr(self, optname) + [optval]
+ )
+ else: # pragma: no cover
+ raise NotImplementedError("Unsupported option type: %s", o.typespec)
+
+ def make_parser(self, parser, optname, metavar=None, short=None):
+ o = self._options[optname]
+
+ def mkf(l, s):
+ l = l.replace("_", "-")
+ f = ["--%s" % l]
+ if s:
+ f.append("-" + s)
+ return f
+
+ flags = mkf(optname, short)
+
+ if o.typespec == bool:
+ g = parser.add_mutually_exclusive_group(required=False)
+ onf = mkf(optname, None)
+ offf = mkf("no-" + optname, None)
+ # The short option for a bool goes to whatever is NOT the default
+ if short:
+ if o.default:
+ offf = mkf("no-" + optname, short)
+ else:
+ onf = mkf(optname, short)
+ g.add_argument(
+ *offf,
+ action="store_false",
+ dest=optname,
+ )
+ g.add_argument(
+ *onf,
+ action="store_true",
+ dest=optname,
+ help=o.help
+ )
+ parser.set_defaults(**{optname: None})
+ elif o.typespec in (int, typing.Optional[int]):
+ parser.add_argument(
+ *flags,
+ action="store",
+ type=int,
+ dest=optname,
+ help=o.help,
+ metavar=metavar,
+ )
+ elif o.typespec in (str, typing.Optional[str]):
+ parser.add_argument(
+ *flags,
+ action="store",
+ type=str,
+ dest=optname,
+ help=o.help,
+ metavar=metavar,
+ choices=o.choices
+ )
+ elif o.typespec == typing.Sequence[str]:
+ parser.add_argument(
+ *flags,
+ action="append",
+ type=str,
+ dest=optname,
+ help=o.help + " May be passed multiple times.",
+ metavar=metavar,
+ choices=o.choices,
+ )
+ else:
+ raise ValueError("Unsupported option type: %s", o.typespec)
+
+
+def dump(opts):
+ """
+ Dumps an annotated file with all options.
+ """
+ # Sort data
+ s = ruamel.yaml.comments.CommentedMap()
+ for k in sorted(opts.keys()):
+ o = opts._options[k]
+ s[k] = o.default
+ txt = o.help.strip()
+
+ if o.choices:
+ txt += " Valid values are %s." % ", ".join(repr(c) for c in o.choices)
+ else:
+ if o.typespec in (str, int, bool):
+ t = o.typespec.__name__
+ elif o.typespec == typing.Optional[str]:
+ t = "optional str"
+ elif o.typespec == typing.Sequence[str]:
+ t = "sequence of str"
+ else: # pragma: no cover
+ raise NotImplementedError
+ txt += " Type %s." % t
+
+ txt = "\n".join(
+ textwrap.wrap(
+ textwrap.dedent(txt)
+ )
+ )
+ s.yaml_set_comment_before_after_key(k, before = "\n" + txt)
+ return ruamel.yaml.round_trip_dump(s)