From d439b3451148038a83a20964d1b26e6a17ae7eb0 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sat, 29 Apr 2017 12:00:27 +1200 Subject: command: script.run Plus the flowlist binding adjustments. --- mitmproxy/addons/script.py | 33 ++++++++++++++--- mitmproxy/tools/console/flowlist.py | 8 +---- mitmproxy/tools/console/master.py | 49 +------------------------ test/mitmproxy/addons/test_script.py | 70 ++++++++++++++++++++---------------- 4 files changed, 70 insertions(+), 90 deletions(-) diff --git a/mitmproxy/addons/script.py b/mitmproxy/addons/script.py index 99a8f6a4..e90dd885 100644 --- a/mitmproxy/addons/script.py +++ b/mitmproxy/addons/script.py @@ -2,9 +2,13 @@ import os import importlib import time import sys +import typing from mitmproxy import addonmanager from mitmproxy import exceptions +from mitmproxy import flow +from mitmproxy import command +from mitmproxy import eventsequence from mitmproxy import ctx @@ -34,10 +38,13 @@ class Script: def __init__(self, path): self.name = "scriptmanager:" + path self.path = path + self.fullpath = os.path.expanduser(path) self.ns = None self.last_load = 0 self.last_mtime = 0 + if not os.path.isfile(self.fullpath): + raise exceptions.OptionsError("No such script: %s" % path) @property def addons(self): @@ -45,12 +52,12 @@ class Script: def tick(self): if time.time() - self.last_load > self.ReloadInterval: - mtime = os.stat(self.path).st_mtime + mtime = os.stat(self.fullpath).st_mtime if mtime > self.last_mtime: ctx.log.info("Loading script: %s" % self.path) if self.ns: ctx.master.addons.remove(self.ns) - self.ns = load_script(ctx, self.path) + self.ns = load_script(ctx, self.fullpath) if self.ns: # We're already running, so we have to explicitly register and # configure the addon @@ -76,9 +83,25 @@ class ScriptLoader: def running(self): self.is_running = True - def run_once(self, command, flows): - # Returning once we have proper commands - raise NotImplementedError + @command.command("script.run") + def script_run(self, flows: typing.Sequence[flow.Flow], path: str) -> None: + """ + Run a script on the specified flows. The script is loaded with + default options, and all lifecycle events for each flow are + simulated. + """ + try: + s = Script(path) + l = addonmanager.Loader(ctx.master) + ctx.master.addons.invoke_addon(s, "load", l) + ctx.master.addons.invoke_addon(s, "configure", ctx.options.keys()) + # Script is loaded on the first tick + ctx.master.addons.invoke_addon(s, "tick") + for f in flows: + for evt, arg in eventsequence.iterate(f): + ctx.master.addons.invoke_addon(s, evt, arg) + except exceptions.OptionsError as e: + raise exceptions.CommandError("Error running script: %s" % e) from e def configure(self, updated): if "scripts" in updated: diff --git a/mitmproxy/tools/console/flowlist.py b/mitmproxy/tools/console/flowlist.py index 2b89915f..8f8f5493 100644 --- a/mitmproxy/tools/console/flowlist.py +++ b/mitmproxy/tools/console/flowlist.py @@ -141,13 +141,7 @@ class FlowItem(urwid.WidgetWrap): def keypress(self, xxx_todo_changeme, key): (maxcol,) = xxx_todo_changeme key = common.shortcuts(key) - if key == "|": - signals.status_prompt_path.send( - prompt = "Send flow to script", - callback = self.master.run_script_once, - args = (self.flow,) - ) - elif key == "E": + if key == "E": signals.status_prompt_onekey.send( self, prompt = "Export to file", diff --git a/mitmproxy/tools/console/master.py b/mitmproxy/tools/console/master.py index 638a3d34..29a61bb3 100644 --- a/mitmproxy/tools/console/master.py +++ b/mitmproxy/tools/console/master.py @@ -13,10 +13,8 @@ import traceback import urwid from mitmproxy import addons -from mitmproxy import exceptions from mitmproxy import command from mitmproxy import master -from mitmproxy import io from mitmproxy import log from mitmproxy import flow from mitmproxy.addons import intercept @@ -170,6 +168,7 @@ def default_keymap(km): km.add("X", "flow.kill @focus", context="flowlist") km.add("z", "view.remove @all", context="flowlist") km.add("Z", "view.remove @hidden", context="flowlist") + km.add("|", "console.command 'script.run @focus '", context="flowlist") km.add("enter", "console.view.flow @focus", context="flowlist") @@ -280,31 +279,10 @@ class ConsoleMaster(master.Master): self.loop.widget = window self.loop.draw_screen() - def run_script_once(self, command, f): - sc = self.addons.get("scriptloader") - try: - with self.handlecontext(): - sc.run_once(command, [f]) - except ValueError as e: - signals.add_log("Input error: %s" % e, "warn") - def refresh_view(self): self.view_flowlist() signals.replace_view_state.send(self) - def _readflows(self, path): - """ - Utitility function that reads a list of flows - or prints an error to the UI if that fails. - Returns - - None, if there was an error. - - a list of flows, otherwise. - """ - try: - return io.read_flows_from_paths(path) - except exceptions.FlowReadException as e: - signals.status_message.send(message=str(e)) - def spawn_editor(self, data): text = not isinstance(data, bytes) fd, name = tempfile.mkstemp('', "mproxy", text=text) @@ -525,31 +503,6 @@ class ConsoleMaster(master.Master): ) ) - def _write_flows(self, path, flows): - with open(path, "wb") as f: - fw = io.FlowWriter(f) - for i in flows: - fw.add(i) - - def save_one_flow(self, path, flow): - return self._write_flows(path, [flow]) - - def save_flows(self, path): - return self._write_flows(path, self.view) - - def load_flows_callback(self, path): - ret = self.load_flows_path(path) - return ret or "Flows loaded from %s" % path - - def load_flows_path(self, path): - reterr = None - try: - master.Master.load_flows_file(self, path) - except exceptions.FlowReadException as e: - reterr = str(e) - signals.flowlist_change.send(self) - return reterr - def quit(self, a): if a != "n": self.shutdown() diff --git a/test/mitmproxy/addons/test_script.py b/test/mitmproxy/addons/test_script.py index 859d99f9..a3df1fcf 100644 --- a/test/mitmproxy/addons/test_script.py +++ b/test/mitmproxy/addons/test_script.py @@ -9,9 +9,6 @@ from mitmproxy.test import tutils from mitmproxy.test import taddons from mitmproxy import addonmanager from mitmproxy import exceptions -from mitmproxy import options -from mitmproxy import proxy -from mitmproxy import master from mitmproxy.addons import script @@ -48,9 +45,9 @@ def test_script_print_stdout(): class TestScript: def test_notfound(self): - with taddons.context() as tctx: - sc = script.Script("nonexistent") - tctx.master.addons.add(sc) + with taddons.context(): + with pytest.raises(exceptions.OptionsError): + script.Script("nonexistent") def test_simple(self): with taddons.context() as tctx: @@ -136,25 +133,45 @@ class TestCutTraceback: class TestScriptLoader: - def test_simple(self): - o = options.Options(scripts=[]) - m = master.Master(o, proxy.DummyServer()) + def test_script_run(self): + rp = tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) sc = script.ScriptLoader() - sc.running() - m.addons.add(sc) - assert len(m.addons) == 1 - o.update( - scripts = [ - tutils.test_data.path( - "mitmproxy/data/addonscripts/recorder/recorder.py" - ) + with taddons.context() as tctx: + sc.script_run([tflow.tflow(resp=True)], rp) + debug = [i.msg for i in tctx.master.logs if i.level == "debug"] + assert debug == [ + 'recorder load', 'recorder running', 'recorder configure', + 'recorder tick', + 'recorder requestheaders', 'recorder request', + 'recorder responseheaders', 'recorder response' ] - ) - assert len(m.addons) == 1 - assert len(sc.addons) == 1 - o.update(scripts = []) - assert len(m.addons) == 1 - assert len(sc.addons) == 0 + + def test_script_run_nonexistent(self): + sc = script.ScriptLoader() + with taddons.context(): + with pytest.raises(exceptions.CommandError): + sc.script_run([tflow.tflow(resp=True)], "/nonexistent") + + def test_simple(self): + sc = script.ScriptLoader() + with taddons.context() as tctx: + tctx.master.addons.add(sc) + sc.running() + assert len(tctx.master.addons) == 1 + tctx.master.options.update( + scripts = [ + tutils.test_data.path( + "mitmproxy/data/addonscripts/recorder/recorder.py" + ) + ] + ) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 1 + tctx.master.options.update(scripts = []) + assert len(tctx.master.addons) == 1 + assert len(sc.addons) == 0 def test_dupes(self): sc = script.ScriptLoader() @@ -166,13 +183,6 @@ class TestScriptLoader: scripts = ["one", "one"] ) - def test_nonexistent(self): - sc = script.ScriptLoader() - with taddons.context() as tctx: - tctx.master.addons.add(sc) - tctx.configure(sc, scripts = ["nonexistent"]) - tctx.master.has_log("nonexistent: file not found") - def test_order(self): rec = tutils.test_data.path("mitmproxy/data/addonscripts/recorder") sc = script.ScriptLoader() -- cgit v1.2.3