1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
import itertools
import json
import typing
from collections.abc import MutableMapping
from typing import Any, Dict, Generator, List, TextIO, Callable
from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow
def f_id(x):
return x
class URLDict(MutableMapping):
"""Data structure to store information using filters as keys."""
def __init__(self):
self.store: Dict[flowfilter.TFilter, Any] = {}
def __getitem__(self, key, *, count=0):
if count:
ret = itertools.islice(self.get_generator(key), 0, count)
else:
ret = list(self.get_generator(key))
if ret:
return ret
else:
raise KeyError
def __setitem__(self, key: str, value):
fltr = flowfilter.parse(key)
if fltr:
self.store.__setitem__(fltr, value)
else:
raise ValueError("Not a valid filter")
def __delitem__(self, key):
self.store.__delitem__(key)
def __iter__(self):
return self.store.__iter__()
def __len__(self):
return self.store.__len__()
def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:
for fltr, value in self.store.items():
if flowfilter.match(fltr, flow):
yield value
def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]:
try:
return self.__getitem__(flow, count=count)
except KeyError:
return default
@classmethod
def _load(cls, json_obj, value_loader: Callable = f_id):
url_dict = cls()
for fltr, value in json_obj.items():
url_dict[fltr] = value_loader(value)
return url_dict
@classmethod
def load(cls, f: TextIO, value_loader: Callable = f_id):
json_obj = json.load(f)
return cls._load(json_obj, value_loader)
@classmethod
def loads(cls, json_str: str, value_loader: Callable = f_id):
json_obj = json.loads(json_str)
return cls._load(json_obj, value_loader)
def _dump(self, value_dumper: Callable = f_id) -> Dict:
dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {}
for fltr, value in self.store.items():
if hasattr(fltr, 'pattern'):
# cast necessary for mypy
dumped[typing.cast(Any, fltr).pattern] = value_dumper(value)
else:
dumped[str(fltr)] = value_dumper(value)
return dumped
def dump(self, f: TextIO, value_dumper: Callable = f_id):
json.dump(self._dump(value_dumper), f)
def dumps(self, value_dumper: Callable = f_id):
return json.dumps(self._dump(value_dumper))
|