diff options
Diffstat (limited to 'mitmproxy/builtins/script.py')
-rw-r--r-- | mitmproxy/builtins/script.py | 185 |
1 files changed, 185 insertions, 0 deletions
diff --git a/mitmproxy/builtins/script.py b/mitmproxy/builtins/script.py new file mode 100644 index 00000000..14f7dd5f --- /dev/null +++ b/mitmproxy/builtins/script.py @@ -0,0 +1,185 @@ +from __future__ import absolute_import, print_function, division + +import contextlib +import os +import shlex +import sys +import threading +import traceback + +from mitmproxy import exceptions +from mitmproxy import controller +from mitmproxy import ctx + + +import watchdog.events +from watchdog.observers import polling + + +def parse_command(command): + """ + Returns a (path, args) tuple. + """ + if not command or not command.strip(): + raise exceptions.AddonError("Empty script command.") + # Windows: escape all backslashes in the path. + if os.name == "nt": # pragma: no cover + backslashes = shlex.split(command, posix=False)[0].count("\\") + command = command.replace("\\", "\\\\", backslashes) + args = shlex.split(command) # pragma: no cover + args[0] = os.path.expanduser(args[0]) + if not os.path.exists(args[0]): + raise exceptions.AddonError( + ("Script file not found: %s.\r\n" + "If your script path contains spaces, " + "make sure to wrap it in additional quotes, e.g. -s \"'./foo bar/baz.py' --args\".") % + args[0]) + elif os.path.isdir(args[0]): + raise exceptions.AddonError("Not a file: %s" % args[0]) + return args[0], args[1:] + + +@contextlib.contextmanager +def scriptenv(path, args): + oldargs = sys.argv + sys.argv = [path] + args + script_dir = os.path.dirname(os.path.abspath(path)) + sys.path.append(script_dir) + try: + yield + except Exception: + _, _, tb = sys.exc_info() + scriptdir = os.path.dirname(os.path.abspath(path)) + for i, s in enumerate(reversed(traceback.extract_tb(tb))): + tb = tb.tb_next + if not os.path.abspath(s[0]).startswith(scriptdir): + break + ctx.log.error("Script error: %s" % "".join(traceback.format_tb(tb))) + finally: + sys.argv = oldargs + sys.path.pop() + + +def load_script(path, args): + with open(path, "rb") as f: + try: + code = compile(f.read(), path, 'exec') + except SyntaxError as e: + ctx.log.error( + "Script error: %s line %s: %s" % ( + e.filename, e.lineno, e.msg + ) + ) + return + ns = {'__file__': os.path.abspath(path)} + with scriptenv(path, args): + exec(code, ns, ns) + return ns + + +class ReloadHandler(watchdog.events.FileSystemEventHandler): + def __init__(self, callback): + self.callback = callback + + def on_modified(self, event): + self.callback() + + def on_created(self, event): + self.callback() + + +class Script: + """ + An addon that manages a single script. + """ + def __init__(self, command): + self.name = command + + self.command = command + self.path, self.args = parse_command(command) + self.ns = None + self.observer = None + self.dead = False + + self.last_options = None + self.should_reload = threading.Event() + + for i in controller.Events: + if not hasattr(self, i): + def mkprox(): + evt = i + + def prox(*args, **kwargs): + self.run(evt, *args, **kwargs) + return prox + setattr(self, i, mkprox()) + + def run(self, name, *args, **kwargs): + # It's possible for ns to be un-initialised if we failed during + # configure + if self.ns is not None and not self.dead: + func = self.ns.get(name) + if func: + with scriptenv(self.path, self.args): + func(*args, **kwargs) + + def reload(self): + self.should_reload.set() + + def tick(self): + if self.should_reload.is_set(): + self.should_reload.clear() + ctx.log.info("Reloading script: %s" % self.name) + self.ns = load_script(self.path, self.args) + self.configure(self.last_options) + else: + self.run("tick") + + def start(self): + self.ns = load_script(self.path, self.args) + self.run("start") + + def configure(self, options): + self.last_options = options + if not self.observer: + self.observer = polling.PollingObserver() + # Bind the handler to the real underlying master object + self.observer.schedule( + ReloadHandler(self.reload), + os.path.dirname(self.path) or "." + ) + self.observer.start() + self.run("configure", options) + + def done(self): + self.run("done") + self.dead = True + + +class ScriptLoader(): + """ + An addon that manages loading scripts from options. + """ + def configure(self, options): + for s in options.scripts: + if options.scripts.count(s) > 1: + raise exceptions.OptionsError("Duplicate script: %s" % s) + + for a in ctx.master.addons.chain[:]: + if isinstance(a, Script) and a.name not in options.scripts: + ctx.log.info("Un-loading script: %s" % a.name) + ctx.master.addons.remove(a) + + current = {} + for a in ctx.master.addons.chain[:]: + if isinstance(a, Script): + current[a.name] = a + ctx.master.addons.chain.remove(a) + + for s in options.scripts: + if s in current: + ctx.master.addons.chain.append(current[s]) + else: + ctx.log.info("Loading script: %s" % s) + sc = Script(s) + ctx.master.addons.add(sc) |