diff options
author | Aldo Cortesi <aldo@corte.si> | 2016-05-29 20:03:40 +1200 |
---|---|---|
committer | Aldo Cortesi <aldo@corte.si> | 2016-05-29 20:03:40 +1200 |
commit | f3bee6f24539005e0916e786805e1655bb7f80f8 (patch) | |
tree | efa23c05738aae1aeb396b10fd012151d66d555c /mitmproxy | |
parent | 85aa5da6e3b7d811d316ed692ecd40e5442abe02 (diff) | |
parent | 0176f50e4f4994be4b19be212f3a3db053a18d0c (diff) | |
download | mitmproxy-f3bee6f24539005e0916e786805e1655bb7f80f8.tar.gz mitmproxy-f3bee6f24539005e0916e786805e1655bb7f80f8.tar.bz2 mitmproxy-f3bee6f24539005e0916e786805e1655bb7f80f8.zip |
Merge pull request #1172 from cortesi/solidcore
First steps to solidifying the core
Diffstat (limited to 'mitmproxy')
-rw-r--r-- | mitmproxy/console/__init__.py | 29 | ||||
-rw-r--r-- | mitmproxy/controller.py | 162 | ||||
-rw-r--r-- | mitmproxy/dump.py | 17 | ||||
-rw-r--r-- | mitmproxy/flow.py | 81 | ||||
-rw-r--r-- | mitmproxy/models/flow.py | 2 | ||||
-rw-r--r-- | mitmproxy/proxy/root_context.py | 1 | ||||
-rw-r--r-- | mitmproxy/web/__init__.py | 14 |
7 files changed, 188 insertions, 118 deletions
diff --git a/mitmproxy/console/__init__.py b/mitmproxy/console/__init__.py index 1dd032be..c3157292 100644 --- a/mitmproxy/console/__init__.py +++ b/mitmproxy/console/__init__.py @@ -16,7 +16,7 @@ import weakref from netlib import tcp -from .. import flow, script, contentviews +from .. import flow, script, contentviews, controller from . import flowlist, flowview, help, window, signals, options from . import grideditor, palettes, statusbar, palettepicker from ..exceptions import FlowReadException, ScriptException @@ -713,14 +713,15 @@ class ConsoleMaster(flow.FlowMaster): ) def process_flow(self, f): - if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay: + should_intercept = any( + [ + self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay, + f.intercepted, + ] + ) + if should_intercept: f.intercept(self) - else: - # check if flow was intercepted within an inline script by flow.intercept() - if f.intercepted: - f.intercept(self) - else: - f.reply() + f.reply.take() signals.flowlist_change.send(self) signals.flow_change.send(self, flow = f) @@ -728,25 +729,29 @@ class ConsoleMaster(flow.FlowMaster): self.eventlist[:] = [] # Handlers - def handle_error(self, f): + @controller.handler + def error(self, f): f = flow.FlowMaster.handle_error(self, f) if f: self.process_flow(f) return f - def handle_request(self, f): + @controller.handler + def request(self, f): f = flow.FlowMaster.handle_request(self, f) if f: self.process_flow(f) return f - def handle_response(self, f): + @controller.handler + def response(self, f): f = flow.FlowMaster.handle_response(self, f) if f: self.process_flow(f) return f - def handle_script_change(self, script): + @controller.handler + def script_change(self, script): if super(ConsoleMaster, self).handle_script_change(script): signals.status_message.send(message='"{}" reloaded.'.format(script.filename)) else: diff --git a/mitmproxy/controller.py b/mitmproxy/controller.py index af8a77bd..dcf920ef 100644 --- a/mitmproxy/controller.py +++ b/mitmproxy/controller.py @@ -1,22 +1,58 @@ from __future__ import absolute_import from six.moves import queue import threading +import functools +import sys -from .exceptions import Kill +from . import exceptions +Events = frozenset([ + "clientconnect", + "clientdisconnect", + "serverconnect", + "serverdisconnect", -class Master(object): + "tcp_open", + "tcp_message", + "tcp_error", + "tcp_close", + + "request", + "response", + "responseheaders", + + "next_layer", + + "error", + "log", +]) + +class ControlError(Exception): + pass + + +class Master(object): """ - The master handles mitmproxy's main event loop. + The master handles mitmproxy's main event loop. """ - - def __init__(self): + def __init__(self, *servers): self.event_queue = queue.Queue() self.should_exit = threading.Event() + self.servers = [] + for i in servers: + self.add_server(i) + + def add_server(self, server): + # We give a Channel to the server which can be used to communicate with the master + channel = Channel(self.event_queue, self.should_exit) + server.set_channel(channel) + self.servers.append(server) def start(self): self.should_exit.clear() + for server in self.servers: + ServerThread(server).start() def run(self): self.start() @@ -36,7 +72,17 @@ class Master(object): # exception is thrown. while True: mtype, obj = self.event_queue.get(timeout=timeout) - handle_func = getattr(self, "handle_" + mtype) + if mtype not in Events: + raise ControlError("Unknown event %s"%repr(mtype)) + handle_func = getattr(self, mtype) + if not hasattr(handle_func, "func_dict"): + raise ControlError("Handler %s not a function"%mtype) + if not handle_func.func_dict.get("__handler"): + raise ControlError( + "Handler function %s is not decorated with controller.handler"%( + handle_func + ) + ) handle_func(obj) self.event_queue.task_done() changed = True @@ -45,38 +91,12 @@ class Master(object): return changed def shutdown(self): - self.should_exit.set() - - -class ServerMaster(Master): - - """ - The ServerMaster adds server thread support to the master. - """ - - def __init__(self): - super(ServerMaster, self).__init__() - self.servers = [] - - def add_server(self, server): - # We give a Channel to the server which can be used to communicate with the master - channel = Channel(self.event_queue, self.should_exit) - server.set_channel(channel) - self.servers.append(server) - - def start(self): - super(ServerMaster, self).start() - for server in self.servers: - ServerThread(server).start() - - def shutdown(self): for server in self.servers: server.shutdown() - super(ServerMaster, self).shutdown() + self.should_exit.set() class ServerThread(threading.Thread): - def __init__(self, server): self.server = server super(ServerThread, self).__init__() @@ -88,12 +108,10 @@ class ServerThread(threading.Thread): class Channel(object): - """ - The only way for the proxy server to communicate with the master - is to use the channel it has been given. + The only way for the proxy server to communicate with the master + is to use the channel it has been given. """ - def __init__(self, q, should_exit): self.q = q self.should_exit = should_exit @@ -104,7 +122,7 @@ class Channel(object): master. Then wait for a response. Raises: - Kill: All connections should be closed immediately. + exceptions.Kill: All connections should be closed immediately. """ m.reply = Reply(m) self.q.put((mtype, m)) @@ -114,11 +132,10 @@ class Channel(object): g = m.reply.q.get(timeout=0.5) except queue.Empty: # pragma: no cover continue - if g == Kill: - raise Kill() + if g == exceptions.Kill: + raise exceptions.Kill() return g - - raise Kill() + raise exceptions.Kill() def tell(self, mtype, m): """ @@ -130,14 +147,17 @@ class Channel(object): class DummyReply(object): - """ A reply object that does nothing. Useful when we need an object to seem like it has a channel, and during testing. """ - def __init__(self): self.acked = False + self.taken = False + self.handled = False + + def take(self): + self.taken = True def __call__(self, msg=False): self.acked = True @@ -147,23 +167,63 @@ class DummyReply(object): NO_REPLY = object() -class Reply(object): +def handler(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + if len(args) == 1: + message = args[0] + elif len(args) == 2: + message = args[1] + else: + raise ControlError("Handler takes one argument: a message") + + if not hasattr(message, "reply"): + raise ControlError("Message %s has no reply attribute"%message) + + handling = False + # We're the first handler - ack responsibility is ours + if not message.reply.handled: + handling = True + message.reply.handled = True + + ret = f(*args, **kwargs) + if handling and not message.reply.acked and not message.reply.taken: + message.reply() + return ret + wrapper.func_dict["__handler"] = True + return wrapper + + +class Reply(object): """ Messages sent through a channel are decorated with a "reply" attribute. This object is used to respond to the message through the return channel. """ - def __init__(self, obj): self.obj = obj self.q = queue.Queue() + # Has this message been acked? self.acked = False + # Has the user taken responsibility for ack-ing? + self.taken = False + # Has a handler taken responsibility for ack-ing? + self.handled = False + + def take(self): + self.taken = True def __call__(self, msg=NO_REPLY): + if self.acked: + raise ControlError("Message already acked.") + self.acked = True + if msg is NO_REPLY: + self.q.put(self.obj) + else: + self.q.put(msg) + + def __del__(self): if not self.acked: - self.acked = True - if msg is NO_REPLY: - self.q.put(self.obj) - else: - self.q.put(msg) + # This will be ignored by the interpreter, but emit a warning + raise ControlError("Un-acked message") diff --git a/mitmproxy/dump.py b/mitmproxy/dump.py index 8f9488be..4443995a 100644 --- a/mitmproxy/dump.py +++ b/mitmproxy/dump.py @@ -6,7 +6,7 @@ import itertools from netlib import tcp from netlib.utils import bytes_to_escaped_str, pretty_size -from . import flow, filt, contentviews +from . import flow, filt, contentviews, controller from .exceptions import ContentViewException, FlowReadException, ScriptException @@ -325,22 +325,25 @@ class DumpMaster(flow.FlowMaster): self.echo_flow(f) - def handle_request(self, f): - flow.FlowMaster.handle_request(self, f) + @controller.handler + def request(self, f): + flow.FlowMaster.request(self, f) self.state.delete_flow(f) if f: f.reply() return f - def handle_response(self, f): - flow.FlowMaster.handle_response(self, f) + @controller.handler + def response(self, f): + flow.FlowMaster.response(self, f) if f: f.reply() self._process_flow(f) return f - def handle_error(self, f): - flow.FlowMaster.handle_error(self, f) + @controller.handler + def error(self, f): + flow.FlowMaster.error(self, f) if f: self._process_flow(f) return f diff --git a/mitmproxy/flow.py b/mitmproxy/flow.py index d70ec2d9..407f0d7b 100644 --- a/mitmproxy/flow.py +++ b/mitmproxy/flow.py @@ -208,9 +208,9 @@ class ClientPlaybackState: master.replay_request(self.current) else: self.current.reply = controller.DummyReply() - master.handle_request(self.current) + master.request(self.current) if self.current.response: - master.handle_response(self.current) + master.response(self.current) class ServerPlaybackState: @@ -546,7 +546,8 @@ class FlowStore(FlowList): def kill_all(self, master): for f in self._list: - f.kill(master) + if not f.reply.acked: + f.kill(master) class State(object): @@ -637,7 +638,7 @@ class State(object): self.flows.kill_all(master) -class FlowMaster(controller.ServerMaster): +class FlowMaster(controller.Master): @property def server(self): @@ -893,23 +894,23 @@ class FlowMaster(controller.ServerMaster): f.reply = controller.DummyReply() if f.request: - self.handle_request(f) + self.request(f) if f.response: - self.handle_responseheaders(f) - self.handle_response(f) + self.responseheaders(f) + self.response(f) if f.error: - self.handle_error(f) + self.error(f) elif isinstance(f, TCPFlow): messages = f.messages f.messages = [] f.reply = controller.DummyReply() - self.handle_tcp_open(f) + self.tcp_open(f) while messages: f.messages.append(messages.pop(0)) - self.handle_tcp_message(f) + self.tcp_message(f) if f.error: - self.handle_tcp_error(f) - self.handle_tcp_close(f) + self.tcp_error(f) + self.tcp_close(f) else: raise NotImplementedError() @@ -985,39 +986,40 @@ class FlowMaster(controller.ServerMaster): if block: rt.join() - def handle_log(self, l): + @controller.handler + def log(self, l): self.add_event(l.msg, l.level) - l.reply() - def handle_clientconnect(self, root_layer): + @controller.handler + def clientconnect(self, root_layer): self.run_script_hook("clientconnect", root_layer) - root_layer.reply() - def handle_clientdisconnect(self, root_layer): + @controller.handler + def clientdisconnect(self, root_layer): self.run_script_hook("clientdisconnect", root_layer) - root_layer.reply() - def handle_serverconnect(self, server_conn): + @controller.handler + def serverconnect(self, server_conn): self.run_script_hook("serverconnect", server_conn) - server_conn.reply() - def handle_serverdisconnect(self, server_conn): + @controller.handler + def serverdisconnect(self, server_conn): self.run_script_hook("serverdisconnect", server_conn) - server_conn.reply() - def handle_next_layer(self, top_layer): + @controller.handler + def next_layer(self, top_layer): self.run_script_hook("next_layer", top_layer) - top_layer.reply() - def handle_error(self, f): + @controller.handler + def error(self, f): self.state.update_flow(f) self.run_script_hook("error", f) if self.client_playback: self.client_playback.clear(f) - f.reply() return f - def handle_request(self, f): + @controller.handler + def request(self, f): if f.live: app = self.apps.get(f.request) if app: @@ -1039,20 +1041,19 @@ class FlowMaster(controller.ServerMaster): self.run_script_hook("request", f) return f - def handle_responseheaders(self, f): + @controller.handler + def responseheaders(self, f): try: if self.stream_large_bodies: self.stream_large_bodies.run(f, False) except HttpException: f.reply(Kill) return - self.run_script_hook("responseheaders", f) - - f.reply() return f - def handle_response(self, f): + @controller.handler + def response(self, f): self.active_flows.discard(f) self.state.update_flow(f) self.replacehooks.run(f) @@ -1099,14 +1100,15 @@ class FlowMaster(controller.ServerMaster): self.add_event('"{}" reloaded.'.format(s.filename), 'info') return ok - def handle_tcp_open(self, flow): + @controller.handler + def tcp_open(self, flow): # TODO: This would break mitmproxy currently. # self.state.add_flow(flow) self.active_flows.add(flow) self.run_script_hook("tcp_open", flow) - flow.reply() - def handle_tcp_message(self, flow): + @controller.handler + def tcp_message(self, flow): self.run_script_hook("tcp_message", flow) message = flow.messages[-1] direction = "->" if message.from_client else "<-" @@ -1116,22 +1118,21 @@ class FlowMaster(controller.ServerMaster): direction=direction, ), "info") self.add_event(clean_bin(message.content), "debug") - flow.reply() - def handle_tcp_error(self, flow): + @controller.handler + def tcp_error(self, flow): self.add_event("Error in TCP connection to {}: {}".format( repr(flow.server_conn.address), flow.error ), "info") self.run_script_hook("tcp_error", flow) - flow.reply() - def handle_tcp_close(self, flow): + @controller.handler + def tcp_close(self, flow): self.active_flows.discard(flow) if self.stream: self.stream.add(flow) self.run_script_hook("tcp_close", flow) - flow.reply() def shutdown(self): super(FlowMaster, self).shutdown() diff --git a/mitmproxy/models/flow.py b/mitmproxy/models/flow.py index 1019c9fb..8797fcd8 100644 --- a/mitmproxy/models/flow.py +++ b/mitmproxy/models/flow.py @@ -152,7 +152,7 @@ class Flow(stateobject.StateObject): self.error = Error("Connection killed") self.intercepted = False self.reply(Kill) - master.handle_error(self) + master.error(self) def intercept(self, master): """ diff --git a/mitmproxy/proxy/root_context.py b/mitmproxy/proxy/root_context.py index 96e7aab6..9b4e2963 100644 --- a/mitmproxy/proxy/root_context.py +++ b/mitmproxy/proxy/root_context.py @@ -132,7 +132,6 @@ class RootContext(object): class Log(object): - def __init__(self, msg, level="info"): self.msg = msg self.level = level diff --git a/mitmproxy/web/__init__.py b/mitmproxy/web/__init__.py index 956d221d..f8102ed8 100644 --- a/mitmproxy/web/__init__.py +++ b/mitmproxy/web/__init__.py @@ -6,7 +6,7 @@ import sys from netlib.http import authentication -from .. import flow +from .. import flow, controller from ..exceptions import FlowReadException from . import app @@ -194,18 +194,20 @@ class WebMaster(flow.FlowMaster): if self.state.intercept and self.state.intercept( f) and not f.request.is_replay: f.intercept(self) - else: - f.reply() + f.reply.take() - def handle_request(self, f): + @controller.handler + def request(self, f): super(WebMaster, self).handle_request(f) self._process_flow(f) - def handle_response(self, f): + @controller.handler + def response(self, f): super(WebMaster, self).handle_response(f) self._process_flow(f) - def handle_error(self, f): + @controller.handler + def error(self, f): super(WebMaster, self).handle_error(f) self._process_flow(f) |