From ccb5fd7c9981b65e8bb543076e89e381481340f7 Mon Sep 17 00:00:00 2001 From: madt1m Date: Wed, 1 Aug 2018 01:55:20 +0200 Subject: session: basic flow capture implemented --- mitmproxy/addons/session.py | 110 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 99 insertions(+), 11 deletions(-) diff --git a/mitmproxy/addons/session.py b/mitmproxy/addons/session.py index 6176bd5b..010d3616 100644 --- a/mitmproxy/addons/session.py +++ b/mitmproxy/addons/session.py @@ -9,7 +9,8 @@ import os from mitmproxy import types from mitmproxy import http from mitmproxy import ctx -from mitmproxy.exceptions import SessionLoadException +from mitmproxy.io import protobuf +from mitmproxy.exceptions import SessionLoadException, CommandError from mitmproxy.utils.data import pkg_data @@ -32,6 +33,13 @@ class SessionDB: for Sessions and handles creation, retrieving and insertion in tables. """ + content_threshold = 1000 + type_mappings = { + "body": { + "request" : 1, + "response" : 2 + } + } def __init__(self, db_path=None): """ @@ -58,6 +66,13 @@ class SessionDB: if self.tempdir: shutil.rmtree(self.tempdir) + def __contains__(self, fid): + return fid in self._get_ids() + + def _get_ids(self): + with self.con as con: + return [t[0] for t in con.execute("SELECT id FROM flow;").fetchall()] + def _load_session(self, path): if not self.is_session_db(path): raise SessionLoadException('Given path does not point to a valid Session') @@ -90,6 +105,23 @@ class SessionDB: c.close() return False + def store_flows(self, flows): + body_buf = [] + flow_buf = [] + for flow in flows: + if len(flow.request.content) > self.content_threshold: + body_buf.append((flow.id, self.type_mappings["body"]["request"], flow.request.content)) + flow.request.content = b"" + if flow.response: + if len(flow.response.content) > self.content_threshold: + body_buf.append((flow.id, self.type_mappings["body"]["response"], flow.response.content)) + flow.response.content = b"" + flow_buf.append((flow.id, protobuf.dumps(flow))) + with self.con as con: + con.executemany("INSERT OR REPLACE INTO flow VALUES(?, ?)", flow_buf) + con.executemany("INSERT INTO body VALUES(?, ?, ?)", body_buf) + + orders = [ ("t", "time"), @@ -101,12 +133,15 @@ orders = [ class Session: def __init__(self): - self.sdb = SessionDB(ctx.options.session_path) + self.dbstore = SessionDB(ctx.options.session_path) self._hot_store = [] self._view = [] + self._live_components = {} self.order = orders[0] self._flush_period = 3.0 + self._tweak_period = 0.5 self._flush_rate = 150 + self.started = False def load(self, loader): loader.add_option( @@ -118,6 +153,23 @@ class Session: "Flow sort order.", choices=list(map(lambda c: c[1], orders)) ) + loader.add_option( + "view_filter", typing.Optional[str], None, + "Limit the view to matching flows." + ) + + def running(self): + if not self.started: + self.started = True + loop = asyncio.get_event_loop() + tasks = (self._writer, self._tweaker) + loop.create_task(asyncio.gather(*(t() for t in tasks))) + + def configure(self, updated): + if "view_order" in updated: + self.set_order(ctx.options.view_order) + if "view_filter" in updated: + self.set_filter(ctx.options.view_filter) def _generate_order(self, f: http.HTTPFlow) -> typing.Union[str, int, float]: o = self.order @@ -135,6 +187,12 @@ class Session: s += len(f.response.raw_content) return s + def set_order(self, order: str) -> None: + pass + + def set_filter(self, filt: str) -> None: + pass + async def _writer(self): while True: await asyncio.sleep(self._flush_period) @@ -144,22 +202,52 @@ class Session: tof.append(self._hot_store.pop()) self.store(tof) - def store(self, flows: typing.Sequence[http.HTTPFlow]): - pass + async def _tweaker(self): + while True: + await asyncio.sleep(self._tweak_period) + if len(self._hot_store) >= self._flush_rate: + self._flush_period *= 0.9 + self._flush_rate *= 0.9 + elif len(self._hot_store) < self._flush_rate: + self._flush_period *= 1.1 + self._flush_rate *= 1.1 - def running(self): - pass + def store(self, flows: typing.Sequence[http.HTTPFlow]) -> None: + # Some live components of flows cannot be serialized, but they are needed to ensure correct functionality. + # We solve this by keeping a list of tuples which "save" those components for each flow id, eventually + # adding them back when needed. + for f in flows: + self._live_components[f.id] = ( + f.client_conn.wfile or None, + f.client_conn.rfile or None, + f.server_conn.wfile or None, + f.server_conn.rfile or None, + f.reply or None + ) + self.dbstore.store_flows(flows) + + def _base_add(self, f): + if f.id not in self._view: + o = self._generate_order(f) + self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) + else: + o = self._generate_order(f) + self._view = [flow for flow in self._view if flow.id != f.id] + self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) def add(self, flows: typing.Sequence[http.HTTPFlow]) -> None: for f in flows: - if f.id not in [f.id for f in self._hot_store] and f.id not in self.sdb: + if f.id not in [f.id for f in self._hot_store] and f.id not in self.dbstore: # Flow has to be filtered here before adding to view. Later - o = self._generate_order(f) - self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) self._hot_store.append(f) + self._base_add(f) - def update(self, flow): - pass + def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None: + for f in flows: + if f.id in [f.id for f in self._hot_store]: + self._hot_store = [flow for flow in self._hot_store if flow.id != f.id] + self._hot_store.append(f) + self._base_add(f) def request(self, f): self.add([f]) -- cgit v1.2.3