From b3bf754e539555351230cbb0887f8838c12fd23c Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Wed, 8 Jun 2016 11:21:38 +1200 Subject: Simplify script concurrency helpers We now have take() to prevent double-replies. --- examples/nonblocking.py | 4 +-- mitmproxy/script/concurrent.py | 44 +++++--------------------------- test/mitmproxy/script/test_concurrent.py | 26 ++++++++----------- 3 files changed, 19 insertions(+), 55 deletions(-) diff --git a/examples/nonblocking.py b/examples/nonblocking.py index 41674b2a..4609f389 100644 --- a/examples/nonblocking.py +++ b/examples/nonblocking.py @@ -4,6 +4,6 @@ from mitmproxy.script import concurrent @concurrent # Remove this and see what happens def request(context, flow): - print("handle request: %s%s" % (flow.request.host, flow.request.path)) + context.log("handle request: %s%s" % (flow.request.host, flow.request.path)) time.sleep(5) - print("start request: %s%s" % (flow.request.host, flow.request.path)) + context.log("start request: %s%s" % (flow.request.host, flow.request.path)) diff --git a/mitmproxy/script/concurrent.py b/mitmproxy/script/concurrent.py index b81f2ab1..89c835f6 100644 --- a/mitmproxy/script/concurrent.py +++ b/mitmproxy/script/concurrent.py @@ -8,43 +8,6 @@ from mitmproxy import controller import threading -class ReplyProxy(object): - - def __init__(self, reply_func, script_thread): - self.reply_func = reply_func - self.script_thread = script_thread - self.master_reply = None - - def send(self, message): - if self.master_reply is None: - self.master_reply = message - self.script_thread.start() - return - self.reply_func(message) - - def done(self): - self.reply_func.send(self.master_reply) - - def __getattr__(self, k): - return getattr(self.reply_func, k) - - -def _handle_concurrent_reply(fn, o, *args, **kwargs): - # Make first call to o.reply a no op and start the script thread. - # We must not start the script thread before, as this may lead to a nasty race condition - # where the script thread replies a different response before the normal reply, which then gets swallowed. - - def run(): - fn(*args, **kwargs) - # If the script did not call .reply(), we have to do it now. - reply_proxy.done() - - script_thread = ScriptThread(target=run) - - reply_proxy = ReplyProxy(o.reply, script_thread) - o.reply = reply_proxy - - class ScriptThread(threading.Thread): name = "ScriptThread" @@ -56,5 +19,10 @@ def concurrent(fn): ) def _concurrent(ctx, obj): - _handle_concurrent_reply(fn, obj, ctx, obj) + def run(): + fn(ctx, obj) + if not obj.reply.acked: + obj.reply.ack() + obj.reply.take() + ScriptThread(target=run).start() return _concurrent diff --git a/test/mitmproxy/script/test_concurrent.py b/test/mitmproxy/script/test_concurrent.py index c2f169ad..62541f3f 100644 --- a/test/mitmproxy/script/test_concurrent.py +++ b/test/mitmproxy/script/test_concurrent.py @@ -1,29 +1,25 @@ -from threading import Event - from mitmproxy.script import Script from test.mitmproxy import tutils +from mitmproxy import controller +import time -class Dummy: - def __init__(self, reply): - self.reply = reply +class Thing: + def __init__(self): + self.reply = controller.DummyReply() @tutils.skip_appveyor def test_concurrent(): with Script(tutils.test_data.path("data/scripts/concurrent_decorator.py"), None) as s: - def reply(): - reply.acked.set() - reply.acked = Event() - - f1, f2 = Dummy(reply), Dummy(reply) + f1, f2 = Thing(), Thing() s.run("request", f1) - f1.reply() s.run("request", f2) - f2.reply() - assert f1.reply.acked == reply.acked - assert not reply.acked.is_set() - assert reply.acked.wait(10) + start = time.time() + while time.time() - start < 5: + if f1.reply.acked and f2.reply.acked: + return + raise ValueError("Script never acked") def test_concurrent_err(): -- cgit v1.2.3