diff options
author | Henrique <typoon@gmail.com> | 2019-11-27 08:13:19 -0500 |
---|---|---|
committer | Henrique <typoon@gmail.com> | 2019-11-27 08:13:19 -0500 |
commit | 863d2fbcb20b3f94df2ef2cf2478e86a736b7b65 (patch) | |
tree | 945e191b5815c1f98ad429cc66ed789ae6e473b2 | |
parent | 09c0162e8d1ee1480945725762557b8c8f02c2f2 (diff) | |
parent | 819d5e631757f33a751eab8491b39eaacdb0c0c9 (diff) | |
download | mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.tar.gz mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.tar.bz2 mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.zip |
Merge branch 'command-history-file' of github.com:typoon/mitmproxy into command-history-file
-rw-r--r-- | mitmproxy/addons/command_history.py | 191 | ||||
-rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 13 | ||||
-rw-r--r-- | mitmproxy/tools/console/statusbar.py | 7 | ||||
-rw-r--r-- | test/mitmproxy/addons/test_command_history.py | 103 | ||||
-rw-r--r-- | test/mitmproxy/tools/console/test_commander.py | 12 |
5 files changed, 105 insertions, 221 deletions
diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index ad4c2346..f86a6196 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,6 +1,5 @@ -import atexit -import collections import os +import pathlib import typing from mitmproxy import command @@ -8,137 +7,77 @@ from mitmproxy import ctx class CommandHistory: - def __init__(self, size: int = 300) -> None: - self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size) - self.is_configured = False + VACUUM_SIZE = 1024 - self.filtered_commands: typing.Deque[str] = collections.deque() - self.current_index: int = -1 - self.filter_str: str = '' - self.command_history_path: str = '' + def __init__(self) -> None: + self.history: typing.List[str] = [] + self.filtered_history: typing.List[str] = [""] + self.current_index: int = 0 - atexit.register(self.cleanup) - - def cleanup(self): - self._sync_saved_commands() + def load(self, loader): + loader.add_option( + "command_history", bool, True, + """Persist command history between mitmproxy invocations.""" + ) @property - def last_filtered_index(self): - return len(self.filtered_commands) - 1 - - @command.command("command_history.clear") - def clear_history(self): - self.saved_commands.clear() - self.filtered_commands.clear() - - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.flush() - f.close() - - self.restart() - - @command.command("command_history.cancel") - def restart(self) -> None: - self.filtered_commands = self.saved_commands.copy() - self.current_index = -1 - - @command.command("command_history.next") - def get_next(self) -> str: - - if self.current_index == -1 or self.current_index == self.last_filtered_index: - self.current_index = -1 - return '' - elif self.current_index < self.last_filtered_index: - self.current_index += 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.prev") - def get_prev(self) -> str: - - if self.current_index == -1: - if self.last_filtered_index >= 0: - self.current_index = self.last_filtered_index - else: - return '' - - elif self.current_index > 0: - self.current_index -= 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.filter") - def set_filter(self, command: str) -> None: - self.filter_str = command - - _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] - self.filtered_commands = collections.deque(_filtered_commands) - - if command and command not in self.filtered_commands: - self.filtered_commands.append(command) - - self.current_index = -1 - - @command.command("command_history.add") + def history_file(self) -> pathlib.Path: + return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history" + + def running(self): + # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with + # confdir or command_history as updated. + self.configure("command_history") + + def configure(self, updated): + if "command_history" in updated or "confdir" in updated: + if ctx.options.command_history and self.history_file.is_file(): + self.history = self.history_file.read_text().splitlines() + + def done(self): + if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE: + # vacuum history so that it doesn't grow indefinitely. + history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n" + self.history_file.write_text(history_str) + + @command.command("commands.history.add") def add_command(self, command: str) -> None: - if command.strip() == '': + if not command.strip(): return - self._sync_saved_commands() - - if command in self.saved_commands: - self.saved_commands.remove(command) - self.saved_commands.append(command) - - _history_str = "\n".join(self.saved_commands) - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.write(_history_str) - f.flush() - f.close() - - self.restart() - - def _sync_saved_commands(self): - # First read all commands from the file to merge anything that may - # have come from a different instance of the mitmproxy or sister tools - if not os.path.exists(self.command_history_path): - return + self.history.append(command) + if ctx.options.command_history: + with self.history_file.open("a") as f: + f.write(f"{command}\n") - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() + @command.command("commands.history.get") + def get_history(self) -> typing.Sequence[str]: + """Get the entire command history.""" + return self.history.copy() - self.saved_commands.clear() - for l in _history_lines: - l = l.strip() - if l in self.saved_commands: - self.saved_commands.remove(l) - self.saved_commands.append(l.strip()) - - def configure(self, updated: typing.Set[str]): - if self.is_configured: - return - - _command_history_dir = os.path.expanduser(ctx.options.confdir) - if not os.path.exists(_command_history_dir): - os.makedirs(_command_history_dir) - - self.command_history_path = os.path.join(_command_history_dir, 'command_history') - _history_lines: typing.List[str] = [] - if os.path.exists(self.command_history_path): - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() - - for l in _history_lines: - self.add_command(l.strip()) + @command.command("commands.history.clear") + def clear_history(self): + self.history_file.unlink() + self.history = [] + + # Functionality to provide a filtered list that can be iterated through. + + @command.command("commands.history.filter") + def set_filter(self, prefix: str) -> None: + self.filtered_history = [ + cmd + for cmd in self.history + if cmd.startswith(prefix) + ] + self.filtered_history.append(prefix) + self.current_index = len(self.filtered_history) - 1 + + @command.command("commands.history.next") + def get_next(self) -> str: + self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1) + return self.filtered_history[self.current_index] - self.is_configured = True + @command.command("commands.history.prev") + def get_prev(self) -> str: + self.current_index = max(0, self.current_index - 1) + return self.filtered_history[self.current_index] diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index 0f45aa1f..0feae28e 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -9,8 +9,6 @@ import mitmproxy.flow import mitmproxy.master import mitmproxy.types -from mitmproxy import command_lexer - class Completer: @abc.abstractmethod @@ -200,7 +198,7 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf.backspace() if self.cbuf.text == '': self.active_filter = False - self.master.commands.execute("command_history.filter ''") + self.master.commands.call("commands.history.filter", "") self.filter_str = '' elif key == "left" or key == "ctrl b": self.cbuf.left() @@ -210,20 +208,19 @@ class CommandEdit(urwid.WidgetWrap): if self.active_filter is False: self.active_filter = True self.filter_str = self.cbuf.text - _cmd = command_lexer.quote(self.cbuf.text) - self.master.commands.execute("command_history.filter %s" % _cmd) - cmd = self.master.commands.execute("command_history.prev") + self.master.commands.call("commands.history.filter", self.cbuf.text) + cmd = self.master.commands.execute("commands.history.prev") self.cbuf = CommandBuffer(self.master, cmd) elif key == "down" or key == "ctrl n": prev_cmd = self.cbuf.text - cmd = self.master.commands.execute("command_history.next") + cmd = self.master.commands.execute("commands.history.next") if cmd == '': if prev_cmd == self.filter_str: self.cbuf = CommandBuffer(self.master, prev_cmd) else: self.active_filter = False - self.master.commands.execute("command_history.filter ''") + self.master.commands.call("commands.history.filter", "") self.filter_str = '' self.cbuf = CommandBuffer(self.master, '') else: diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 39141b97..43b81d22 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -132,7 +132,6 @@ class ActionBar(urwid.WidgetWrap): def keypress(self, size, k): if self.prompting: if k == "esc": - self.master.commands.execute('command_history.cancel') self.prompt_done() elif self.onekey: if k == "enter": @@ -140,9 +139,9 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - cmd = command_lexer.quote(self._w.cbuf.text) - self.master.commands.execute(f"command_history.add {cmd}") - self.prompt_execute(self._w.get_edit_text()) + text = self._w.get_edit_text() + self.prompt_execute(text) + self.master.commands.call("commands.history.add", text) else: if common.is_keypress(k): self._w.keypress(size, k) diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index 026ce53e..e38b4061 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -1,81 +1,30 @@ import os -import pytest -from mitmproxy import options from mitmproxy.addons import command_history from mitmproxy.test import taddons -@pytest.fixture(autouse=True) -def tctx(tmpdir): - # This runs before each test - dir_name = tmpdir.mkdir('mitmproxy').dirname - confdir = dir_name - - opts = options.Options() - opts.set(*[f"confdir={confdir}"]) - tctx = taddons.context(options=opts) - ch = command_history.CommandHistory() - tctx.master.addons.add(ch) - ch.configure([]) - - yield tctx - - # This runs after each test - ch.cleanup() - - class TestCommandHistory: - def test_existing_command_history(self, tctx): + def test_load_from_file(self, tmpdir): commands = ['cmd1', 'cmd2', 'cmd3'] - confdir = tctx.options.confdir - f = open(os.path.join(confdir, 'command_history'), 'w') - f.write("\n".join(commands)) - f.close() - - history = command_history.CommandHistory() - history.configure([]) - - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + with open(tmpdir.join('command_history'), 'w') as f: + f.write("\n".join(commands)) - history.cleanup() + ch = command_history.CommandHistory() + with taddons.context(ch) as tctx: + tctx.options.confdir = str(tmpdir) + assert ch.history == commands - def test_add_command(self, tctx): - history = command_history.CommandHistory(3) - history.configure([]) + def test_add_command(self): + history = command_history.CommandHistory() history.add_command('cmd1') history.add_command('cmd2') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd1', 'cmd2'] - - history.add_command('') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd1', 'cmd2'] - - # The history size is only 3. So, we forget the first - # one command, when adding fourth command - history.add_command('cmd3') - history.add_command('cmd4') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd2', 'cmd3', 'cmd4'] + assert history.history == ['cmd1', 'cmd2'] history.add_command('') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd2', 'cmd3', 'cmd4'] - - # Commands with the same text are not repeated in the history one by one - history.add_command('cmd3') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd2', 'cmd4', 'cmd3'] - - history.add_command('cmd2') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd4', 'cmd3', 'cmd2'] - - history.cleanup() + assert history.history == ['cmd1', 'cmd2'] def test_get_next_and_prev(self, tctx): history = command_history.CommandHistory(5) @@ -161,7 +110,7 @@ class TestCommandHistory: history.add_command('cmd2') history.clear_history() - saved_commands = [cmd for cmd in history.saved_commands] + saved_commands = [cmd for cmd in history.history] assert saved_commands == [] assert history.get_next() == '' @@ -215,57 +164,57 @@ class TestCommandHistory: for i in instances: i.configure([]) - saved_commands = [cmd for cmd in i.saved_commands] + saved_commands = [cmd for cmd in i.history] assert saved_commands == [] instances[0].add_command('cmd1') - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1'] # These instances haven't yet added a new command, so they haven't # yet reloaded their commands from the command file. # This is expected, because if the user is filtering a command on # another window, we don't want to interfere with that - saved_commands = [cmd for cmd in instances[1].saved_commands] + saved_commands = [cmd for cmd in instances[1].history] assert saved_commands == [] - saved_commands = [cmd for cmd in instances[2].saved_commands] + saved_commands = [cmd for cmd in instances[2].history] assert saved_commands == [] # Since the second instanced added a new command, its list of # saved commands has been updated to have the commands from the # first instance + its own commands instances[1].add_command('cmd2') - saved_commands = [cmd for cmd in instances[1].saved_commands] + saved_commands = [cmd for cmd in instances[1].history] assert saved_commands == ['cmd1', 'cmd2'] - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1'] # Third instance is still empty as it has not yet ran any command - saved_commands = [cmd for cmd in instances[2].saved_commands] + saved_commands = [cmd for cmd in instances[2].history] assert saved_commands == [] instances[2].add_command('cmd3') - saved_commands = [cmd for cmd in instances[2].saved_commands] + saved_commands = [cmd for cmd in instances[2].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] instances[0].add_command('cmd4') - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] instances.append(command_history.CommandHistory(10)) instances[3].configure([]) - saved_commands = [cmd for cmd in instances[3].saved_commands] + saved_commands = [cmd for cmd in instances[3].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] instances[0].add_command('cmd_before_close') instances.pop(0) - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1', 'cmd2'] instances[0].add_command('new_cmd') - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] instances.pop(0) @@ -285,7 +234,7 @@ class TestCommandHistory: for i in instances: i.configure([]) i.clear_history() - saved_commands = [cmd for cmd in i.saved_commands] + saved_commands = [cmd for cmd in i.history] assert saved_commands == [] instances[0].add_command('cmd1') @@ -294,7 +243,7 @@ class TestCommandHistory: instances[1].add_command('cmd4') instances[1].add_command('cmd5') - saved_commands = [cmd for cmd in instances[1].saved_commands] + saved_commands = [cmd for cmd in instances[1].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5'] instances.pop() diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 4fa10eb8..aa859092 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -114,8 +114,8 @@ class TestCommandEdit: def test_up_and_down(self, tctx): edit = commander.CommandEdit(tctx.master, '') - tctx.master.commands.execute('command_history.clear') - tctx.master.commands.execute('command_history.add "cmd1"') + tctx.master.commands.execute('commands.history.clear') + tctx.master.commands.execute('commands.history.add "cmd1"') edit.keypress(1, 'up') assert edit.get_edit_text() == 'cmd1' @@ -131,9 +131,9 @@ class TestCommandEdit: edit = commander.CommandEdit(tctx.master, '') - tctx.master.commands.execute('command_history.clear') - tctx.master.commands.execute('command_history.add "cmd1"') - tctx.master.commands.execute('command_history.add "cmd2"') + tctx.master.commands.execute('commands.history.clear') + tctx.master.commands.execute('commands.history.add "cmd1"') + tctx.master.commands.execute('commands.history.add "cmd2"') edit.keypress(1, 'up') assert edit.get_edit_text() == 'cmd2' @@ -168,7 +168,7 @@ class TestCommandEdit: assert edit.get_edit_text() == 'abc' edit = commander.CommandEdit(tctx.master, '') - tctx.master.commands.execute('command_history.add "cmd3"') + tctx.master.commands.execute('commands.history.add "cmd3"') edit.keypress(1, 'z') edit.keypress(1, 'up') |