diff options
author | madt1m <pietrotirenna.pt@gmail.com> | 2018-08-05 21:49:54 +0200 |
---|---|---|
committer | madt1m <pietrotirenna.pt@gmail.com> | 2018-08-05 21:57:55 +0200 |
commit | a52451900c71f48bb51d777522424d8ba6944f0b (patch) | |
tree | a0c5c4bf2f6627349b5a23477ec5e11d017e6100 /mitmproxy | |
parent | e9c2b12dabddd8d5b26db7f877eb982859274263 (diff) | |
download | mitmproxy-a52451900c71f48bb51d777522424d8ba6944f0b.tar.gz mitmproxy-a52451900c71f48bb51d777522424d8ba6944f0b.tar.bz2 mitmproxy-a52451900c71f48bb51d777522424d8ba6944f0b.zip |
session: implemented changes requested after PR review.
Diffstat (limited to 'mitmproxy')
-rw-r--r-- | mitmproxy/addons/session.py | 105 |
1 files changed, 51 insertions, 54 deletions
diff --git a/mitmproxy/addons/session.py b/mitmproxy/addons/session.py index 2e4d2147..63e382ec 100644 --- a/mitmproxy/addons/session.py +++ b/mitmproxy/addons/session.py @@ -50,12 +50,13 @@ class SessionDB: or create a new one with optional path. :param db_path: """ - self.live_components = {} - self.tempdir = None - self.con = None + self.live_components: typing.Dict[str, tuple] = {} + self.tempdir: tempfile.TemporaryDirectory = None + self.con: sqlite3.Connection = None # This is used for fast look-ups over bodies already dumped to database. # This permits to enforce one-to-one relationship between flow and body table. - self.body_ledger = set() + self.body_ledger: typing.Set[str] = set() + self.id_ledger: typing.Set[str] = set() if db_path is not None and os.path.isfile(db_path): self._load_session(db_path) else: @@ -74,14 +75,10 @@ class SessionDB: shutil.rmtree(self.tempdir) def __contains__(self, fid): - return any([fid == i for i in self._get_ids()]) + return fid in self.id_ledger def __len__(self): - ln = self.con.execute("SELECT COUNT(*) FROM flow;").fetchall()[0] - return ln[0] if ln else 0 - - def _get_ids(self): - return [t[0] for t in self.con.execute("SELECT id FROM flow;").fetchall()] + return len(self.id_ledger) def _load_session(self, path): if not self.is_session_db(path): @@ -150,6 +147,7 @@ class SessionDB: body_buf = [] flow_buf = [] for flow in flows: + self.id_ledger.add(flow.id) self._disassemble(flow) f = copy.copy(flow) f.request = copy.deepcopy(flow.request) @@ -209,17 +207,21 @@ orders = [ class Session: + + _FP_RATE = 150 + _FP_DECREMENT = 0.9 + _FP_DEFAULT = 3.0 + def __init__(self): - self.db_store = None - self._hot_store = collections.OrderedDict() - self._live_components = {} - self._view = [] - self.order = orders[0] + self.db_store: SessionDB = None + self._hot_store: collections.OrderedDict = collections.OrderedDict() + self._order_store: typing.Dict[str, typing.Dict[str, typing.Union[int, float, str]]] = {} + self._view: typing.List[typing.Tuple[typing.Union[int, float, str], str]] = [] + self.order: str = orders[0] self.filter = matchall - self._flush_period = 3.0 - self._tweak_period = 0.5 - self._flush_rate = 150 - self.started = False + self._flush_period: float = self._FP_DEFAULT + self._flush_rate: int = self._FP_RATE + self.started: bool = False def load(self, loader): loader.add_option( @@ -242,7 +244,6 @@ class Session: self.db_store = SessionDB(ctx.options.session_path) loop = asyncio.get_event_loop() loop.create_task(self._writer()) - loop.create_task(self._tweaker()) def configure(self, updated): if "view_order" in updated: @@ -253,28 +254,23 @@ class Session: async def _writer(self): while True: await asyncio.sleep(self._flush_period) - tof = [] - to_dump = min(self._flush_rate, len(self._hot_store)) - for _ in range(to_dump): - tof.append(self._hot_store.popitem(last=False)[1]) - self.db_store.store_flows(tof) - - async def _tweaker(self): - while True: - await asyncio.sleep(self._tweak_period) - if len(self._hot_store) >= 3 * self._flush_rate: - self._flush_period *= 0.9 - self._flush_rate *= 1.1 - elif len(self._hot_store) < self._flush_rate: - self._flush_period *= 1.1 - self._flush_rate *= 0.9 - - def load_view(self): + batches = -(-len(self._hot_store) // self._flush_rate) + self._flush_period = self._flush_period * self._FP_DECREMENT if batches > 1 else self._FP_DEFAULT + while batches: + tof = [] + to_dump = min(len(self._hot_store), self._flush_rate) + for _ in range(to_dump): + tof.append(self._hot_store.popitem(last=False)[1]) + self.db_store.store_flows(tof) + batches -= 1 + await asyncio.sleep(0.01) + + def load_view(self) -> typing.Sequence[http.HTTPFlow]: ids = [fid for _, fid in self._view] flows = self.load_storage(ids) - return sorted(flows, key=lambda f: self._generate_order(f)) + return sorted(flows, key=lambda f: self._generate_order(self.order, f)) - def load_storage(self, ids=None): + def load_storage(self, ids=None) -> typing.Sequence[http.HTTPFlow]: flows = [] ids_from_store = [] if ids is not None: @@ -284,8 +280,6 @@ class Session: flows.append(self._hot_store[fid]) elif fid in self.db_store: ids_from_store.append(fid) - else: - flows.append(None) flows += self.db_store.retrieve_flows(ids_from_store) else: for flow in self._hot_store.values(): @@ -300,15 +294,15 @@ class Session: self._hot_store.clear() self._view = [] - def store_count(self): + def store_count(self) -> int: ln = 0 for fid in self._hot_store.keys(): if fid not in self.db_store: ln += 1 return ln + len(self.db_store) - def _generate_order(self, f: http.HTTPFlow) -> typing.Optional[typing.Union[str, int, float]]: - o = self.order + @staticmethod + def _generate_order(o: str, f: http.HTTPFlow) -> typing.Optional[typing.Union[str, int, float]]: if o == "time": return f.request.timestamp_start or 0 if o == "method": @@ -324,6 +318,11 @@ class Session: return s return None + def _store_order(self, f: http.HTTPFlow): + self._order_store[f.id] = {} + for order in orders: + self._order_store[f.id][order] = self._generate_order(order, f) + def set_order(self, order: str) -> None: if order not in orders: raise CommandError( @@ -332,7 +331,7 @@ class Session: if order != self.order: self.order = order newview = [ - (self._generate_order(f), f.id) for f in self.load_view() + (self._order_store[t[1]][order], t[1]) for t in self._view ] self._view = sorted(newview) @@ -341,7 +340,7 @@ class Session: flows = self.load_storage() for f in flows: if self.filter(f): - self._base_add(f) + self.update_view(f) def set_filter(self, input_filter: typing.Optional[str]) -> None: filt = matchall if not input_filter else flowfilter.parse(input_filter) @@ -352,22 +351,20 @@ class Session: self.filter = filt self._refilter() - def _base_add(self, f): - if not any([f.id == t[1] for t in self._view]): - o = self._generate_order(f) - self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) - else: - o = self._generate_order(f) + def update_view(self, f): + if any([f.id == t[1] for t in self._view]): self._view = [(order, fid) for order, fid in self._view if fid != f.id] - self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) + o = self._order_store[f.id][self.order] + self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None: for f in flows: + self._store_order(f) if f.id in self._hot_store: self._hot_store.pop(f.id) self._hot_store[f.id] = f if self.filter(f): - self._base_add(f) + self.update_view(f) def request(self, f): self.update([f]) |