diff options
author | Henrique <typoon@gmail.com> | 2019-11-27 08:13:19 -0500 |
---|---|---|
committer | Henrique <typoon@gmail.com> | 2019-11-27 08:13:19 -0500 |
commit | 863d2fbcb20b3f94df2ef2cf2478e86a736b7b65 (patch) | |
tree | 945e191b5815c1f98ad429cc66ed789ae6e473b2 /mitmproxy | |
parent | 09c0162e8d1ee1480945725762557b8c8f02c2f2 (diff) | |
parent | 819d5e631757f33a751eab8491b39eaacdb0c0c9 (diff) | |
download | mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.tar.gz mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.tar.bz2 mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.zip |
Merge branch 'command-history-file' of github.com:typoon/mitmproxy into command-history-file
Diffstat (limited to 'mitmproxy')
-rw-r--r-- | mitmproxy/addons/command_history.py | 191 | ||||
-rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 13 | ||||
-rw-r--r-- | mitmproxy/tools/console/statusbar.py | 7 |
3 files changed, 73 insertions, 138 deletions
diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index ad4c2346..f86a6196 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,6 +1,5 @@ -import atexit -import collections import os +import pathlib import typing from mitmproxy import command @@ -8,137 +7,77 @@ from mitmproxy import ctx class CommandHistory: - def __init__(self, size: int = 300) -> None: - self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size) - self.is_configured = False + VACUUM_SIZE = 1024 - self.filtered_commands: typing.Deque[str] = collections.deque() - self.current_index: int = -1 - self.filter_str: str = '' - self.command_history_path: str = '' + def __init__(self) -> None: + self.history: typing.List[str] = [] + self.filtered_history: typing.List[str] = [""] + self.current_index: int = 0 - atexit.register(self.cleanup) - - def cleanup(self): - self._sync_saved_commands() + def load(self, loader): + loader.add_option( + "command_history", bool, True, + """Persist command history between mitmproxy invocations.""" + ) @property - def last_filtered_index(self): - return len(self.filtered_commands) - 1 - - @command.command("command_history.clear") - def clear_history(self): - self.saved_commands.clear() - self.filtered_commands.clear() - - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.flush() - f.close() - - self.restart() - - @command.command("command_history.cancel") - def restart(self) -> None: - self.filtered_commands = self.saved_commands.copy() - self.current_index = -1 - - @command.command("command_history.next") - def get_next(self) -> str: - - if self.current_index == -1 or self.current_index == self.last_filtered_index: - self.current_index = -1 - return '' - elif self.current_index < self.last_filtered_index: - self.current_index += 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.prev") - def get_prev(self) -> str: - - if self.current_index == -1: - if self.last_filtered_index >= 0: - self.current_index = self.last_filtered_index - else: - return '' - - elif self.current_index > 0: - self.current_index -= 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.filter") - def set_filter(self, command: str) -> None: - self.filter_str = command - - _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] - self.filtered_commands = collections.deque(_filtered_commands) - - if command and command not in self.filtered_commands: - self.filtered_commands.append(command) - - self.current_index = -1 - - @command.command("command_history.add") + def history_file(self) -> pathlib.Path: + return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history" + + def running(self): + # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with + # confdir or command_history as updated. + self.configure("command_history") + + def configure(self, updated): + if "command_history" in updated or "confdir" in updated: + if ctx.options.command_history and self.history_file.is_file(): + self.history = self.history_file.read_text().splitlines() + + def done(self): + if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE: + # vacuum history so that it doesn't grow indefinitely. + history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n" + self.history_file.write_text(history_str) + + @command.command("commands.history.add") def add_command(self, command: str) -> None: - if command.strip() == '': + if not command.strip(): return - self._sync_saved_commands() - - if command in self.saved_commands: - self.saved_commands.remove(command) - self.saved_commands.append(command) - - _history_str = "\n".join(self.saved_commands) - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.write(_history_str) - f.flush() - f.close() - - self.restart() - - def _sync_saved_commands(self): - # First read all commands from the file to merge anything that may - # have come from a different instance of the mitmproxy or sister tools - if not os.path.exists(self.command_history_path): - return + self.history.append(command) + if ctx.options.command_history: + with self.history_file.open("a") as f: + f.write(f"{command}\n") - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() + @command.command("commands.history.get") + def get_history(self) -> typing.Sequence[str]: + """Get the entire command history.""" + return self.history.copy() - self.saved_commands.clear() - for l in _history_lines: - l = l.strip() - if l in self.saved_commands: - self.saved_commands.remove(l) - self.saved_commands.append(l.strip()) - - def configure(self, updated: typing.Set[str]): - if self.is_configured: - return - - _command_history_dir = os.path.expanduser(ctx.options.confdir) - if not os.path.exists(_command_history_dir): - os.makedirs(_command_history_dir) - - self.command_history_path = os.path.join(_command_history_dir, 'command_history') - _history_lines: typing.List[str] = [] - if os.path.exists(self.command_history_path): - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() - - for l in _history_lines: - self.add_command(l.strip()) + @command.command("commands.history.clear") + def clear_history(self): + self.history_file.unlink() + self.history = [] + + # Functionality to provide a filtered list that can be iterated through. + + @command.command("commands.history.filter") + def set_filter(self, prefix: str) -> None: + self.filtered_history = [ + cmd + for cmd in self.history + if cmd.startswith(prefix) + ] + self.filtered_history.append(prefix) + self.current_index = len(self.filtered_history) - 1 + + @command.command("commands.history.next") + def get_next(self) -> str: + self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1) + return self.filtered_history[self.current_index] - self.is_configured = True + @command.command("commands.history.prev") + def get_prev(self) -> str: + self.current_index = max(0, self.current_index - 1) + return self.filtered_history[self.current_index] diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index 0f45aa1f..0feae28e 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -9,8 +9,6 @@ import mitmproxy.flow import mitmproxy.master import mitmproxy.types -from mitmproxy import command_lexer - class Completer: @abc.abstractmethod @@ -200,7 +198,7 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf.backspace() if self.cbuf.text == '': self.active_filter = False - self.master.commands.execute("command_history.filter ''") + self.master.commands.call("commands.history.filter", "") self.filter_str = '' elif key == "left" or key == "ctrl b": self.cbuf.left() @@ -210,20 +208,19 @@ class CommandEdit(urwid.WidgetWrap): if self.active_filter is False: self.active_filter = True self.filter_str = self.cbuf.text - _cmd = command_lexer.quote(self.cbuf.text) - self.master.commands.execute("command_history.filter %s" % _cmd) - cmd = self.master.commands.execute("command_history.prev") + self.master.commands.call("commands.history.filter", self.cbuf.text) + cmd = self.master.commands.execute("commands.history.prev") self.cbuf = CommandBuffer(self.master, cmd) elif key == "down" or key == "ctrl n": prev_cmd = self.cbuf.text - cmd = self.master.commands.execute("command_history.next") + cmd = self.master.commands.execute("commands.history.next") if cmd == '': if prev_cmd == self.filter_str: self.cbuf = CommandBuffer(self.master, prev_cmd) else: self.active_filter = False - self.master.commands.execute("command_history.filter ''") + self.master.commands.call("commands.history.filter", "") self.filter_str = '' self.cbuf = CommandBuffer(self.master, '') else: diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 39141b97..43b81d22 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -132,7 +132,6 @@ class ActionBar(urwid.WidgetWrap): def keypress(self, size, k): if self.prompting: if k == "esc": - self.master.commands.execute('command_history.cancel') self.prompt_done() elif self.onekey: if k == "enter": @@ -140,9 +139,9 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - cmd = command_lexer.quote(self._w.cbuf.text) - self.master.commands.execute(f"command_history.add {cmd}") - self.prompt_execute(self._w.get_edit_text()) + text = self._w.get_edit_text() + self.prompt_execute(text) + self.master.commands.call("commands.history.add", text) else: if common.is_keypress(k): self._w.keypress(size, k) |