aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/script.py33
-rw-r--r--mitmproxy/tools/console/flowlist.py8
-rw-r--r--mitmproxy/tools/console/master.py49
-rw-r--r--test/mitmproxy/addons/test_script.py70
4 files changed, 70 insertions, 90 deletions
diff --git a/mitmproxy/addons/script.py b/mitmproxy/addons/script.py
index 99a8f6a4..e90dd885 100644
--- a/mitmproxy/addons/script.py
+++ b/mitmproxy/addons/script.py
@@ -2,9 +2,13 @@ import os
import importlib
import time
import sys
+import typing
from mitmproxy import addonmanager
from mitmproxy import exceptions
+from mitmproxy import flow
+from mitmproxy import command
+from mitmproxy import eventsequence
from mitmproxy import ctx
@@ -34,10 +38,13 @@ class Script:
def __init__(self, path):
self.name = "scriptmanager:" + path
self.path = path
+ self.fullpath = os.path.expanduser(path)
self.ns = None
self.last_load = 0
self.last_mtime = 0
+ if not os.path.isfile(self.fullpath):
+ raise exceptions.OptionsError("No such script: %s" % path)
@property
def addons(self):
@@ -45,12 +52,12 @@ class Script:
def tick(self):
if time.time() - self.last_load > self.ReloadInterval:
- mtime = os.stat(self.path).st_mtime
+ mtime = os.stat(self.fullpath).st_mtime
if mtime > self.last_mtime:
ctx.log.info("Loading script: %s" % self.path)
if self.ns:
ctx.master.addons.remove(self.ns)
- self.ns = load_script(ctx, self.path)
+ self.ns = load_script(ctx, self.fullpath)
if self.ns:
# We're already running, so we have to explicitly register and
# configure the addon
@@ -76,9 +83,25 @@ class ScriptLoader:
def running(self):
self.is_running = True
- def run_once(self, command, flows):
- # Returning once we have proper commands
- raise NotImplementedError
+ @command.command("script.run")
+ def script_run(self, flows: typing.Sequence[flow.Flow], path: str) -> None:
+ """
+ Run a script on the specified flows. The script is loaded with
+ default options, and all lifecycle events for each flow are
+ simulated.
+ """
+ try:
+ s = Script(path)
+ l = addonmanager.Loader(ctx.master)
+ ctx.master.addons.invoke_addon(s, "load", l)
+ ctx.master.addons.invoke_addon(s, "configure", ctx.options.keys())
+ # Script is loaded on the first tick
+ ctx.master.addons.invoke_addon(s, "tick")
+ for f in flows:
+ for evt, arg in eventsequence.iterate(f):
+ ctx.master.addons.invoke_addon(s, evt, arg)
+ except exceptions.OptionsError as e:
+ raise exceptions.CommandError("Error running script: %s" % e) from e
def configure(self, updated):
if "scripts" in updated:
diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py
index 2b89915f..8f8f5493 100644
--- a/mitmproxy/tools/console/flowlist.py
+++ b/mitmproxy/tools/console/flowlist.py
@@ -141,13 +141,7 @@ class FlowItem(urwid.WidgetWrap):
def keypress(self, xxx_todo_changeme, key):
(maxcol,) = xxx_todo_changeme
key = common.shortcuts(key)
- if key == "|":
- signals.status_prompt_path.send(
- prompt = "Send flow to script",
- callback = self.master.run_script_once,
- args = (self.flow,)
- )
- elif key == "E":
+ if key == "E":
signals.status_prompt_onekey.send(
self,
prompt = "Export to file",
diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py
index 638a3d34..29a61bb3 100644
--- a/mitmproxy/tools/console/master.py
+++ b/mitmproxy/tools/console/master.py
@@ -13,10 +13,8 @@ import traceback
import urwid
from mitmproxy import addons
-from mitmproxy import exceptions
from mitmproxy import command
from mitmproxy import master
-from mitmproxy import io
from mitmproxy import log
from mitmproxy import flow
from mitmproxy.addons import intercept
@@ -170,6 +168,7 @@ def default_keymap(km):
km.add("X", "flow.kill @focus", context="flowlist")
km.add("z", "view.remove @all", context="flowlist")
km.add("Z", "view.remove @hidden", context="flowlist")
+ km.add("|", "console.command 'script.run @focus '", context="flowlist")
km.add("enter", "console.view.flow @focus", context="flowlist")
@@ -280,31 +279,10 @@ class ConsoleMaster(master.Master):
self.loop.widget = window
self.loop.draw_screen()
- def run_script_once(self, command, f):
- sc = self.addons.get("scriptloader")
- try:
- with self.handlecontext():
- sc.run_once(command, [f])
- except ValueError as e:
- signals.add_log("Input error: %s" % e, "warn")
-
def refresh_view(self):
self.view_flowlist()
signals.replace_view_state.send(self)
- def _readflows(self, path):
- """
- Utitility function that reads a list of flows
- or prints an error to the UI if that fails.
- Returns
- - None, if there was an error.
- - a list of flows, otherwise.
- """
- try:
- return io.read_flows_from_paths(path)
- except exceptions.FlowReadException as e:
- signals.status_message.send(message=str(e))
-
def spawn_editor(self, data):
text = not isinstance(data, bytes)
fd, name = tempfile.mkstemp('', "mproxy", text=text)
@@ -525,31 +503,6 @@ class ConsoleMaster(master.Master):
)
)
- def _write_flows(self, path, flows):
- with open(path, "wb") as f:
- fw = io.FlowWriter(f)
- for i in flows:
- fw.add(i)
-
- def save_one_flow(self, path, flow):
- return self._write_flows(path, [flow])
-
- def save_flows(self, path):
- return self._write_flows(path, self.view)
-
- def load_flows_callback(self, path):
- ret = self.load_flows_path(path)
- return ret or "Flows loaded from %s" % path
-
- def load_flows_path(self, path):
- reterr = None
- try:
- master.Master.load_flows_file(self, path)
- except exceptions.FlowReadException as e:
- reterr = str(e)
- signals.flowlist_change.send(self)
- return reterr
-
def quit(self, a):
if a != "n":
self.shutdown()
diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py
index 859d99f9..a3df1fcf 100644
--- a/test/mitmproxy/addons/test_script.py
+++ b/test/mitmproxy/addons/test_script.py
@@ -9,9 +9,6 @@ from mitmproxy.test import tutils
from mitmproxy.test import taddons
from mitmproxy import addonmanager
from mitmproxy import exceptions
-from mitmproxy import options
-from mitmproxy import proxy
-from mitmproxy import master
from mitmproxy.addons import script
@@ -48,9 +45,9 @@ def test_script_print_stdout():
class TestScript:
def test_notfound(self):
- with taddons.context() as tctx:
- sc = script.Script("nonexistent")
- tctx.master.addons.add(sc)
+ with taddons.context():
+ with pytest.raises(exceptions.OptionsError):
+ script.Script("nonexistent")
def test_simple(self):
with taddons.context() as tctx:
@@ -136,25 +133,45 @@ class TestCutTraceback:
class TestScriptLoader:
- def test_simple(self):
- o = options.Options(scripts=[])
- m = master.Master(o, proxy.DummyServer())
+ def test_script_run(self):
+ rp = tutils.test_data.path(
+ "mitmproxy/data/addonscripts/recorder/recorder.py"
+ )
sc = script.ScriptLoader()
- sc.running()
- m.addons.add(sc)
- assert len(m.addons) == 1
- o.update(
- scripts = [
- tutils.test_data.path(
- "mitmproxy/data/addonscripts/recorder/recorder.py"
- )
+ with taddons.context() as tctx:
+ sc.script_run([tflow.tflow(resp=True)], rp)
+ debug = [i.msg for i in tctx.master.logs if i.level == "debug"]
+ assert debug == [
+ 'recorder load', 'recorder running', 'recorder configure',
+ 'recorder tick',
+ 'recorder requestheaders', 'recorder request',
+ 'recorder responseheaders', 'recorder response'
]
- )
- assert len(m.addons) == 1
- assert len(sc.addons) == 1
- o.update(scripts = [])
- assert len(m.addons) == 1
- assert len(sc.addons) == 0
+
+ def test_script_run_nonexistent(self):
+ sc = script.ScriptLoader()
+ with taddons.context():
+ with pytest.raises(exceptions.CommandError):
+ sc.script_run([tflow.tflow(resp=True)], "/nonexistent")
+
+ def test_simple(self):
+ sc = script.ScriptLoader()
+ with taddons.context() as tctx:
+ tctx.master.addons.add(sc)
+ sc.running()
+ assert len(tctx.master.addons) == 1
+ tctx.master.options.update(
+ scripts = [
+ tutils.test_data.path(
+ "mitmproxy/data/addonscripts/recorder/recorder.py"
+ )
+ ]
+ )
+ assert len(tctx.master.addons) == 1
+ assert len(sc.addons) == 1
+ tctx.master.options.update(scripts = [])
+ assert len(tctx.master.addons) == 1
+ assert len(sc.addons) == 0
def test_dupes(self):
sc = script.ScriptLoader()
@@ -166,13 +183,6 @@ class TestScriptLoader:
scripts = ["one", "one"]
)
- def test_nonexistent(self):
- sc = script.ScriptLoader()
- with taddons.context() as tctx:
- tctx.master.addons.add(sc)
- tctx.configure(sc, scripts = ["nonexistent"])
- tctx.master.has_log("nonexistent: file not found")
-
def test_order(self):
rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder")
sc = script.ScriptLoader()