aboutsummaryrefslogtreecommitdiffstats
path: root/examples/complex/webscanner_helper/urldict.py
blob: 28e6b5e6a698e52293facb275e95b83f7bd226ac (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import itertools
import json
import typing
from collections.abc import MutableMapping
from typing import Any, Dict, Generator, List, TextIO, Callable

from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow


def f_id(x):
    return x


class URLDict(MutableMapping):
    """Data structure to store information using filters as keys."""
    def __init__(self):
        self.store: Dict[flowfilter.TFilter, Any] = {}

    def __getitem__(self, key, *, count=0):
        if count:
            ret = itertools.islice(self.get_generator(key), 0, count)
        else:
            ret = list(self.get_generator(key))

        if ret:
            return ret
        else:
            raise KeyError

    def __setitem__(self, key: str, value):
        fltr = flowfilter.parse(key)
        if fltr:
            self.store.__setitem__(fltr, value)
        else:
            raise ValueError("Not a valid filter")

    def __delitem__(self, key):
        self.store.__delitem__(key)

    def __iter__(self):
        return self.store.__iter__()

    def __len__(self):
        return self.store.__len__()

    def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:

        for fltr, value in self.store.items():
            if flowfilter.match(fltr, flow):
                yield value

    def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]:
        try:
            return self.__getitem__(flow, count=count)
        except KeyError:
            return default

    @classmethod
    def _load(cls, json_obj, value_loader: Callable = f_id):
        url_dict = cls()
        for fltr, value in json_obj.items():
            url_dict[fltr] = value_loader(value)
        return url_dict

    @classmethod
    def load(cls, f: TextIO, value_loader: Callable = f_id):
        json_obj = json.load(f)
        return cls._load(json_obj, value_loader)

    @classmethod
    def loads(cls, json_str: str, value_loader: Callable = f_id):
        json_obj = json.loads(json_str)
        return cls._load(json_obj, value_loader)

    def _dump(self, value_dumper: Callable = f_id) -> Dict:
        dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {}
        for fltr, value in self.store.items():
            if hasattr(fltr, 'pattern'):
                # cast necessary for mypy
                dumped[typing.cast(Any, fltr).pattern] = value_dumper(value)
            else:
                dumped[str(fltr)] = value_dumper(value)
        return dumped

    def dump(self, f: TextIO, value_dumper: Callable = f_id):
        json.dump(self._dump(value_dumper), f)

    def dumps(self, value_dumper: Callable = f_id):
        return json.dumps(self._dump(value_dumper))