aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/nonblocking.py2
-rw-r--r--libmproxy/console/__init__.py5
-rw-r--r--libmproxy/flow.py37
-rw-r--r--libmproxy/proxy.py6
-rw-r--r--libmproxy/script.py39
-rw-r--r--test/test_script.py80
6 files changed, 83 insertions, 86 deletions
diff --git a/examples/nonblocking.py b/examples/nonblocking.py
index ed8cc7c9..9a131b32 100644
--- a/examples/nonblocking.py
+++ b/examples/nonblocking.py
@@ -5,4 +5,4 @@ from libmproxy.script import concurrent
def request(context, flow):
print "handle request: %s%s" % (flow.request.host, flow.request.path)
time.sleep(5)
- print "start request: %s%s" % (flow.request.host, flow.request.path) \ No newline at end of file
+ print "start request: %s%s" % (flow.request.host, flow.request.path)
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index 210e2b95..b8d4a105 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -781,6 +781,9 @@ class ConsoleMaster(flow.FlowMaster):
else:
self.view_flowlist()
+ def edit_scripts(self, *args, **kwargs):
+ pass
+
def loop(self):
changed = True
try:
@@ -883,7 +886,7 @@ class ConsoleMaster(flow.FlowMaster):
grideditor.ScriptEditor(
self,
[[i.argv[0]] for i in self.scripts],
- None
+ self.edit_scripts
)
)
#if self.scripts:
diff --git a/libmproxy/flow.py b/libmproxy/flow.py
index 32306513..e88bf985 100644
--- a/libmproxy/flow.py
+++ b/libmproxy/flow.py
@@ -143,39 +143,6 @@ class SetHeaders:
f.request.headers.add(header, value)
-class ScriptContext:
- def __init__(self, master):
- self._master = master
-
- def log(self, *args, **kwargs):
- """
- Logs an event.
-
- How this is handled depends on the front-end. mitmdump will display
- events if the eventlog flag ("-e") was passed. mitmproxy sends
- output to the eventlog for display ("v" keyboard shortcut).
- """
- self._master.add_event(*args, **kwargs)
-
- def duplicate_flow(self, f):
- """
- Returns a duplicate of the specified flow. The flow is also
- injected into the current state, and is ready for editing, replay,
- etc.
- """
- self._master.pause_scripts = True
- f = self._master.duplicate_flow(f)
- self._master.pause_scripts = False
- return f
-
- def replay_request(self, f):
- """
- Replay the request on the current flow. The response will be added
- to the flow object.
- """
- self._master.replay_request(f)
-
-
class decoded(object):
"""
@@ -1431,14 +1398,14 @@ class FlowMaster(controller.Master):
"""
Returns an (error, script) tuple.
"""
- s = script.Script(script_argv, ScriptContext(self))
+ s = script.Script(script_argv, self)
try:
s.load()
except script.ScriptError, v:
return (v.args[0], None)
return (None, s)
- def unload_script(self,script):
+ def unload_script(self, script):
script.unload()
self.scripts.remove(script)
diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py
index 15f75b8d..1894f7f0 100644
--- a/libmproxy/proxy.py
+++ b/libmproxy/proxy.py
@@ -110,15 +110,15 @@ class HandleSNI:
self.handler, self.client_conn, self.host, self.port = handler, client_conn, host, port
self.cert, self.key = cert, key
- def __call__(self, connection):
+ def __call__(self, client_connection):
try:
- sn = connection.get_servername()
+ sn = client_connection.get_servername()
if sn:
self.handler.get_server_connection(self.client_conn, "https", self.host, self.port, sn)
new_context = SSL.Context(SSL.TLSv1_METHOD)
new_context.use_privatekey_file(self.key)
new_context.use_certificate(self.cert.x509)
- connection.set_context(new_context)
+ client_connection.set_context(new_context)
self.handler.sni = sn.decode("utf8").encode("idna")
# An unhandled exception in this method will core dump PyOpenSSL, so
# make dang sure it doesn't happen.
diff --git a/libmproxy/script.py b/libmproxy/script.py
index f8a0d085..0f1056f6 100644
--- a/libmproxy/script.py
+++ b/libmproxy/script.py
@@ -5,6 +5,39 @@ class ScriptError(Exception):
pass
+class ScriptContext:
+ def __init__(self, master):
+ self._master = master
+
+ def log(self, *args, **kwargs):
+ """
+ Logs an event.
+
+ How this is handled depends on the front-end. mitmdump will display
+ events if the eventlog flag ("-e") was passed. mitmproxy sends
+ output to the eventlog for display ("v" keyboard shortcut).
+ """
+ self._master.add_event(*args, **kwargs)
+
+ def duplicate_flow(self, f):
+ """
+ Returns a duplicate of the specified flow. The flow is also
+ injected into the current state, and is ready for editing, replay,
+ etc.
+ """
+ self._master.pause_scripts = True
+ f = self._master.duplicate_flow(f)
+ self._master.pause_scripts = False
+ return f
+
+ def replay_request(self, f):
+ """
+ Replay the request on the current flow. The response will be added
+ to the flow object.
+ """
+ self._master.replay_request(f)
+
+
class Script:
"""
The instantiator should do something along this vein:
@@ -12,9 +45,9 @@ class Script:
s = Script(argv, master)
s.load()
"""
- def __init__(self, argv, ctx):
+ def __init__(self, argv, master):
self.argv = argv
- self.ctx = ctx
+ self.ctx = ScriptContext(master)
self.ns = None
def load(self):
@@ -82,4 +115,4 @@ def concurrent(fn):
def _concurrent(ctx, conn):
_handle_concurrent_reply(fn, conn, [ctx, conn])
return _concurrent
- raise NotImplementedError("Concurrent decorator not supported for this method.") \ No newline at end of file
+ raise NotImplementedError("Concurrent decorator not supported for this method.")
diff --git a/test/test_script.py b/test/test_script.py
index ad2296ef..69386834 100644
--- a/test/test_script.py
+++ b/test/test_script.py
@@ -3,26 +3,16 @@ import tutils
import shlex
import os
import time
+import mock
-class TCounter:
- count = 0
-
- def __call__(self, *args, **kwargs):
- self.count += 1
-
-
-class TScriptContext(TCounter):
- def log(self, msg):
- self.__call__()
-
class TestScript:
def test_simple(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- ctx = flow.ScriptContext(fm)
-
- p = script.Script(shlex.split(tutils.test_data.path("scripts/a.py")+" --var 40", posix=(os.name != "nt")), ctx)
+ p = script.Script(
+ shlex.split(tutils.test_data.path("scripts/a.py")+" --var 40",posix=(os.name != "nt")), fm
+ )
p.load()
assert "here" in p.ns
@@ -50,28 +40,26 @@ class TestScript:
def test_err(self):
s = flow.State()
fm = flow.FlowMaster(None, s)
- ctx = flow.ScriptContext(fm)
-
- s = script.Script(["nonexistent"], ctx)
+ s = script.Script(["nonexistent"], fm)
tutils.raises(
"no such file",
s.load
)
- s = script.Script([tutils.test_data.path("scripts")], ctx)
+ s = script.Script([tutils.test_data.path("scripts")], fm)
tutils.raises(
"not a file",
s.load
)
- s = script.Script([tutils.test_data.path("scripts/syntaxerr.py")], ctx)
+ s = script.Script([tutils.test_data.path("scripts/syntaxerr.py")], fm)
tutils.raises(
script.ScriptError,
s.load
)
- s = script.Script([tutils.test_data.path("scripts/loaderr.py")], ctx)
+ s = script.Script([tutils.test_data.path("scripts/loaderr.py")], fm)
tutils.raises(
script.ScriptError,
s.load
@@ -82,38 +70,44 @@ class TestScript:
fm = flow.FlowMaster(None, s)
fm.load_script([tutils.test_data.path("scripts/concurrent_decorator.py")])
- reply = TCounter()
- r1, r2 = tutils.treq(), tutils.treq()
- r1.reply, r2.reply = reply, reply
- t_start = time.time()
- fm.handle_request(r1)
- r1.reply()
- fm.handle_request(r2)
- r2.reply()
- assert reply.count < 2
- assert (time.time() - t_start) < 0.09
- time.sleep(0.2)
- assert reply.count == 2
+ with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
+ r1, r2 = tutils.treq(), tutils.treq()
+ t_start = time.time()
+ fm.handle_request(r1)
+ r1.reply()
+ fm.handle_request(r2)
+ r2.reply()
+
+ # Two instantiations
+ assert m.call_count == 2
+ assert (time.time() - t_start) < 0.09
+ time.sleep(0.2)
+ # Plus two invocations
+ assert m.call_count == 4
def test_concurrent2(self):
- ctx = TScriptContext()
- s = script.Script([tutils.test_data.path("scripts/concurrent_decorator.py")], ctx)
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ s = script.Script([tutils.test_data.path("scripts/concurrent_decorator.py")], fm)
s.load()
f = tutils.tflow_full()
f.error = tutils.terr(f.request)
f.reply = f.request.reply
- s.run("clientconnect", f)
- s.run("serverconnect", f)
- s.run("response", f)
- s.run("error", f)
- s.run("clientdisconnect", f)
- time.sleep(0.1)
- assert ctx.count == 5
+ with mock.patch("libmproxy.controller.DummyReply.__call__") as m:
+ s.run("clientconnect", f)
+ s.run("serverconnect", f)
+ s.run("response", f)
+ s.run("error", f)
+ s.run("clientdisconnect", f)
+ time.sleep(0.1)
+ assert m.call_count == 5
def test_concurrent_err(self):
- s = script.Script([tutils.test_data.path("scripts/concurrent_decorator_err.py")], TScriptContext())
+ s = flow.State()
+ fm = flow.FlowMaster(None, s)
+ s = script.Script([tutils.test_data.path("scripts/concurrent_decorator_err.py")], fm)
tutils.raises(
"decorator not supported for this method",
s.load
- ) \ No newline at end of file
+ )