aboutsummaryrefslogtreecommitdiffstats
path: root/examples/complex/webscanner_helper/urldict.py
diff options
context:
space:
mode:
authoranneborcherding <55282902+anneborcherding@users.noreply.github.com>2020-05-04 10:37:13 +0200
committerGitHub <noreply@github.com>2020-05-04 10:37:13 +0200
commit7fdcbb09e6034ab1f76724965cfdf45f3d775129 (patch)
tree9adaa530173c70d374680a510402b958ad669277 /examples/complex/webscanner_helper/urldict.py
parentf4aa3ee11c01d5b8f260e57bfd7e084b7767c08e (diff)
downloadmitmproxy-7fdcbb09e6034ab1f76724965cfdf45f3d775129.tar.gz
mitmproxy-7fdcbb09e6034ab1f76724965cfdf45f3d775129.tar.bz2
mitmproxy-7fdcbb09e6034ab1f76724965cfdf45f3d775129.zip
added add-ons that enhance the performance of web application scanners. (#3961)
* added add-ons that enhance the performance of web application scanners. Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com>
Diffstat (limited to 'examples/complex/webscanner_helper/urldict.py')
-rw-r--r--examples/complex/webscanner_helper/urldict.py90
1 files changed, 90 insertions, 0 deletions
diff --git a/examples/complex/webscanner_helper/urldict.py b/examples/complex/webscanner_helper/urldict.py
new file mode 100644
index 00000000..28e6b5e6
--- /dev/null
+++ b/examples/complex/webscanner_helper/urldict.py
@@ -0,0 +1,90 @@
+import itertools
+import json
+import typing
+from collections.abc import MutableMapping
+from typing import Any, Dict, Generator, List, TextIO, Callable
+
+from mitmproxy import flowfilter
+from mitmproxy.http import HTTPFlow
+
+
+def f_id(x):
+ return x
+
+
+class URLDict(MutableMapping):
+ """Data structure to store information using filters as keys."""
+ def __init__(self):
+ self.store: Dict[flowfilter.TFilter, Any] = {}
+
+ def __getitem__(self, key, *, count=0):
+ if count:
+ ret = itertools.islice(self.get_generator(key), 0, count)
+ else:
+ ret = list(self.get_generator(key))
+
+ if ret:
+ return ret
+ else:
+ raise KeyError
+
+ def __setitem__(self, key: str, value):
+ fltr = flowfilter.parse(key)
+ if fltr:
+ self.store.__setitem__(fltr, value)
+ else:
+ raise ValueError("Not a valid filter")
+
+ def __delitem__(self, key):
+ self.store.__delitem__(key)
+
+ def __iter__(self):
+ return self.store.__iter__()
+
+ def __len__(self):
+ return self.store.__len__()
+
+ def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:
+
+ for fltr, value in self.store.items():
+ if flowfilter.match(fltr, flow):
+ yield value
+
+ def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]:
+ try:
+ return self.__getitem__(flow, count=count)
+ except KeyError:
+ return default
+
+ @classmethod
+ def _load(cls, json_obj, value_loader: Callable = f_id):
+ url_dict = cls()
+ for fltr, value in json_obj.items():
+ url_dict[fltr] = value_loader(value)
+ return url_dict
+
+ @classmethod
+ def load(cls, f: TextIO, value_loader: Callable = f_id):
+ json_obj = json.load(f)
+ return cls._load(json_obj, value_loader)
+
+ @classmethod
+ def loads(cls, json_str: str, value_loader: Callable = f_id):
+ json_obj = json.loads(json_str)
+ return cls._load(json_obj, value_loader)
+
+ def _dump(self, value_dumper: Callable = f_id) -> Dict:
+ dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {}
+ for fltr, value in self.store.items():
+ if hasattr(fltr, 'pattern'):
+ # cast necessary for mypy
+ dumped[typing.cast(Any, fltr).pattern] = value_dumper(value)
+ else:
+ dumped[str(fltr)] = value_dumper(value)
+ return dumped
+
+ def dump(self, f: TextIO, value_dumper: Callable = f_id):
+ json.dump(self._dump(value_dumper), f)
+
+ def dumps(self, value_dumper: Callable = f_id):
+ return json.dumps(self._dump(value_dumper))