aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/optmanager.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/optmanager.py')
-rw-r--r--mitmproxy/optmanager.py223
1 files changed, 125 insertions, 98 deletions
diff --git a/mitmproxy/optmanager.py b/mitmproxy/optmanager.py
index 9553bd32..495354f4 100644
--- a/mitmproxy/optmanager.py
+++ b/mitmproxy/optmanager.py
@@ -81,8 +81,6 @@ class _Option:
class OptManager:
"""
OptManager is the base class from which Options objects are derived.
- Note that the __init__ method of all child classes must force all
- arguments to be positional only, by including a "*" argument.
.changed is a blinker Signal that triggers whenever options are
updated. If any handler in the chain raises an exceptions.OptionsError
@@ -176,15 +174,29 @@ class OptManager:
o.reset()
self.changed.send(self._options.keys())
+ def update_known(self, **kwargs):
+ """
+ Update and set all known options from kwargs. Returns a dictionary
+ of unknown options.
+ """
+ known, unknown = {}, {}
+ for k, v in kwargs.items():
+ if k in self._options:
+ known[k] = v
+ else:
+ unknown[k] = v
+ updated = set(known.keys())
+ if updated:
+ with self.rollback(updated):
+ for k, v in known.items():
+ self._options[k].set(v)
+ self.changed.send(self, updated=updated)
+ return unknown
+
def update(self, **kwargs):
- updated = set(kwargs.keys())
- with self.rollback(updated):
- for k, v in kwargs.items():
- if k not in self._options:
- raise KeyError("No such option: %s" % k)
- self._options[k].set(v)
- self.changed.send(self, updated=updated)
- return self
+ u = self.update_known(**kwargs)
+ if u:
+ raise KeyError("Unknown options: %s" % ", ".join(u.keys()))
def setter(self, attr):
"""
@@ -222,83 +234,6 @@ class OptManager:
"""
return self._options[option].has_changed()
- def save(self, path, defaults=False):
- """
- Save to path. If the destination file exists, modify it in-place.
- """
- if os.path.exists(path) and os.path.isfile(path):
- with open(path, "r") as f:
- data = f.read()
- else:
- data = ""
- data = self.serialize(data, defaults)
- with open(path, "w") as f:
- f.write(data)
-
- def serialize(self, text, defaults=False):
- """
- Performs a round-trip serialization. If text is not None, it is
- treated as a previous serialization that should be modified
- in-place.
-
- - If "defaults" is False, only options with non-default values are
- serialized. Default values in text are preserved.
- - Unknown options in text are removed.
- - Raises OptionsError if text is invalid.
- """
- data = self._load(text)
- for k in self.keys():
- if defaults or self.has_changed(k):
- data[k] = getattr(self, k)
- for k in list(data.keys()):
- if k not in self._options:
- del data[k]
- return ruamel.yaml.round_trip_dump(data)
-
- def _load(self, text):
- if not text:
- return {}
- try:
- data = ruamel.yaml.load(text, ruamel.yaml.RoundTripLoader)
- except ruamel.yaml.error.YAMLError as v:
- snip = v.problem_mark.get_snippet()
- raise exceptions.OptionsError(
- "Config error at line %s:\n%s\n%s" %
- (v.problem_mark.line + 1, snip, v.problem)
- )
- if isinstance(data, str):
- raise exceptions.OptionsError("Config error - no keys found.")
- return data
-
- def load(self, text):
- """
- Load configuration from text, over-writing options already set in
- this object. May raise OptionsError if the config file is invalid.
- """
- data = self._load(text)
- try:
- self.update(**data)
- except KeyError as v:
- raise exceptions.OptionsError(v)
-
- def load_paths(self, *paths):
- """
- Load paths in order. Each path takes precedence over the previous
- path. Paths that don't exist are ignored, errors raise an
- OptionsError.
- """
- for p in paths:
- p = os.path.expanduser(p)
- if os.path.exists(p) and os.path.isfile(p):
- with open(p, "r") as f:
- txt = f.read()
- try:
- self.load(txt)
- except exceptions.OptionsError as e:
- raise exceptions.OptionsError(
- "Error reading %s: %s" % (p, e)
- )
-
def merge(self, opts):
"""
Merge a dict of options into this object. Options that have None
@@ -324,23 +259,33 @@ class OptManager:
options=options
)
- def set(self, spec):
+ def set(self, *spec):
+ vals = {}
+ for i in spec:
+ vals.update(self._setspec(i))
+ self.update(**vals)
+
+ def _setspec(self, spec):
+ d = {}
+
parts = spec.split("=", maxsplit=1)
if len(parts) == 1:
optname, optval = parts[0], None
else:
optname, optval = parts[0], parts[1]
+ if optname not in self._options:
+ raise exceptions.OptionsError("No such option %s" % optname)
o = self._options[optname]
if o.typespec in (str, typing.Optional[str]):
- setattr(self, optname, optval)
+ d[optname] = optval
elif o.typespec in (int, typing.Optional[int]):
if optval:
try:
optval = int(optval)
except ValueError:
raise exceptions.OptionsError("Not an integer: %s" % optval)
- setattr(self, optname, optval)
+ d[optname] = optval
elif o.typespec == bool:
if not optval or optval == "true":
v = True
@@ -350,18 +295,15 @@ class OptManager:
raise exceptions.OptionsError(
"Boolean must be \"true\", \"false\", or have the value " "omitted (a synonym for \"true\")."
)
- setattr(self, optname, v)
+ d[optname] = v
elif o.typespec == typing.Sequence[str]:
if not optval:
- setattr(self, optname, [])
+ d[optname] = []
else:
- setattr(
- self,
- optname,
- getattr(self, optname) + [optval]
- )
+ d[optname] = getattr(self, optname) + [optval]
else: # pragma: no cover
raise NotImplementedError("Unsupported option type: %s", o.typespec)
+ return d
def make_parser(self, parser, optname, metavar=None, short=None):
o = self._options[optname]
@@ -430,7 +372,7 @@ class OptManager:
raise ValueError("Unsupported option type: %s", o.typespec)
-def dump(opts):
+def dump_defaults(opts):
"""
Dumps an annotated file with all options.
"""
@@ -461,3 +403,88 @@ def dump(opts):
)
s.yaml_set_comment_before_after_key(k, before = "\n" + txt)
return ruamel.yaml.round_trip_dump(s)
+
+
+def parse(text):
+ if not text:
+ return {}
+ try:
+ data = ruamel.yaml.load(text, ruamel.yaml.RoundTripLoader)
+ except ruamel.yaml.error.YAMLError as v:
+ snip = v.problem_mark.get_snippet()
+ raise exceptions.OptionsError(
+ "Config error at line %s:\n%s\n%s" %
+ (v.problem_mark.line + 1, snip, v.problem)
+ )
+ if isinstance(data, str):
+ raise exceptions.OptionsError("Config error - no keys found.")
+ return data
+
+
+def load(opts, text):
+ """
+ Load configuration from text, over-writing options already set in
+ this object. May raise OptionsError if the config file is invalid.
+
+ Returns a dictionary of all unknown options.
+ """
+ data = parse(text)
+ return opts.update_known(**data)
+
+
+def load_paths(opts, *paths):
+ """
+ Load paths in order. Each path takes precedence over the previous
+ path. Paths that don't exist are ignored, errors raise an
+ OptionsError.
+
+ Returns a dictionary of unknown options.
+ """
+ ret = {}
+ for p in paths:
+ p = os.path.expanduser(p)
+ if os.path.exists(p) and os.path.isfile(p):
+ with open(p, "r") as f:
+ txt = f.read()
+ try:
+ ret.update(load(opts, txt))
+ except exceptions.OptionsError as e:
+ raise exceptions.OptionsError(
+ "Error reading %s: %s" % (p, e)
+ )
+ return ret
+
+
+def serialize(opts, text, defaults=False):
+ """
+ Performs a round-trip serialization. If text is not None, it is
+ treated as a previous serialization that should be modified
+ in-place.
+
+ - If "defaults" is False, only options with non-default values are
+ serialized. Default values in text are preserved.
+ - Unknown options in text are removed.
+ - Raises OptionsError if text is invalid.
+ """
+ data = parse(text)
+ for k in opts.keys():
+ if defaults or opts.has_changed(k):
+ data[k] = getattr(opts, k)
+ for k in list(data.keys()):
+ if k not in opts._options:
+ del data[k]
+ return ruamel.yaml.round_trip_dump(data)
+
+
+def save(opts, path, defaults=False):
+ """
+ Save to path. If the destination file exists, modify it in-place.
+ """
+ if os.path.exists(path) and os.path.isfile(path):
+ with open(path, "r") as f:
+ data = f.read()
+ else:
+ data = ""
+ data = serialize(opts, data, defaults)
+ with open(path, "w") as f:
+ f.write(data)