diff options
author | Maximilian Hils <git@maximilianhils.com> | 2019-12-12 15:12:37 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-12-12 15:12:37 +0100 |
commit | a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3 (patch) | |
tree | f7ed8885641f1d4623c932f05a0531bec8c119d8 /mitmproxy | |
parent | d62835f6cd62cd0445c029f67e82afe32a82814a (diff) | |
parent | d42e8b2fe6cad23d3aca38823439a39d33adc8cb (diff) | |
download | mitmproxy-a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3.tar.gz mitmproxy-a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3.tar.bz2 mitmproxy-a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3.zip |
Merge pull request #3724 from typoon/command-history-file
Save user executed commands to a history file
Diffstat (limited to 'mitmproxy')
-rw-r--r-- | mitmproxy/addons/__init__.py | 2 | ||||
-rw-r--r-- | mitmproxy/addons/command_history.py | 89 | ||||
-rw-r--r-- | mitmproxy/command.py | 6 | ||||
-rw-r--r-- | mitmproxy/tools/_main.py | 1 | ||||
-rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 72 | ||||
-rw-r--r-- | mitmproxy/tools/console/statusbar.py | 11 |
6 files changed, 126 insertions, 55 deletions
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py index 838fba9b..ee238938 100644 --- a/mitmproxy/addons/__init__.py +++ b/mitmproxy/addons/__init__.py @@ -4,6 +4,7 @@ from mitmproxy.addons import block from mitmproxy.addons import browser from mitmproxy.addons import check_ca from mitmproxy.addons import clientplayback +from mitmproxy.addons import command_history from mitmproxy.addons import core from mitmproxy.addons import cut from mitmproxy.addons import disable_h2c @@ -30,6 +31,7 @@ def default_addons(): anticomp.AntiComp(), check_ca.CheckCA(), clientplayback.ClientPlayback(), + command_history.CommandHistory(), cut.Cut(), disable_h2c.DisableH2C(), export.Export(), diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py new file mode 100644 index 00000000..2c1bf887 --- /dev/null +++ b/mitmproxy/addons/command_history.py @@ -0,0 +1,89 @@ +import os +import pathlib +import typing + +from mitmproxy import command +from mitmproxy import ctx + + +class CommandHistory: + VACUUM_SIZE = 1024 + + def __init__(self) -> None: + self.history: typing.List[str] = [] + self.filtered_history: typing.List[str] = [""] + self.current_index: int = 0 + + def load(self, loader): + loader.add_option( + "command_history", bool, True, + """Persist command history between mitmproxy invocations.""" + ) + + @property + def history_file(self) -> pathlib.Path: + return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history" + + def running(self): + # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with + # confdir or command_history as updated. + self.configure("command_history") + + def configure(self, updated): + if "command_history" in updated or "confdir" in updated: + if ctx.options.command_history and self.history_file.is_file(): + self.history = self.history_file.read_text().splitlines() + self.set_filter('') + + def done(self): + if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE: + # vacuum history so that it doesn't grow indefinitely. + history_str = "\n".join(self.history[-self.VACUUM_SIZE / 2:]) + "\n" + self.history_file.write_text(history_str) + + @command.command("commands.history.add") + def add_command(self, command: str) -> None: + if not command.strip(): + return + + self.history.append(command) + if ctx.options.command_history: + with self.history_file.open("a") as f: + f.write(f"{command}\n") + f.close() + + self.set_filter('') + + @command.command("commands.history.get") + def get_history(self) -> typing.Sequence[str]: + """Get the entire command history.""" + return self.history.copy() + + @command.command("commands.history.clear") + def clear_history(self): + if self.history_file.exists(): + self.history_file.unlink() + self.history = [] + self.set_filter('') + + # Functionality to provide a filtered list that can be iterated through. + + @command.command("commands.history.filter") + def set_filter(self, prefix: str) -> None: + self.filtered_history = [ + cmd + for cmd in self.history + if cmd.startswith(prefix) + ] + self.filtered_history.append(prefix) + self.current_index = len(self.filtered_history) - 1 + + @command.command("commands.history.next") + def get_next(self) -> str: + self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1) + return self.filtered_history[self.current_index] + + @command.command("commands.history.prev") + def get_prev(self) -> str: + self.current_index = max(0, self.current_index - 1) + return self.filtered_history[self.current_index] diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 6977ff91..48f9051e 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -100,8 +100,10 @@ class Command: def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments: try: bound_arguments = self.signature.bind(*args) - except TypeError as v: - raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}") + except TypeError: + expected = f'Expected: {str(self.signature.parameters)}' + received = f'Received: {str(args)}' + raise exceptions.CommandError(f"Command argument mismatch: \n\t{expected}\n\t{received}") for name, value in bound_arguments.arguments.items(): convert_to = self.signature.parameters[name].annotation diff --git a/mitmproxy/tools/_main.py b/mitmproxy/tools/_main.py index 0163e8d3..b98bbe90 100644 --- a/mitmproxy/tools/_main.py +++ b/mitmproxy/tools/_main.py @@ -83,6 +83,7 @@ def run( except SystemExit: arg_check.check() sys.exit(1) + try: opts.set(*args.setoptions, defer=True) optmanager.load_paths( diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index 02eda3bf..0feae28e 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,6 +1,4 @@ import abc -import collections -import copy import typing import urwid @@ -158,53 +156,15 @@ class CommandBuffer: self.completion = None -class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None: - self.saved_commands: collections.deque = collections.deque( - [CommandBuffer(master, "")], - maxlen=size - ) - self.index: int = 0 - - @property - def last_index(self): - return len(self.saved_commands) - 1 - - def get_next(self) -> typing.Optional[CommandBuffer]: - if self.index < self.last_index: - self.index = self.index + 1 - return self.saved_commands[self.index] - return None - - def get_prev(self) -> typing.Optional[CommandBuffer]: - if self.index > 0: - self.index = self.index - 1 - return self.saved_commands[self.index] - return None - - def add_command(self, command: CommandBuffer, execution: bool = False) -> None: - if self.index == self.last_index or execution: - last_item = self.saved_commands[-1] - last_item_empty = not last_item.text - if last_item.text == command.text or (last_item_empty and execution): - self.saved_commands[-1] = copy.copy(command) - else: - self.saved_commands.append(command) - if not execution and self.index < self.last_index: - self.index += 1 - if execution: - self.index = self.last_index - - class CommandEdit(urwid.WidgetWrap): leader = ": " - def __init__(self, master: mitmproxy.master.Master, - text: str, history: CommandHistory) -> None: + def __init__(self, master: mitmproxy.master.Master, text: str) -> None: super().__init__(urwid.Text(self.leader)) self.master = master + self.active_filter = False + self.filter_str = '' self.cbuf = CommandBuffer(master, text) - self.history = history self.update() def keypress(self, size, key) -> None: @@ -236,15 +196,35 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf.cursor = cursor_pos elif key == "backspace": self.cbuf.backspace() + if self.cbuf.text == '': + self.active_filter = False + self.master.commands.call("commands.history.filter", "") + self.filter_str = '' elif key == "left" or key == "ctrl b": self.cbuf.left() elif key == "right" or key == "ctrl f": self.cbuf.right() elif key == "up" or key == "ctrl p": - self.history.add_command(self.cbuf) - self.cbuf = self.history.get_prev() or self.cbuf + if self.active_filter is False: + self.active_filter = True + self.filter_str = self.cbuf.text + self.master.commands.call("commands.history.filter", self.cbuf.text) + cmd = self.master.commands.execute("commands.history.prev") + self.cbuf = CommandBuffer(self.master, cmd) elif key == "down" or key == "ctrl n": - self.cbuf = self.history.get_next() or self.cbuf + prev_cmd = self.cbuf.text + cmd = self.master.commands.execute("commands.history.next") + + if cmd == '': + if prev_cmd == self.filter_str: + self.cbuf = CommandBuffer(self.master, prev_cmd) + else: + self.active_filter = False + self.master.commands.call("commands.history.filter", "") + self.filter_str = '' + self.cbuf = CommandBuffer(self.master, '') + else: + self.cbuf = CommandBuffer(self.master, cmd) elif key == "shift tab": self.cbuf.cycle_completion(False) elif key == "tab": diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 43f5170d..7bd92d01 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -3,10 +3,10 @@ from typing import Optional import urwid +import mitmproxy.tools.console.master # noqa from mitmproxy.tools.console import common from mitmproxy.tools.console import signals from mitmproxy.tools.console import commandexecutor -import mitmproxy.tools.console.master # noqa from mitmproxy.tools.console.commander import commander @@ -43,8 +43,6 @@ class ActionBar(urwid.WidgetWrap): signals.status_prompt_onekey.connect(self.sig_prompt_onekey) signals.status_prompt_command.connect(self.sig_prompt_command) - self.command_history = commander.CommandHistory(master) - self.prompting = None self.onekey = False @@ -104,7 +102,6 @@ class ActionBar(urwid.WidgetWrap): self._w = commander.CommandEdit( self.master, partial, - self.command_history, ) if cursor is not None: self._w.cbuf.cursor = cursor @@ -134,7 +131,6 @@ class ActionBar(urwid.WidgetWrap): def keypress(self, size, k): if self.prompting: if k == "esc": - self.command_history.index = self.command_history.last_index self.prompt_done() elif self.onekey: if k == "enter": @@ -142,8 +138,9 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - self.command_history.add_command(self._w.cbuf, True) - self.prompt_execute(self._w.get_edit_text()) + text = self._w.get_edit_text() + self.prompt_execute(text) + self.master.commands.call("commands.history.add", text) else: if common.is_keypress(k): self._w.keypress(size, k) |