Diffstat (limited to 'mitmproxy/libmproxy/controller.py')
-rw-r--r--  mitmproxy/libmproxy/controller.py  143
1 file changed, 143 insertions, 0 deletions
diff --git a/mitmproxy/libmproxy/controller.py b/mitmproxy/libmproxy/controller.py
new file mode 100644
index 00000000..9a059856
--- /dev/null
+++ b/mitmproxy/libmproxy/controller.py
@@ -0,0 +1,143 @@
+from __future__ import absolute_import
+import Queue
+import threading
+
+
+class DummyReply:
+
+    """
+    A reply object that does nothing. Useful when we need an object to seem
+    like it has a channel, and during testing.
+    """
+
+    def __init__(self):
+        self.acked = False
+
+    def __call__(self, msg=False):
+        self.acked = True
+
+
+class Reply:
+
+    """
+    Messages sent through a channel are decorated with a "reply" attribute.
+    This object is used to respond to the message through the return
+    channel.
+    """
+
+    def __init__(self, obj):
+        self.obj = obj
+        self.q = Queue.Queue()
+        self.acked = False
+
+    def __call__(self, msg=None):
+        if not self.acked:
+            self.acked = True
+            if msg is None:
+                self.q.put(self.obj)
+            else:
+                self.q.put(msg)
+
+
+class Channel:
+
+    def __init__(self, q, should_exit):
+        self.q = q
+        self.should_exit = should_exit
+
+    def ask(self, mtype, m):
+        """
+        Decorate a message with a reply attribute, send it to the
+        master, then wait for a response.
+        """
+        m.reply = Reply(m)
+        self.q.put((mtype, m))
+        while not self.should_exit.is_set():
+            try:
+                # The timeout is here so we can handle a should_exit event.
+                g = m.reply.q.get(timeout=0.5)
+            except Queue.Empty:  # pragma: no cover
+                continue
+            return g
+
+    def tell(self, mtype, m):
+        """
+        Decorate a message with a dummy reply attribute, send it to the
+        master, then return immediately.
+        """
+        m.reply = DummyReply()
+        self.q.put((mtype, m))
+
+
+class Slave(threading.Thread):
+
+    """
+    Slaves get a channel end-point through which they can send messages to
+    the master.
+    """
+
+    def __init__(self, channel, server):
+        self.channel, self.server = channel, server
+        self.server.set_channel(channel)
+        threading.Thread.__init__(self)
+        self.name = "SlaveThread (%s:%s)" % (
+            self.server.address.host, self.server.address.port)
+
+    def run(self):
+        self.server.serve_forever()
+
+
+class Master(object):
+
+    """
+    Masters get and respond to messages from slaves.
+    """
+
+    def __init__(self, server):
+        """
+        server may be None if no server is needed.
+        """
+        self.server = server
+        self.masterq = Queue.Queue()
+        self.should_exit = threading.Event()
+
+    def tick(self, q, timeout):
+        changed = False
+        try:
+            # This loop runs until a Queue.Empty exception is raised.
+            # q.get() only blocks (for up to timeout seconds) when the
+            # queue is empty, so any messages that are already waiting
+            # are all handled within a single tick.
+            while True:
+                msg = q.get(timeout=timeout)
+                self.handle(*msg)
+                q.task_done()
+                changed = True
+        except Queue.Empty:
+            pass
+        return changed
+
+    def run(self):
+        self.should_exit.clear()
+        self.server.start_slave(Slave, Channel(self.masterq, self.should_exit))
+        while not self.should_exit.is_set():
+
+            # Don't choose a very small timeout in Python 2:
+            # https://github.com/mitmproxy/mitmproxy/issues/443
+            # TODO: Lower the timeout value if we move to Python 3.
+            self.tick(self.masterq, 0.1)
+        self.shutdown()
+
+    def handle(self, mtype, obj):
+        c = "handle_" + mtype
+        m = getattr(self, c, None)
+        if m:
+            m(obj)
+        else:
+            obj.reply()
+
+    def shutdown(self):
+        if not self.should_exit.is_set():
+            self.should_exit.set()
+        if self.server:
+            self.server.shutdown()
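
For context, here is a minimal usage sketch of the classes added above, written for Python 2 to match the Queue import in this file. It assumes the module is importable as libmproxy.controller (the path this file lives under) and relies on the documented allowance that Master's server may be None; the EchoMaster subclass, the Message container, and the "echo" message type are invented purely for illustration and are not part of this commit. Because Master.run() needs a real server to start a Slave, the sketch drives Master.tick() from a plain thread instead.

import threading

from libmproxy import controller


class Message(object):
    # Hypothetical message container: ask()/tell() only attach a .reply
    # attribute, so any object with the fields the handler needs will do.
    def __init__(self, text):
        self.text = text


class EchoMaster(controller.Master):
    # Master.handle() dispatches on "handle_" + mtype, so messages sent
    # with mtype "echo" are routed to this method.
    def handle_echo(self, msg):
        msg.reply("pong: %s" % msg.text)


master = EchoMaster(None)
channel = controller.Channel(master.masterq, master.should_exit)


def loop():
    # Roughly the body of Master.run(), minus the slave start-up.
    while not master.should_exit.is_set():
        master.tick(master.masterq, 0.1)


worker = threading.Thread(target=loop)
worker.start()

# tell(): fire-and-forget. The message gets a DummyReply, so the handler's
# reply call is recorded and discarded.
channel.tell("echo", Message("hello"))

# ask(): blocks until handle_echo calls msg.reply(...), then returns that
# value through the Reply queue.
print(channel.ask("echo", Message("ping")))  # prints "pong: ping"

master.should_exit.set()
worker.join()

Note that an unknown message type falls through to obj.reply() in Master.handle(), so a sender blocked in ask() is never left waiting just because no handler is defined.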