From aaf892e3afc682b2dc2a166a96872420e50092cd Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sun, 17 Feb 2013 12:42:48 +1300 Subject: Significantly refactor the master/slave message passing interface. --- libmproxy/controller.py | 85 ++++++++++++++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 23 deletions(-) (limited to 'libmproxy/controller.py') diff --git a/libmproxy/controller.py b/libmproxy/controller.py index f38d1edb..c36bb9df 100644 --- a/libmproxy/controller.py +++ b/libmproxy/controller.py @@ -17,37 +17,73 @@ import Queue, threading should_exit = False -class Msg: + +class DummyReply: + """ + A reply object that does nothing. Useful when we need an object to seem + like it has a channel, and during testing. + """ def __init__(self): + self.acked = False + + def __call__(self, msg=False): + self.acked = True + + +class Reply: + """ + Messages sent through a channel are decorated with a "reply" attribute. + This object is used to respond to the message through the return + channel. + """ + def __init__(self, obj): + self.obj = obj self.q = Queue.Queue() self.acked = False - def _ack(self, data=False): + def __call__(self, msg=False): if not self.acked: self.acked = True - if data is None: - self.q.put(data) + if msg is None: + self.q.put(msg) else: - self.q.put(data or self) + self.q.put(msg or self.obj) - def _send(self, masterq): - self.acked = False - try: - masterq.put(self, timeout=3) - while not should_exit: # pragma: no cover - try: - g = self.q.get(timeout=0.5) - except Queue.Empty: - continue - return g - except (Queue.Empty, Queue.Full): # pragma: no cover - return None + +class Channel: + def __init__(self, q): + self.q = q + + def ask(self, m): + """ + Send a message to the master, and wait for a response. + """ + m.reply = Reply(m) + self.q.put(m) + while not should_exit: + try: + # The timeout is here so we can handle a should_exit event. + g = m.reply.q.get(timeout=0.5) + except Queue.Empty: + continue + return g + + def tell(self, m): + """ + Send a message to the master, and keep going. + """ + m.reply = None + self.q.put(m) class Slave(threading.Thread): - def __init__(self, masterq, server): - self.masterq, self.server = masterq, server - self.server.set_mqueue(masterq) + """ + Slaves get a channel end-point through which they can send messages to + the master. + """ + def __init__(self, channel, server): + self.channel, self.server = channel, server + self.server.set_channel(channel) threading.Thread.__init__(self) def run(self): @@ -55,6 +91,9 @@ class Slave(threading.Thread): class Master: + """ + Masters get and respond to messages from slaves. + """ def __init__(self, server): """ server may be None if no server is needed. @@ -81,18 +120,18 @@ class Master: def run(self): global should_exit should_exit = False - self.server.start_slave(Slave, self.masterq) + self.server.start_slave(Slave, Channel(self.masterq)) while not should_exit: self.tick(self.masterq) self.shutdown() - def handle(self, msg): # pragma: no cover + def handle(self, msg): c = "handle_" + msg.__class__.__name__.lower() m = getattr(self, c, None) if m: m(msg) else: - msg._ack() + msg.reply() def shutdown(self): global should_exit -- cgit v1.2.3 From 7800b7c9103ae119a13b81048a84f626850cf04f Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sat, 23 Feb 2013 14:08:28 +1300 Subject: Refactor proxy core communications to be clearer. --- libmproxy/controller.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'libmproxy/controller.py') diff --git a/libmproxy/controller.py b/libmproxy/controller.py index c36bb9df..849d998b 100644 --- a/libmproxy/controller.py +++ b/libmproxy/controller.py @@ -56,13 +56,14 @@ class Channel: def ask(self, m): """ - Send a message to the master, and wait for a response. + Decorate a message with a reply attribute, and send it to the + master. then wait for a response. """ m.reply = Reply(m) self.q.put(m) while not should_exit: try: - # The timeout is here so we can handle a should_exit event. + # The timeout is here so we can handle a should_exit event. g = m.reply.q.get(timeout=0.5) except Queue.Empty: continue @@ -70,9 +71,10 @@ class Channel: def tell(self, m): """ - Send a message to the master, and keep going. + Decorate a message with a dummy reply attribute, send it to the + master, then return immediately. """ - m.reply = None + m.reply = DummyReply() self.q.put(m) -- cgit v1.2.3 From f203881b0d7f81a7f8ecbc44b7030060242af01b Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sat, 23 Feb 2013 14:13:43 +1300 Subject: Remove redundant clause in controller.Reply --- libmproxy/controller.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'libmproxy/controller.py') diff --git a/libmproxy/controller.py b/libmproxy/controller.py index 849d998b..da097692 100644 --- a/libmproxy/controller.py +++ b/libmproxy/controller.py @@ -44,10 +44,7 @@ class Reply: def __call__(self, msg=False): if not self.acked: self.acked = True - if msg is None: - self.q.put(msg) - else: - self.q.put(msg or self.obj) + self.q.put(msg or self.obj) class Channel: -- cgit v1.2.3 From 05e4d4468ec372adb73649e6980c525a185e9c07 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sat, 23 Feb 2013 21:59:25 +1300 Subject: Test request and response kill functionality. --- libmproxy/controller.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'libmproxy/controller.py') diff --git a/libmproxy/controller.py b/libmproxy/controller.py index da097692..bb22597d 100644 --- a/libmproxy/controller.py +++ b/libmproxy/controller.py @@ -41,10 +41,13 @@ class Reply: self.q = Queue.Queue() self.acked = False - def __call__(self, msg=False): + def __call__(self, msg=None): if not self.acked: self.acked = True - self.q.put(msg or self.obj) + if msg is None: + self.q.put(self.obj) + else: + self.q.put(msg) class Channel: @@ -62,7 +65,7 @@ class Channel: try: # The timeout is here so we can handle a should_exit event. g = m.reply.q.get(timeout=0.5) - except Queue.Empty: + except Queue.Empty: # pragma: nocover continue return g -- cgit v1.2.3