author     madt1m <pietrotirenna.pt@gmail.com>  2018-08-01 01:55:20 +0200
committer  madt1m <pietrotirenna.pt@gmail.com>  2018-08-01 02:16:35 +0200
commit     ccb5fd7c9981b65e8bb543076e89e381481340f7 (patch)
tree       9ef7e667acb9026f81c7ac3b567629ef0e01545b
parent     53b85d23605409a46754f217d061f7d8a1d3cb6d (diff)
session: basic flow capture implemented
-rw-r--r--  mitmproxy/addons/session.py  110
1 file changed, 99 insertions(+), 11 deletions(-)
diff --git a/mitmproxy/addons/session.py b/mitmproxy/addons/session.py
index 6176bd5b..010d3616 100644
--- a/mitmproxy/addons/session.py
+++ b/mitmproxy/addons/session.py
@@ -9,7 +9,8 @@ import os
from mitmproxy import types
from mitmproxy import http
from mitmproxy import ctx
-from mitmproxy.exceptions import SessionLoadException
+from mitmproxy.io import protobuf
+from mitmproxy.exceptions import SessionLoadException, CommandError
from mitmproxy.utils.data import pkg_data
@@ -32,6 +33,13 @@ class SessionDB:
for Sessions and handles creation,
retrieving and insertion in tables.
"""
+ content_threshold = 1000
+ type_mappings = {
+ "body": {
+ "request" : 1,
+ "response" : 2
+ }
+ }
def __init__(self, db_path=None):
"""
@@ -58,6 +66,13 @@ class SessionDB:
if self.tempdir:
shutil.rmtree(self.tempdir)
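+ # A flow id is considered present in the store if the flow table already
+ # holds a row with that id.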
+ def __contains__(self, fid):
+ return fid in self._get_ids()
+
+ def _get_ids(self):
+ with self.con as con:
+ return [t[0] for t in con.execute("SELECT id FROM flow;").fetchall()]
+
def _load_session(self, path):
if not self.is_session_db(path):
raise SessionLoadException('Given path does not point to a valid Session')
@@ -90,6 +105,23 @@ class SessionDB:
c.close()
return False
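+ # Serialize each flow with protobuf and batch-insert the results. Oversized
+ # request/response bodies are stripped out first and written to the body
+ # table, keyed by flow id and body type.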
+ def store_flows(self, flows):
+ body_buf = []
+ flow_buf = []
+ for flow in flows:
+ if len(flow.request.content) > self.content_threshold:
+ body_buf.append((flow.id, self.type_mappings["body"]["request"], flow.request.content))
+ flow.request.content = b""
+ if flow.response:
+ if len(flow.response.content) > self.content_threshold:
+ body_buf.append((flow.id, self.type_mappings["body"]["response"], flow.response.content))
+ flow.response.content = b""
+ flow_buf.append((flow.id, protobuf.dumps(flow)))
+ with self.con as con:
+ con.executemany("INSERT OR REPLACE INTO flow VALUES(?, ?)", flow_buf)
+ con.executemany("INSERT INTO body VALUES(?, ?, ?)", body_buf)
+
+
orders = [
("t", "time"),
@@ -101,12 +133,15 @@ orders = [
class Session:
def __init__(self):
- self.sdb = SessionDB(ctx.options.session_path)
+ self.dbstore = SessionDB(ctx.options.session_path)
self._hot_store = []
self._view = []
+ self._live_components = {}
self.order = orders[0]
self._flush_period = 3.0
+ self._tweak_period = 0.5
self._flush_rate = 150
+ self.started = False
def load(self, loader):
loader.add_option(
@@ -118,6 +153,23 @@ class Session:
"Flow sort order.",
choices=list(map(lambda c: c[1], orders))
)
+ loader.add_option(
+ "view_filter", typing.Optional[str], None,
+ "Limit the view to matching flows."
+ )
+
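+ # Start the background writer and tweaker coroutines exactly once, as soon as
+ # the proxy event loop is available.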
+ def running(self):
+ if not self.started:
+ self.started = True
+ loop = asyncio.get_event_loop()
+ tasks = (self._writer, self._tweaker)
+ asyncio.ensure_future(asyncio.gather(*(t() for t in tasks)), loop=loop)
+
+ def configure(self, updated):
+ if "view_order" in updated:
+ self.set_order(ctx.options.view_order)
+ if "view_filter" in updated:
+ self.set_filter(ctx.options.view_filter)
def _generate_order(self, f: http.HTTPFlow) -> typing.Union[str, int, float]:
o = self.order
@@ -135,6 +187,12 @@ class Session:
s += len(f.response.raw_content)
return s
+ def set_order(self, order: str) -> None:
+ pass
+
+ def set_filter(self, filt: str) -> None:
+ pass
+
async def _writer(self):
while True:
await asyncio.sleep(self._flush_period)
@@ -144,22 +202,52 @@ class Session:
tof.append(self._hot_store.pop())
self.store(tof)
- def store(self, flows: typing.Sequence[http.HTTPFlow]):
- pass
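+ # Periodically adapt the flush parameters to the size of the in-memory buffer:
+ # flush sooner when it fills up, back off when it drains.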
+ async def _tweaker(self):
+ while True:
+ await asyncio.sleep(self._tweak_period)
+ if len(self._hot_store) >= self._flush_rate:
+ self._flush_period *= 0.9
+ self._flush_rate *= 0.9
+ elif len(self._hot_store) < self._flush_rate:
+ self._flush_period *= 1.1
+ self._flush_rate *= 1.1
- def running(self):
- pass
+ def store(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
+ # Some live components of a flow (connection file objects, the reply handler)
+ # cannot be serialized, but are needed for the flow to keep working. We stash
+ # them in a dict of tuples keyed by flow id, so they can be attached again
+ # when the flow is restored from the store.
+ for f in flows:
+ self._live_components[f.id] = (
+ f.client_conn.wfile or None,
+ f.client_conn.rfile or None,
+ f.server_conn.wfile or None,
+ f.server_conn.rfile or None,
+ f.reply or None
+ )
+ self.dbstore.store_flows(flows)
+
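+ # Keep the view sorted: insert (order_key, flow_id), replacing any existing
+ # entry for the same flow id.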
+ def _base_add(self, f):
+ o = self._generate_order(f)
+ if f.id in [fid for _, fid in self._view]:
+ # Drop the stale (order, id) entry before re-inserting at its new position.
+ self._view = [e for e in self._view if e[1] != f.id]
+ self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id))
def add(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
for f in flows:
- if f.id not in [f.id for f in self._hot_store] and f.id not in self.sdb:
+ if f.id not in [f.id for f in self._hot_store] and f.id not in self.dbstore:
# Flow has to be filtered here before adding to view. Later
- o = self._generate_order(f)
- self._view.insert(bisect.bisect_left(KeyifyList(self._view, lambda x: x[0]), o), (o, f.id))
self._hot_store.append(f)
+ self._base_add(f)
- def update(self, flow):
- pass
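+ # Replace the buffered copy of an updated flow (if it is still unflushed) and
+ # re-position it in the sorted view.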
+ def update(self, flows: typing.Sequence[http.HTTPFlow]) -> None:
+ for f in flows:
+ if f.id in [f.id for f in self._hot_store]:
+ self._hot_store = [flow for flow in self._hot_store if flow.id != f.id]
+ self._hot_store.append(f)
+ self._base_add(f)
def request(self, f):
self.add([f])
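
For reference, a minimal sketch of the SQLite layout that the INSERT statements in store_flows() appear to assume; the real schema ships with mitmproxy and is loaded via pkg_data, so the column names below are hypothetical:

import sqlite3

con = sqlite3.connect(":memory:")
# Hypothetical columns: one row per serialized flow, plus offloaded large bodies.
con.execute("CREATE TABLE flow (id TEXT PRIMARY KEY, pbuf_blob BLOB)")
con.execute("CREATE TABLE body (flow_id TEXT, type_id INTEGER, content BLOB)")

# Mirrors store_flows(): the flow blob is upserted, and an oversized request
# body (type id 1 in type_mappings) goes to the body table.
con.execute("INSERT OR REPLACE INTO flow VALUES(?, ?)", ("fid-1", b"<protobuf bytes>"))
con.execute("INSERT INTO body VALUES(?, ?, ?)", ("fid-1", 1, b"x" * 2000))
con.commit()

print(con.execute("SELECT id FROM flow").fetchall())  # [('fid-1',)]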