diff options
Diffstat (limited to 'mitmproxy/controller.py')
-rw-r--r-- | mitmproxy/controller.py | 193 |
1 files changed, 107 insertions, 86 deletions
diff --git a/mitmproxy/controller.py b/mitmproxy/controller.py index 84f243a0..81978a09 100644 --- a/mitmproxy/controller.py +++ b/mitmproxy/controller.py @@ -2,44 +2,93 @@ from __future__ import absolute_import from six.moves import queue import threading +from .exceptions import Kill -class DummyReply: +class Master(object): """ - A reply object that does nothing. Useful when we need an object to seem - like it has a channel, and during testing. + The master handles mitmproxy's main event loop. """ def __init__(self): - self.acked = False + self.event_queue = queue.Queue() + self.should_exit = threading.Event() - def __call__(self, msg=False): - self.acked = True + def start(self): + self.should_exit.clear() + def run(self): + self.start() + try: + while not self.should_exit.is_set(): + # Don't choose a very small timeout in Python 2: + # https://github.com/mitmproxy/mitmproxy/issues/443 + # TODO: Lower the timeout value if we move to Python 3. + self.tick(0.1) + finally: + self.shutdown() + + def tick(self, timeout): + changed = False + try: + # This endless loop runs until the 'Queue.Empty' + # exception is thrown. + while True: + mtype, obj = self.event_queue.get(timeout=timeout) + handle_func = getattr(self, "handle_" + mtype) + handle_func(obj) + self.event_queue.task_done() + changed = True + except queue.Empty: + pass + return changed + + def shutdown(self): + self.should_exit.set() -class Reply: +class ServerMaster(Master): """ - Messages sent through a channel are decorated with a "reply" attribute. - This object is used to respond to the message through the return - channel. + The ServerMaster adds server thread support to the master. """ - def __init__(self, obj): - self.obj = obj - self.q = queue.Queue() - self.acked = False + def __init__(self): + super(ServerMaster, self).__init__() + self.servers = [] - def __call__(self, msg=None): - if not self.acked: - self.acked = True - if msg is None: - self.q.put(self.obj) - else: - self.q.put(msg) + def add_server(self, server): + # We give a Channel to the server which can be used to communicate with the master + channel = Channel(self.event_queue, self.should_exit) + server.set_channel(channel) + self.servers.append(server) + + def start(self): + super(ServerMaster, self).start() + for server in self.servers: + ServerThread(server).start() + + def shutdown(self): + for server in self.servers: + server.shutdown() + super(ServerMaster, self).shutdown() -class Channel: +class ServerThread(threading.Thread): + def __init__(self, server): + self.server = server + super(ServerThread, self).__init__() + address = getattr(self.server, "address", None) + self.name = "ServerThread ({})".format(repr(address)) + + def run(self): + self.server.serve_forever() + + +class Channel(object): + """ + The only way for the proxy server to communicate with the master + is to use the channel it has been given. + """ def __init__(self, q, should_exit): self.q = q @@ -47,8 +96,11 @@ class Channel: def ask(self, mtype, m): """ - Decorate a message with a reply attribute, and send it to the - master. then wait for a response. + Decorate a message with a reply attribute, and send it to the + master. Then wait for a response. + + Raises: + Kill: All connections should be closed immediately. """ m.reply = Reply(m) self.q.put((mtype, m)) @@ -58,85 +110,54 @@ class Channel: g = m.reply.q.get(timeout=0.5) except queue.Empty: # pragma: no cover continue + if g == Kill: + raise Kill() return g + raise Kill() + def tell(self, mtype, m): """ - Decorate a message with a dummy reply attribute, send it to the - master, then return immediately. + Decorate a message with a dummy reply attribute, send it to the + master, then return immediately. """ m.reply = DummyReply() self.q.put((mtype, m)) -class Slave(threading.Thread): - +class DummyReply(object): """ - Slaves get a channel end-point through which they can send messages to - the master. + A reply object that does nothing. Useful when we need an object to seem + like it has a channel, and during testing. """ - def __init__(self, channel, server): - self.channel, self.server = channel, server - self.server.set_channel(channel) - threading.Thread.__init__(self) - self.name = "SlaveThread ({})".format(repr(self.server.address)) + def __init__(self): + self.acked = False - def run(self): - self.server.serve_forever() + def __call__(self, msg=False): + self.acked = True -class Master(object): +# Special value to distinguish the case where no reply was sent +NO_REPLY = object() + +class Reply(object): """ - Masters get and respond to messages from slaves. + Messages sent through a channel are decorated with a "reply" attribute. + This object is used to respond to the message through the return + channel. """ - def __init__(self, server): - """ - server may be None if no server is needed. - """ - self.server = server - self.masterq = queue.Queue() - self.should_exit = threading.Event() - - def tick(self, q, timeout): - changed = False - try: - # This endless loop runs until the 'Queue.Empty' - # exception is thrown. If more than one request is in - # the queue, this speeds up every request by 0.1 seconds, - # because get_input(..) function is not blocking. - while True: - msg = q.get(timeout=timeout) - self.handle(*msg) - q.task_done() - changed = True - except queue.Empty: - pass - return changed - - def run(self): - self.should_exit.clear() - self.server.start_slave(Slave, Channel(self.masterq, self.should_exit)) - while not self.should_exit.is_set(): - - # Don't choose a very small timeout in Python 2: - # https://github.com/mitmproxy/mitmproxy/issues/443 - # TODO: Lower the timeout value if we move to Python 3. - self.tick(self.masterq, 0.1) - self.shutdown() - - def handle(self, mtype, obj): - c = "handle_" + mtype - m = getattr(self, c, None) - if m: - m(obj) - else: - obj.reply() + def __init__(self, obj): + self.obj = obj + self.q = queue.Queue() + self.acked = False - def shutdown(self): - if not self.should_exit.is_set(): - self.should_exit.set() - if self.server: - self.server.shutdown() + def __call__(self, msg=NO_REPLY): + if not self.acked: + self.acked = True + if msg is NO_REPLY: + self.q.put(self.obj) + else: + self.q.put(msg) |