aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/flow.py
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@nullcube.com>2011-01-25 15:02:48 +1300
committerAldo Cortesi <aldo@nullcube.com>2011-01-25 15:02:48 +1300
commit7983dbb26a023db149a6e3e91f19fc7171680534 (patch)
tree9c4b978ec02c199c60b066aab202592942ebf272 /libmproxy/flow.py
parentb8d8030abd3c5e8111d85cf7e7700a3985f8eb09 (diff)
downloadmitmproxy-7983dbb26a023db149a6e3e91f19fc7171680534.tar.gz
mitmproxy-7983dbb26a023db149a6e3e91f19fc7171680534.tar.bz2
mitmproxy-7983dbb26a023db149a6e3e91f19fc7171680534.zip
Abstract flow management out of the interactive code.
Diffstat (limited to 'libmproxy/flow.py')
-rw-r--r--libmproxy/flow.py207
1 files changed, 207 insertions, 0 deletions
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
new file mode 100644
index 00000000..bdca5ebd
--- /dev/null
+++ b/libmproxy/flow.py
@@ -0,0 +1,207 @@
+"""
+ This module provides more sophisticated flow tracking. These match requests
+ with their responses, and provide filtering and interception facilities.
+"""
+import proxy, threading
+
+class ReplayConnection:
+ pass
+
+
+class ReplayThread(threading.Thread):
+ def __init__(self, flow, masterq):
+ self.flow, self.masterq = flow, masterq
+ threading.Thread.__init__(self)
+
+ def run(self):
+ try:
+ server = proxy.ServerConnection(self.flow.request)
+ response = server.read_response()
+ response.send(self.masterq)
+ except proxy.ProxyError, v:
+ err = proxy.Error(self.flow.connection, v.msg)
+ err.send(self.masterq)
+
+
+class Flow:
+ def __init__(self, connection):
+ self.connection = connection
+ self.request, self.response, self.error = None, None, None
+ self.waiting = True
+ self.intercepting = False
+ self._backup = None
+
+ def backup(self):
+ if not self._backup:
+ self._backup = [
+ self.connection.copy() if self.connection else None,
+ self.request.copy() if self.request else None,
+ self.response.copy() if self.response else None,
+ self.error.copy() if self.error else None,
+ ]
+
+ def revert(self):
+ if self._backup:
+ self.waiting = False
+ restore = [i.copy() if i else None for i in self._backup]
+ self.connection, self.request, self.response, self.error = restore
+
+ def match(self, pattern):
+ if pattern:
+ if self.response:
+ return pattern(self.response)
+ elif self.request:
+ return pattern(self.request)
+ return False
+
+ def is_replay(self):
+ return isinstance(self.connection, ReplayConnection)
+
+ def kill(self):
+ if self.intercepting:
+ if not self.request.acked:
+ self.request.kill = True
+ self.request.ack()
+ elif self.response and not self.response.acked:
+ self.response.kill = True
+ self.response.ack()
+ self.intercepting = False
+
+ def intercept(self):
+ self.intercepting = True
+
+ def accept_intercept(self):
+ if self.request:
+ if not self.request.acked:
+ self.request.ack()
+ elif self.response and not self.response.acked:
+ self.response.ack()
+ self.intercepting = False
+
+
+class State:
+ def __init__(self):
+ self.flow_map = {}
+ self.flow_list = []
+ # These are compiled filt expressions:
+ self.limit = None
+ self.intercept = None
+
+ def add_browserconnect(self, f):
+ """
+ Start a browser connection.
+ """
+ self.flow_list.insert(0, f)
+ self.flow_map[f.connection] = f
+
+ def add_request(self, req):
+ """
+ Add a request to the state. Returns the matching flow.
+ """
+ f = self.flow_map.get(req.connection)
+ if not f:
+ return False
+ f.request = req
+ return f
+
+ def add_response(self, resp):
+ """
+ Add a response to the state. Returns the matching flow.
+ """
+ f = self.flow_map.get(resp.request.connection)
+ if not f:
+ return False
+ f.response = resp
+ f.waiting = False
+ f.backup()
+ return f
+
+ def add_error(self, err):
+ """
+ Add an error response to the state. Returns the matching flow, or
+ None if there isn't one.
+ """
+ f = self.flow_map.get(err.connection)
+ if not f:
+ return None
+ f.error = err
+ f.waiting = False
+ f.backup()
+ return f
+
+ def set_limit(self, limit):
+ """
+ Limit is a compiled filter expression, or None.
+ """
+ self.limit = limit
+
+ def get_connection(self, itm):
+ if isinstance(itm, (proxy.BrowserConnection, ReplayConnection)):
+ return itm
+ elif hasattr(itm, "connection"):
+ return itm.connection
+ elif hasattr(itm, "request"):
+ return itm.request.connection
+
+ def lookup(self, itm):
+ """
+ Checks for matching connection, using a Flow, Replay Connection,
+ BrowserConnection, Request, Response or Error object. Returns None
+ if not found.
+ """
+ connection = self.get_connection(itm)
+ return self.flow_map.get(connection)
+
+ def delete_flow(self, f):
+ if not f.intercepting:
+ c = self.get_connection(f)
+ del self.flow_map[c]
+ self.flow_list.remove(f)
+ return True
+ return False
+
+ def clear(self):
+ for i in self.flow_list[:]:
+ self.delete_flow(i)
+
+ def accept_all(self):
+ for i in self.flow_list[:]:
+ i.accept_intercept()
+
+ def kill_flow(self, f):
+ f.kill()
+ self.delete_flow(f)
+
+ def revert(self, f):
+ """
+ Replaces the matching connection object with a ReplayConnection object.
+ """
+ conn = self.get_connection(f)
+ del self.flow_map[conn]
+ f.revert()
+ self.flow_map[f.connection] = f
+
+ def replay(self, f, masterq):
+ """
+ Replaces the matching connection object with a ReplayConnection object.
+
+ Returns None if successful, or error message if not.
+ """
+ #begin nocover
+ if f.intercepting:
+ return "Can't replay while intercepting..."
+ if f.request:
+ f.backup()
+ conn = self.get_connection(f)
+ del self.flow_map[conn]
+ rp = ReplayConnection()
+ f.connection = rp
+ f.request.connection = rp
+ if f.request.content:
+ f.request.headers["content-length"] = [str(len(f.request.content))]
+ f.response = None
+ f.error = None
+ self.flow_map[rp] = f
+ rt = ReplayThread(f, masterq)
+ rt.start()
+ #end nocover