aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy
diff options
context:
space:
mode:
authorAldo Cortesi <aldo@corte.si>2016-05-29 20:03:40 +1200
committerAldo Cortesi <aldo@corte.si>2016-05-29 20:03:40 +1200
commitf3bee6f24539005e0916e786805e1655bb7f80f8 (patch)
treeefa23c05738aae1aeb396b10fd012151d66d555c /mitmproxy
parent85aa5da6e3b7d811d316ed692ecd40e5442abe02 (diff)
parent0176f50e4f4994be4b19be212f3a3db053a18d0c (diff)
downloadmitmproxy-f3bee6f24539005e0916e786805e1655bb7f80f8.tar.gz
mitmproxy-f3bee6f24539005e0916e786805e1655bb7f80f8.tar.bz2
mitmproxy-f3bee6f24539005e0916e786805e1655bb7f80f8.zip
Merge pull request #1172 from cortesi/solidcore
First steps to solidifying the core
Diffstat (limited to 'mitmproxy')
-rw-r--r--mitmproxy/console/__init__.py29
-rw-r--r--mitmproxy/controller.py162
-rw-r--r--mitmproxy/dump.py17
-rw-r--r--mitmproxy/flow.py81
-rw-r--r--mitmproxy/models/flow.py2
-rw-r--r--mitmproxy/proxy/root_context.py1
-rw-r--r--mitmproxy/web/__init__.py14
7 files changed, 188 insertions, 118 deletions
diff --git a/mitmproxy/console/__init__.py b/mitmproxy/console/__init__.py
index 1dd032be..c3157292 100644
--- a/mitmproxy/console/__init__.py
+++ b/mitmproxy/console/__init__.py
@@ -16,7 +16,7 @@ import weakref
from netlib import tcp
-from .. import flow, script, contentviews
+from .. import flow, script, contentviews, controller
from . import flowlist, flowview, help, window, signals, options
from . import grideditor, palettes, statusbar, palettepicker
from ..exceptions import FlowReadException, ScriptException
@@ -713,14 +713,15 @@ class ConsoleMaster(flow.FlowMaster):
)
def process_flow(self, f):
- if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay:
+ should_intercept = any(
+ [
+ self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay,
+ f.intercepted,
+ ]
+ )
+ if should_intercept:
f.intercept(self)
- else:
- # check if flow was intercepted within an inline script by flow.intercept()
- if f.intercepted:
- f.intercept(self)
- else:
- f.reply()
+ f.reply.take()
signals.flowlist_change.send(self)
signals.flow_change.send(self, flow = f)
@@ -728,25 +729,29 @@ class ConsoleMaster(flow.FlowMaster):
self.eventlist[:] = []
# Handlers
- def handle_error(self, f):
+ @controller.handler
+ def error(self, f):
f = flow.FlowMaster.handle_error(self, f)
if f:
self.process_flow(f)
return f
- def handle_request(self, f):
+ @controller.handler
+ def request(self, f):
f = flow.FlowMaster.handle_request(self, f)
if f:
self.process_flow(f)
return f
- def handle_response(self, f):
+ @controller.handler
+ def response(self, f):
f = flow.FlowMaster.handle_response(self, f)
if f:
self.process_flow(f)
return f
- def handle_script_change(self, script):
+ @controller.handler
+ def script_change(self, script):
if super(ConsoleMaster, self).handle_script_change(script):
signals.status_message.send(message='"{}" reloaded.'.format(script.filename))
else:
diff --git a/mitmproxy/controller.py b/mitmproxy/controller.py
index af8a77bd..dcf920ef 100644
--- a/mitmproxy/controller.py
+++ b/mitmproxy/controller.py
@@ -1,22 +1,58 @@
from __future__ import absolute_import
from six.moves import queue
import threading
+import functools
+import sys
-from .exceptions import Kill
+from . import exceptions
+Events = frozenset([
+ "clientconnect",
+ "clientdisconnect",
+ "serverconnect",
+ "serverdisconnect",
-class Master(object):
+ "tcp_open",
+ "tcp_message",
+ "tcp_error",
+ "tcp_close",
+
+ "request",
+ "response",
+ "responseheaders",
+
+ "next_layer",
+
+ "error",
+ "log",
+])
+
+class ControlError(Exception):
+ pass
+
+
+class Master(object):
"""
- The master handles mitmproxy's main event loop.
+ The master handles mitmproxy's main event loop.
"""
-
- def __init__(self):
+ def __init__(self, *servers):
self.event_queue = queue.Queue()
self.should_exit = threading.Event()
+ self.servers = []
+ for i in servers:
+ self.add_server(i)
+
+ def add_server(self, server):
+ # We give a Channel to the server which can be used to communicate with the master
+ channel = Channel(self.event_queue, self.should_exit)
+ server.set_channel(channel)
+ self.servers.append(server)
def start(self):
self.should_exit.clear()
+ for server in self.servers:
+ ServerThread(server).start()
def run(self):
self.start()
@@ -36,7 +72,17 @@ class Master(object):
# exception is thrown.
while True:
mtype, obj = self.event_queue.get(timeout=timeout)
- handle_func = getattr(self, "handle_" + mtype)
+ if mtype not in Events:
+ raise ControlError("Unknown event %s"%repr(mtype))
+ handle_func = getattr(self, mtype)
+ if not hasattr(handle_func, "func_dict"):
+ raise ControlError("Handler %s not a function"%mtype)
+ if not handle_func.func_dict.get("__handler"):
+ raise ControlError(
+ "Handler function %s is not decorated with controller.handler"%(
+ handle_func
+ )
+ )
handle_func(obj)
self.event_queue.task_done()
changed = True
@@ -45,38 +91,12 @@ class Master(object):
return changed
def shutdown(self):
- self.should_exit.set()
-
-
-class ServerMaster(Master):
-
- """
- The ServerMaster adds server thread support to the master.
- """
-
- def __init__(self):
- super(ServerMaster, self).__init__()
- self.servers = []
-
- def add_server(self, server):
- # We give a Channel to the server which can be used to communicate with the master
- channel = Channel(self.event_queue, self.should_exit)
- server.set_channel(channel)
- self.servers.append(server)
-
- def start(self):
- super(ServerMaster, self).start()
- for server in self.servers:
- ServerThread(server).start()
-
- def shutdown(self):
for server in self.servers:
server.shutdown()
- super(ServerMaster, self).shutdown()
+ self.should_exit.set()
class ServerThread(threading.Thread):
-
def __init__(self, server):
self.server = server
super(ServerThread, self).__init__()
@@ -88,12 +108,10 @@ class ServerThread(threading.Thread):
class Channel(object):
-
"""
- The only way for the proxy server to communicate with the master
- is to use the channel it has been given.
+ The only way for the proxy server to communicate with the master
+ is to use the channel it has been given.
"""
-
def __init__(self, q, should_exit):
self.q = q
self.should_exit = should_exit
@@ -104,7 +122,7 @@ class Channel(object):
master. Then wait for a response.
Raises:
- Kill: All connections should be closed immediately.
+ exceptions.Kill: All connections should be closed immediately.
"""
m.reply = Reply(m)
self.q.put((mtype, m))
@@ -114,11 +132,10 @@ class Channel(object):
g = m.reply.q.get(timeout=0.5)
except queue.Empty: # pragma: no cover
continue
- if g == Kill:
- raise Kill()
+ if g == exceptions.Kill:
+ raise exceptions.Kill()
return g
-
- raise Kill()
+ raise exceptions.Kill()
def tell(self, mtype, m):
"""
@@ -130,14 +147,17 @@ class Channel(object):
class DummyReply(object):
-
"""
A reply object that does nothing. Useful when we need an object to seem
like it has a channel, and during testing.
"""
-
def __init__(self):
self.acked = False
+ self.taken = False
+ self.handled = False
+
+ def take(self):
+ self.taken = True
def __call__(self, msg=False):
self.acked = True
@@ -147,23 +167,63 @@ class DummyReply(object):
NO_REPLY = object()
-class Reply(object):
+def handler(f):
+ @functools.wraps(f)
+ def wrapper(*args, **kwargs):
+ if len(args) == 1:
+ message = args[0]
+ elif len(args) == 2:
+ message = args[1]
+ else:
+ raise ControlError("Handler takes one argument: a message")
+
+ if not hasattr(message, "reply"):
+ raise ControlError("Message %s has no reply attribute"%message)
+
+ handling = False
+ # We're the first handler - ack responsibility is ours
+ if not message.reply.handled:
+ handling = True
+ message.reply.handled = True
+
+ ret = f(*args, **kwargs)
+ if handling and not message.reply.acked and not message.reply.taken:
+ message.reply()
+ return ret
+ wrapper.func_dict["__handler"] = True
+ return wrapper
+
+
+class Reply(object):
"""
Messages sent through a channel are decorated with a "reply" attribute.
This object is used to respond to the message through the return
channel.
"""
-
def __init__(self, obj):
self.obj = obj
self.q = queue.Queue()
+ # Has this message been acked?
self.acked = False
+ # Has the user taken responsibility for ack-ing?
+ self.taken = False
+ # Has a handler taken responsibility for ack-ing?
+ self.handled = False
+
+ def take(self):
+ self.taken = True
def __call__(self, msg=NO_REPLY):
+ if self.acked:
+ raise ControlError("Message already acked.")
+ self.acked = True
+ if msg is NO_REPLY:
+ self.q.put(self.obj)
+ else:
+ self.q.put(msg)
+
+ def __del__(self):
if not self.acked:
- self.acked = True
- if msg is NO_REPLY:
- self.q.put(self.obj)
- else:
- self.q.put(msg)
+ # This will be ignored by the interpreter, but emit a warning
+ raise ControlError("Un-acked message")
diff --git a/mitmproxy/dump.py b/mitmproxy/dump.py
index 8f9488be..4443995a 100644
--- a/mitmproxy/dump.py
+++ b/mitmproxy/dump.py
@@ -6,7 +6,7 @@ import itertools
from netlib import tcp
from netlib.utils import bytes_to_escaped_str, pretty_size
-from . import flow, filt, contentviews
+from . import flow, filt, contentviews, controller
from .exceptions import ContentViewException, FlowReadException, ScriptException
@@ -325,22 +325,25 @@ class DumpMaster(flow.FlowMaster):
self.echo_flow(f)
- def handle_request(self, f):
- flow.FlowMaster.handle_request(self, f)
+ @controller.handler
+ def request(self, f):
+ flow.FlowMaster.request(self, f)
self.state.delete_flow(f)
if f:
f.reply()
return f
- def handle_response(self, f):
- flow.FlowMaster.handle_response(self, f)
+ @controller.handler
+ def response(self, f):
+ flow.FlowMaster.response(self, f)
if f:
f.reply()
self._process_flow(f)
return f
- def handle_error(self, f):
- flow.FlowMaster.handle_error(self, f)
+ @controller.handler
+ def error(self, f):
+ flow.FlowMaster.error(self, f)
if f:
self._process_flow(f)
return f
diff --git a/mitmproxy/flow.py b/mitmproxy/flow.py
index d70ec2d9..407f0d7b 100644
--- a/mitmproxy/flow.py
+++ b/mitmproxy/flow.py
@@ -208,9 +208,9 @@ class ClientPlaybackState:
master.replay_request(self.current)
else:
self.current.reply = controller.DummyReply()
- master.handle_request(self.current)
+ master.request(self.current)
if self.current.response:
- master.handle_response(self.current)
+ master.response(self.current)
class ServerPlaybackState:
@@ -546,7 +546,8 @@ class FlowStore(FlowList):
def kill_all(self, master):
for f in self._list:
- f.kill(master)
+ if not f.reply.acked:
+ f.kill(master)
class State(object):
@@ -637,7 +638,7 @@ class State(object):
self.flows.kill_all(master)
-class FlowMaster(controller.ServerMaster):
+class FlowMaster(controller.Master):
@property
def server(self):
@@ -893,23 +894,23 @@ class FlowMaster(controller.ServerMaster):
f.reply = controller.DummyReply()
if f.request:
- self.handle_request(f)
+ self.request(f)
if f.response:
- self.handle_responseheaders(f)
- self.handle_response(f)
+ self.responseheaders(f)
+ self.response(f)
if f.error:
- self.handle_error(f)
+ self.error(f)
elif isinstance(f, TCPFlow):
messages = f.messages
f.messages = []
f.reply = controller.DummyReply()
- self.handle_tcp_open(f)
+ self.tcp_open(f)
while messages:
f.messages.append(messages.pop(0))
- self.handle_tcp_message(f)
+ self.tcp_message(f)
if f.error:
- self.handle_tcp_error(f)
- self.handle_tcp_close(f)
+ self.tcp_error(f)
+ self.tcp_close(f)
else:
raise NotImplementedError()
@@ -985,39 +986,40 @@ class FlowMaster(controller.ServerMaster):
if block:
rt.join()
- def handle_log(self, l):
+ @controller.handler
+ def log(self, l):
self.add_event(l.msg, l.level)
- l.reply()
- def handle_clientconnect(self, root_layer):
+ @controller.handler
+ def clientconnect(self, root_layer):
self.run_script_hook("clientconnect", root_layer)
- root_layer.reply()
- def handle_clientdisconnect(self, root_layer):
+ @controller.handler
+ def clientdisconnect(self, root_layer):
self.run_script_hook("clientdisconnect", root_layer)
- root_layer.reply()
- def handle_serverconnect(self, server_conn):
+ @controller.handler
+ def serverconnect(self, server_conn):
self.run_script_hook("serverconnect", server_conn)
- server_conn.reply()
- def handle_serverdisconnect(self, server_conn):
+ @controller.handler
+ def serverdisconnect(self, server_conn):
self.run_script_hook("serverdisconnect", server_conn)
- server_conn.reply()
- def handle_next_layer(self, top_layer):
+ @controller.handler
+ def next_layer(self, top_layer):
self.run_script_hook("next_layer", top_layer)
- top_layer.reply()
- def handle_error(self, f):
+ @controller.handler
+ def error(self, f):
self.state.update_flow(f)
self.run_script_hook("error", f)
if self.client_playback:
self.client_playback.clear(f)
- f.reply()
return f
- def handle_request(self, f):
+ @controller.handler
+ def request(self, f):
if f.live:
app = self.apps.get(f.request)
if app:
@@ -1039,20 +1041,19 @@ class FlowMaster(controller.ServerMaster):
self.run_script_hook("request", f)
return f
- def handle_responseheaders(self, f):
+ @controller.handler
+ def responseheaders(self, f):
try:
if self.stream_large_bodies:
self.stream_large_bodies.run(f, False)
except HttpException:
f.reply(Kill)
return
-
self.run_script_hook("responseheaders", f)
-
- f.reply()
return f
- def handle_response(self, f):
+ @controller.handler
+ def response(self, f):
self.active_flows.discard(f)
self.state.update_flow(f)
self.replacehooks.run(f)
@@ -1099,14 +1100,15 @@ class FlowMaster(controller.ServerMaster):
self.add_event('"{}" reloaded.'.format(s.filename), 'info')
return ok
- def handle_tcp_open(self, flow):
+ @controller.handler
+ def tcp_open(self, flow):
# TODO: This would break mitmproxy currently.
# self.state.add_flow(flow)
self.active_flows.add(flow)
self.run_script_hook("tcp_open", flow)
- flow.reply()
- def handle_tcp_message(self, flow):
+ @controller.handler
+ def tcp_message(self, flow):
self.run_script_hook("tcp_message", flow)
message = flow.messages[-1]
direction = "->" if message.from_client else "<-"
@@ -1116,22 +1118,21 @@ class FlowMaster(controller.ServerMaster):
direction=direction,
), "info")
self.add_event(clean_bin(message.content), "debug")
- flow.reply()
- def handle_tcp_error(self, flow):
+ @controller.handler
+ def tcp_error(self, flow):
self.add_event("Error in TCP connection to {}: {}".format(
repr(flow.server_conn.address),
flow.error
), "info")
self.run_script_hook("tcp_error", flow)
- flow.reply()
- def handle_tcp_close(self, flow):
+ @controller.handler
+ def tcp_close(self, flow):
self.active_flows.discard(flow)
if self.stream:
self.stream.add(flow)
self.run_script_hook("tcp_close", flow)
- flow.reply()
def shutdown(self):
super(FlowMaster, self).shutdown()
diff --git a/mitmproxy/models/flow.py b/mitmproxy/models/flow.py
index 1019c9fb..8797fcd8 100644
--- a/mitmproxy/models/flow.py
+++ b/mitmproxy/models/flow.py
@@ -152,7 +152,7 @@ class Flow(stateobject.StateObject):
self.error = Error("Connection killed")
self.intercepted = False
self.reply(Kill)
- master.handle_error(self)
+ master.error(self)
def intercept(self, master):
"""
diff --git a/mitmproxy/proxy/root_context.py b/mitmproxy/proxy/root_context.py
index 96e7aab6..9b4e2963 100644
--- a/mitmproxy/proxy/root_context.py
+++ b/mitmproxy/proxy/root_context.py
@@ -132,7 +132,6 @@ class RootContext(object):
class Log(object):
-
def __init__(self, msg, level="info"):
self.msg = msg
self.level = level
diff --git a/mitmproxy/web/__init__.py b/mitmproxy/web/__init__.py
index 956d221d..f8102ed8 100644
--- a/mitmproxy/web/__init__.py
+++ b/mitmproxy/web/__init__.py
@@ -6,7 +6,7 @@ import sys
from netlib.http import authentication
-from .. import flow
+from .. import flow, controller
from ..exceptions import FlowReadException
from . import app
@@ -194,18 +194,20 @@ class WebMaster(flow.FlowMaster):
if self.state.intercept and self.state.intercept(
f) and not f.request.is_replay:
f.intercept(self)
- else:
- f.reply()
+ f.reply.take()
- def handle_request(self, f):
+ @controller.handler
+ def request(self, f):
super(WebMaster, self).handle_request(f)
self._process_flow(f)
- def handle_response(self, f):
+ @controller.handler
+ def response(self, f):
super(WebMaster, self).handle_response(f)
self._process_flow(f)
- def handle_error(self, f):
+ @controller.handler
+ def error(self, f):
super(WebMaster, self).handle_error(f)
self._process_flow(f)