diff options
author | Maximilian Hils <git@maximilianhils.com> | 2019-11-26 02:42:47 +0100 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2019-11-26 02:42:47 +0100 |
commit | 06ef7350f52af34954ecde2431c34c31d177d0c7 (patch) | |
tree | 29583df93abe69c878b012277efd32faaf9b9c45 /mitmproxy/addons | |
parent | 68b016e180ec1475391f2e8389ae1c708692c499 (diff) | |
download | mitmproxy-06ef7350f52af34954ecde2431c34c31d177d0c7.tar.gz mitmproxy-06ef7350f52af34954ecde2431c34c31d177d0c7.tar.bz2 mitmproxy-06ef7350f52af34954ecde2431c34c31d177d0c7.zip |
simplify command history addon
Diffstat (limited to 'mitmproxy/addons')
-rw-r--r-- | mitmproxy/addons/command_history.py | 191 |
1 files changed, 65 insertions, 126 deletions
diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index ad4c2346..f86a6196 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,6 +1,5 @@ -import atexit -import collections import os +import pathlib import typing from mitmproxy import command @@ -8,137 +7,77 @@ from mitmproxy import ctx class CommandHistory: - def __init__(self, size: int = 300) -> None: - self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size) - self.is_configured = False + VACUUM_SIZE = 1024 - self.filtered_commands: typing.Deque[str] = collections.deque() - self.current_index: int = -1 - self.filter_str: str = '' - self.command_history_path: str = '' + def __init__(self) -> None: + self.history: typing.List[str] = [] + self.filtered_history: typing.List[str] = [""] + self.current_index: int = 0 - atexit.register(self.cleanup) - - def cleanup(self): - self._sync_saved_commands() + def load(self, loader): + loader.add_option( + "command_history", bool, True, + """Persist command history between mitmproxy invocations.""" + ) @property - def last_filtered_index(self): - return len(self.filtered_commands) - 1 - - @command.command("command_history.clear") - def clear_history(self): - self.saved_commands.clear() - self.filtered_commands.clear() - - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.flush() - f.close() - - self.restart() - - @command.command("command_history.cancel") - def restart(self) -> None: - self.filtered_commands = self.saved_commands.copy() - self.current_index = -1 - - @command.command("command_history.next") - def get_next(self) -> str: - - if self.current_index == -1 or self.current_index == self.last_filtered_index: - self.current_index = -1 - return '' - elif self.current_index < self.last_filtered_index: - self.current_index += 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.prev") - def get_prev(self) -> str: - - if self.current_index == -1: - if self.last_filtered_index >= 0: - self.current_index = self.last_filtered_index - else: - return '' - - elif self.current_index > 0: - self.current_index -= 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.filter") - def set_filter(self, command: str) -> None: - self.filter_str = command - - _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] - self.filtered_commands = collections.deque(_filtered_commands) - - if command and command not in self.filtered_commands: - self.filtered_commands.append(command) - - self.current_index = -1 - - @command.command("command_history.add") + def history_file(self) -> pathlib.Path: + return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history" + + def running(self): + # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with + # confdir or command_history as updated. + self.configure("command_history") + + def configure(self, updated): + if "command_history" in updated or "confdir" in updated: + if ctx.options.command_history and self.history_file.is_file(): + self.history = self.history_file.read_text().splitlines() + + def done(self): + if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE: + # vacuum history so that it doesn't grow indefinitely. + history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n" + self.history_file.write_text(history_str) + + @command.command("commands.history.add") def add_command(self, command: str) -> None: - if command.strip() == '': + if not command.strip(): return - self._sync_saved_commands() - - if command in self.saved_commands: - self.saved_commands.remove(command) - self.saved_commands.append(command) - - _history_str = "\n".join(self.saved_commands) - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.write(_history_str) - f.flush() - f.close() - - self.restart() - - def _sync_saved_commands(self): - # First read all commands from the file to merge anything that may - # have come from a different instance of the mitmproxy or sister tools - if not os.path.exists(self.command_history_path): - return + self.history.append(command) + if ctx.options.command_history: + with self.history_file.open("a") as f: + f.write(f"{command}\n") - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() + @command.command("commands.history.get") + def get_history(self) -> typing.Sequence[str]: + """Get the entire command history.""" + return self.history.copy() - self.saved_commands.clear() - for l in _history_lines: - l = l.strip() - if l in self.saved_commands: - self.saved_commands.remove(l) - self.saved_commands.append(l.strip()) - - def configure(self, updated: typing.Set[str]): - if self.is_configured: - return - - _command_history_dir = os.path.expanduser(ctx.options.confdir) - if not os.path.exists(_command_history_dir): - os.makedirs(_command_history_dir) - - self.command_history_path = os.path.join(_command_history_dir, 'command_history') - _history_lines: typing.List[str] = [] - if os.path.exists(self.command_history_path): - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() - - for l in _history_lines: - self.add_command(l.strip()) + @command.command("commands.history.clear") + def clear_history(self): + self.history_file.unlink() + self.history = [] + + # Functionality to provide a filtered list that can be iterated through. + + @command.command("commands.history.filter") + def set_filter(self, prefix: str) -> None: + self.filtered_history = [ + cmd + for cmd in self.history + if cmd.startswith(prefix) + ] + self.filtered_history.append(prefix) + self.current_index = len(self.filtered_history) - 1 + + @command.command("commands.history.next") + def get_next(self) -> str: + self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1) + return self.filtered_history[self.current_index] - self.is_configured = True + @command.command("commands.history.prev") + def get_prev(self) -> str: + self.current_index = max(0, self.current_index - 1) + return self.filtered_history[self.current_index] |