From a839d2ee2a5c668be1d5b2198f89bf44c6c7c78b Mon Sep 17 00:00:00 2001 From: madt1m Date: Wed, 1 Aug 2018 12:00:28 +0200 Subject: session: implemented filter and refilter. Ready for testing implementation --- mitmproxy/addons/session.py | 88 ++++++++++++++++++++++++++++--------- mitmproxy/io/sql/session_create.sql | 2 + 2 files changed, 69 insertions(+), 21 deletions(-) diff --git a/mitmproxy/addons/session.py b/mitmproxy/addons/session.py index 010d3616..c08097ee 100644 --- a/mitmproxy/addons/session.py +++ b/mitmproxy/addons/session.py @@ -6,6 +6,7 @@ import shutil import sqlite3 import os +from mitmproxy import flowfilter from mitmproxy import types from mitmproxy import http from mitmproxy import ctx @@ -36,8 +37,8 @@ class SessionDB: content_threshold = 1000 type_mappings = { "body": { - "request" : 1, - "response" : 2 + 1: "request", + 2: "response" } } @@ -49,6 +50,9 @@ class SessionDB: """ self.tempdir = None self.con = None + # This is used for fast look-ups over bodies already dumped to database. + # This permits to enforce one-to-one relationship between flow and body table. + self.body_ledger = set() if db_path is not None and os.path.isfile(db_path): self._load_session(db_path) else: @@ -91,6 +95,7 @@ class SessionDB: is a valid Session SQLite DB. :return: True if valid, False if invalid. """ + c = None try: c = sqlite3.connect(f'file:{path}?mode=rw', uri=True) cursor = c.cursor() @@ -100,7 +105,7 @@ class SessionDB: if all(elem in rows for elem in tables): c.close() return True - except: + except sqlite3.Error: if c: c.close() return False @@ -110,18 +115,42 @@ class SessionDB: flow_buf = [] for flow in flows: if len(flow.request.content) > self.content_threshold: - body_buf.append((flow.id, self.type_mappings["body"]["request"], flow.request.content)) + body_buf.append((flow.id, self.type_mappings["body"][1], flow.request.content)) flow.request.content = b"" - if flow.response: + self.body_ledger.add(flow.id) + if flow.response and flow.id not in self.body_ledger: if len(flow.response.content) > self.content_threshold: - body_buf.append((flow.id, self.type_mappings["body"]["response"], flow.response.content)) + body_buf.append((flow.id, self.type_mappings["body"][2], flow.response.content)) flow.response.content = b"" flow_buf.append((flow.id, protobuf.dumps(flow))) with self.con as con: con.executemany("INSERT OR REPLACE INTO flow VALUES(?, ?)", flow_buf) con.executemany("INSERT INTO body VALUES(?, ?, ?)", body_buf) - + def retrieve_flows(self, ids=None): + flows = [] + with self.con as con: + if not ids: + sql = "SELECT f.content, b.type_id, b.content " \ + "FROM flow f, body b " \ + "WHERE f.id = b.flow_id;" + rows = con.execute(sql).fetchall() + else: + sql = "SELECT f.content, b.type_id, b.content " \ + "FROM flow f, body b " \ + "WHERE f.id = b.flow_id" \ + f"AND f.id IN ({','.join(['?' for _ in range(len(ids))])})" + rows = con.execute(sql, ids).fetchall() + for row in rows: + flow = protobuf.loads(row[0]) + typ = self.type_mappings["body"][row[1]] + if typ and row[2]: + setattr(getattr(flow, typ), "content", row[2]) + flows.append(flow) + return flows + + +matchall = flowfilter.parse(".") orders = [ ("t", "time"), @@ -138,6 +167,7 @@ class Session: self._view = [] self._live_components = {} self.order = orders[0] + self.filter = matchall self._flush_period = 3.0 self._tweak_period = 0.5 self._flush_rate = 150 @@ -188,10 +218,32 @@ class Session: return s def set_order(self, order: str) -> None: - pass + if order not in orders: + raise CommandError( + "Unknown flow order: %s" % order + ) + if order != self.order: + self.order = order + newview = [ + (self._generate_order(f), f.id) for f in self.dbstore.retrieve_flows([t[0] for t in self._view]) + ] + self._view = sorted(newview) + + def _refilter(self): + self._view = [] + flows = self.dbstore.retrieve_flows() + for f in flows: + if self.filter(f): + self._base_add(f) - def set_filter(self, filt: str) -> None: - pass + def set_filter(self, input_filter: str) -> None: + filt = flowfilter.parse(input_filter) + if not filt: + raise CommandError( + "Invalid interception filter: %s" % filt + ) + self.filter = filt + self._refilter() async def _writer(self): while True: @@ -235,22 +287,16 @@ class Session: self._view = [flow for flow in self._view if flow.id != f.id] self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id)) - def add(self, flows: typing.Sequence[http.HTTPFlow]) -> None: + def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None: for f in flows: - if f.id not in [f.id for f in self._hot_store] and f.id not in self.dbstore: - # Flow has to be filtered here before adding to view. Later + if self.filter(f): + if f.id in [f.id for f in self._hot_store]: + self._hot_store = [flow for flow in self._hot_store if flow.id != f.id] self._hot_store.append(f) self._base_add(f) - def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None: - for f in flows: - if f.id in [f.id for f in self._hot_store]: - self._hot_store = [flow for flow in self._hot_store if flow.id != f.id] - self._hot_store.append(f) - self._base_add(f) - def request(self, f): - self.add([f]) + self.update([f]) def error(self, f): self.update([f]) diff --git a/mitmproxy/io/sql/session_create.sql b/mitmproxy/io/sql/session_create.sql index bfc98b94..b9c28c03 100644 --- a/mitmproxy/io/sql/session_create.sql +++ b/mitmproxy/io/sql/session_create.sql @@ -1,3 +1,5 @@ +PRAGMA foreign_keys = ON; + CREATE TABLE flow ( id VARCHAR(36) PRIMARY KEY, content BLOB -- cgit v1.2.3