From 16b55f9476373347a3c2553e070497b383288360 Mon Sep 17 00:00:00 2001 From: Henrique Date: Fri, 22 Nov 2019 10:00:17 -0500 Subject: Implemented feature to save command history to a file. This allows users to reuse their commands the next time they open mitmproxy --- mitmproxy/tools/console/commander/commander.py | 46 +++++++++++++++++++++++--- test/mitmproxy/tools/console/test_commander.py | 7 ++-- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index d751422b..d661d530 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -2,6 +2,7 @@ import abc import collections import copy import typing +import os import urwid from urwid.text_layout import calc_coords @@ -147,29 +148,55 @@ class CommandBuffer: self.completion = None +# TODO: This class should be a Singleton class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None: + def __init__(self, master: mitmproxy.master.Master, size: int = 300) -> None: self.saved_commands: collections.deque = collections.deque( [CommandBuffer(master, "")], maxlen=size ) self.index: int = 0 + self.size: int = size + self.master: mitmproxy.master.Master = master + + _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history') + if os.path.exists(_command_history_path): + with open(_command_history_path, 'r') as f: + for l in f.readlines(): + cbuf = CommandBuffer(master, l.strip()) + self.add_command(cbuf) + f.close() + + self.command_history_file = open(_command_history_path, 'w') @property def last_index(self): return len(self.saved_commands) - 1 + def clear_history(self): + """ + Needed for test suite. + TODO: Maybe create a command to clear the history? + """ + self.saved_commands: collections.deque = collections.deque( + [CommandBuffer(self.master, "")], + maxlen=self.size + ) + + self.index = 0 + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.flush() + def get_next(self) -> typing.Optional[CommandBuffer]: if self.index < self.last_index: self.index = self.index + 1 - return self.saved_commands[self.index] - return None + return self.saved_commands[self.index] def get_prev(self) -> typing.Optional[CommandBuffer]: if self.index > 0: self.index = self.index - 1 - return self.saved_commands[self.index] - return None + return self.saved_commands[self.index] def add_command(self, command: CommandBuffer, execution: bool = False) -> None: if self.index == self.last_index or execution: @@ -184,6 +211,15 @@ class CommandHistory: if execution: self.index = self.last_index + # This prevents the constructor from trying to overwrite the file + # that it is currently reading + if hasattr(self, 'command_history_file'): + _history_str = "\n".join([c.text for c in self.saved_commands]) + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.write(_history_str) + self.command_history_file.flush() + class CommandEdit(urwid.WidgetWrap): leader = ": " diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index a77be043..5e11166e 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -97,6 +97,7 @@ class TestCommandEdit: def test_up_and_down(self): with taddons.context() as tctx: history = commander.CommandHistory(tctx.master, size=3) + history.clear_history() edit = commander.CommandEdit(tctx.master, '', history) buf = commander.CommandBuffer(tctx.master, 'cmd1') @@ -112,6 +113,7 @@ class TestCommandEdit: assert edit.get_edit_text() == 'cmd1' history = commander.CommandHistory(tctx.master, size=5) + history.clear_history() edit = commander.CommandEdit(tctx.master, '', history) edit.keypress(1, 'a') edit.keypress(1, 'b') @@ -139,6 +141,7 @@ class TestCommandHistory: def fill_history(self, commands): with taddons.context() as tctx: history = commander.CommandHistory(tctx.master, size=3) + history.clear_history() for c in commands: cbuf = commander.CommandBuffer(tctx.master, c) history.add_command(cbuf) @@ -183,7 +186,7 @@ class TestCommandHistory: for i in range(3): assert history.get_next().text == expected_items[i] # We are at the last item of the history - assert history.get_next() is None + assert history.get_next().text == expected_items[-1] def test_get_prev(self): commands = ["command1", "command2"] @@ -194,7 +197,7 @@ class TestCommandHistory: for i in range(3): assert history.get_prev().text == expected_items[i] # We are at the first item of the history - assert history.get_prev() is None + assert history.get_prev().text == expected_items[-1] class TestCommandBuffer: -- cgit v1.2.3 From a866b424fe60928fb5336f1fa146326424763ca5 Mon Sep 17 00:00:00 2001 From: Henrique Date: Sat, 23 Nov 2019 15:31:00 -0500 Subject: Moved command history to an addon and added a new feature: * If you start typing a command and press "up" only commands starting with that string will be returned --- mitmproxy/addons/__init__.py | 2 + mitmproxy/addons/command_history.py | 152 +++++++++++++++++++++++++ mitmproxy/command.py | 4 +- mitmproxy/tools/console/commander/commander.py | 89 ++------------- mitmproxy/tools/console/statusbar.py | 11 +- mitmproxy/utils/debug.py | 4 + 6 files changed, 176 insertions(+), 86 deletions(-) create mode 100644 mitmproxy/addons/command_history.py diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py index 838fba9b..ee238938 100644 --- a/mitmproxy/addons/__init__.py +++ b/mitmproxy/addons/__init__.py @@ -4,6 +4,7 @@ from mitmproxy.addons import block from mitmproxy.addons import browser from mitmproxy.addons import check_ca from mitmproxy.addons import clientplayback +from mitmproxy.addons import command_history from mitmproxy.addons import core from mitmproxy.addons import cut from mitmproxy.addons import disable_h2c @@ -30,6 +31,7 @@ def default_addons(): anticomp.AntiComp(), check_ca.CheckCA(), clientplayback.ClientPlayback(), + command_history.CommandHistory(), cut.Cut(), disable_h2c.DisableH2C(), export.Export(), diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py new file mode 100644 index 00000000..2c348d1e --- /dev/null +++ b/mitmproxy/addons/command_history.py @@ -0,0 +1,152 @@ +import collections +import copy +import os +import typing + +import mitmproxy.options +import mitmproxy.types + +from mitmproxy import command +from mitmproxy.tools.console.commander.commander import CommandBuffer + + +class CommandHistory: + def __init__(self, size: int = 300) -> None: + self.saved_commands: typing.Deque[str] = collections.deque( + maxlen=size + ) + self.index: int = 0 + + self.filter: str = '' + self.filtered_index: int = 0 + self.filtered_commands: typing.Deque[str] = collections.deque() + self.filter_active: bool = True + + _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history') + _history_lines = open(_command_history_path, 'r').readlines() + + self.command_history_file = open(_command_history_path, 'w') + + for l in _history_lines: + self.add_command(l.strip(), True) + + @property + def last_index(self): + return len(self.saved_commands) - 1 + + @property + def last_filtered_index(self): + return len(self.filtered_commands) - 1 + + @command.command("command_history.clear") + def clear_history(self): + self.saved_commands.clear() + self.index = 0 + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.flush() + self.filter = '' + self.filtered_index = 0 + self.filtered_commands.clear() + self.filter_active = True + + @command.command("command_history.next") + def get_next(self) -> str: + if self.last_index == -1: + return '' + + if self.filter != '': + if self.filtered_index < self.last_filtered_index: + self.filtered_index = self.filtered_index + 1 + ret = self.filtered_commands[self.filtered_index] + else: + if self.index == -1: + ret = '' + elif self.index < self.last_index: + self.index = self.index + 1 + ret = self.saved_commands[self.index] + else: + self.index = -1 + ret = '' + + + return ret + + @command.command("command_history.prev") + def get_prev(self) -> str: + if self.last_index == -1: + return '' + + if self.filter != '': + if self.filtered_index > 0: + self.filtered_index = self.filtered_index - 1 + ret = self.filtered_commands[self.filtered_index] + else: + if self.index == -1: + self.index = self.last_index + elif self.index > 0: + self.index = self.index - 1 + + ret = self.saved_commands[self.index] + + return ret + + @command.command("command_history.filter") + def set_filter(self, command: str) -> None: + """ + This is used when the user starts typing part of a command + and then press the "up" arrow. This way, the results returned are + only for the command that the user started typing + """ + if command.strip() == '': + return + + if self.filter != '': + last_filtered_command = self.filtered_commands[-1] + if command == last_filtered_command: + self.filter = '' + self.filtered_commands = [] + self.filtered_index = 0 + else: + self.filter = command + _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] + self.filtered_commands = collections.deque(_filtered_commands) + + if command not in self.filtered_commands: + self.filtered_commands.append(command) + + self.filtered_index = self.last_filtered_index + + # No commands found, so act like no filter was added + if len(self.filtered_commands) == 1: + self.add_command(command) + self.filter = '' + + @command.command("command_history.cancel") + def restart(self) -> None: + self.index = -1 + self.filter = '' + self.filtered_commands = [] + self.filtered_index = 0 + + @command.command("command_history.add") + def add_command(self, command: str, execution: bool = False) -> None: + if command.strip() == '': + return + + if execution: + if command in self.saved_commands: + self.saved_commands.remove(command) + + self.saved_commands.append(command) + + _history_str = "\n".join(self.saved_commands) + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.write(_history_str) + self.command_history_file.flush() + + self.restart() + else: + if command not in self.saved_commands: + self.saved_commands.append(command) diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 6977ff91..23c39594 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -101,7 +101,9 @@ class Command: try: bound_arguments = self.signature.bind(*args) except TypeError as v: - raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}") + expected = f'Expected: {str(self.signature.parameters)}' + received = f'Received: {str(args)}' + raise exceptions.CommandError(f"Command argument mismatch: \n\t{expected}\n\t{received}") for name, value in bound_arguments.arguments.items(): convert_to = self.signature.parameters[name].annotation diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index d661d530..c59c7f83 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -12,6 +12,7 @@ import mitmproxy.flow import mitmproxy.master import mitmproxy.types +from mitmproxy import command_lexer class Completer: @abc.abstractmethod @@ -148,88 +149,13 @@ class CommandBuffer: self.completion = None -# TODO: This class should be a Singleton -class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int = 300) -> None: - self.saved_commands: collections.deque = collections.deque( - [CommandBuffer(master, "")], - maxlen=size - ) - self.index: int = 0 - self.size: int = size - self.master: mitmproxy.master.Master = master - - _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history') - if os.path.exists(_command_history_path): - with open(_command_history_path, 'r') as f: - for l in f.readlines(): - cbuf = CommandBuffer(master, l.strip()) - self.add_command(cbuf) - f.close() - - self.command_history_file = open(_command_history_path, 'w') - - @property - def last_index(self): - return len(self.saved_commands) - 1 - - def clear_history(self): - """ - Needed for test suite. - TODO: Maybe create a command to clear the history? - """ - self.saved_commands: collections.deque = collections.deque( - [CommandBuffer(self.master, "")], - maxlen=self.size - ) - - self.index = 0 - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.flush() - - def get_next(self) -> typing.Optional[CommandBuffer]: - if self.index < self.last_index: - self.index = self.index + 1 - return self.saved_commands[self.index] - - def get_prev(self) -> typing.Optional[CommandBuffer]: - if self.index > 0: - self.index = self.index - 1 - return self.saved_commands[self.index] - - def add_command(self, command: CommandBuffer, execution: bool = False) -> None: - if self.index == self.last_index or execution: - last_item = self.saved_commands[-1] - last_item_empty = not last_item.text - if last_item.text == command.text or (last_item_empty and execution): - self.saved_commands[-1] = copy.copy(command) - else: - self.saved_commands.append(command) - if not execution and self.index < self.last_index: - self.index += 1 - if execution: - self.index = self.last_index - - # This prevents the constructor from trying to overwrite the file - # that it is currently reading - if hasattr(self, 'command_history_file'): - _history_str = "\n".join([c.text for c in self.saved_commands]) - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.write(_history_str) - self.command_history_file.flush() - - class CommandEdit(urwid.WidgetWrap): leader = ": " - def __init__(self, master: mitmproxy.master.Master, - text: str, history: CommandHistory) -> None: + def __init__(self, master: mitmproxy.master.Master, text: str) -> None: super().__init__(urwid.Text(self.leader)) self.master = master self.cbuf = CommandBuffer(master, text) - self.history = history self.update() def keypress(self, size, key) -> None: @@ -240,10 +166,15 @@ class CommandEdit(urwid.WidgetWrap): elif key == "right": self.cbuf.right() elif key == "up": - self.history.add_command(self.cbuf) - self.cbuf = self.history.get_prev() or self.cbuf + _cmd = command_lexer.quote(self.cbuf.text) + self.master.commands.execute("command_history.filter %s" % _cmd) + cmd = self.master.commands.execute("command_history.prev") + self.cbuf = CommandBuffer(self.master, cmd) elif key == "down": - self.cbuf = self.history.get_next() or self.cbuf + _cmd = command_lexer.quote(self.cbuf.text) + self.master.commands.execute("command_history.filter %s" % _cmd) + cmd = self.master.commands.execute("command_history.next") + self.cbuf = CommandBuffer(self.master, cmd) elif key == "shift tab": self.cbuf.cycle_completion(False) elif key == "tab": diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 43f5170d..6d040d92 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -3,10 +3,11 @@ from typing import Optional import urwid +import mitmproxy.tools.console.master # noqa +from mitmproxy import command_lexer from mitmproxy.tools.console import common from mitmproxy.tools.console import signals from mitmproxy.tools.console import commandexecutor -import mitmproxy.tools.console.master # noqa from mitmproxy.tools.console.commander import commander @@ -43,8 +44,6 @@ class ActionBar(urwid.WidgetWrap): signals.status_prompt_onekey.connect(self.sig_prompt_onekey) signals.status_prompt_command.connect(self.sig_prompt_command) - self.command_history = commander.CommandHistory(master) - self.prompting = None self.onekey = False @@ -104,7 +103,6 @@ class ActionBar(urwid.WidgetWrap): self._w = commander.CommandEdit( self.master, partial, - self.command_history, ) if cursor is not None: self._w.cbuf.cursor = cursor @@ -134,7 +132,7 @@ class ActionBar(urwid.WidgetWrap): def keypress(self, size, k): if self.prompting: if k == "esc": - self.command_history.index = self.command_history.last_index + self.master.commands.execute('command_history.cancel') self.prompt_done() elif self.onekey: if k == "enter": @@ -142,7 +140,8 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - self.command_history.add_command(self._w.cbuf, True) + cmd = command_lexer.quote(self._w.cbuf.text) + self.master.commands.execute(f"command_history.add {cmd} true") self.prompt_execute(self._w.get_edit_text()) else: if common.is_keypress(k): diff --git a/mitmproxy/utils/debug.py b/mitmproxy/utils/debug.py index c9ffb614..e36a50a4 100644 --- a/mitmproxy/utils/debug.py +++ b/mitmproxy/utils/debug.py @@ -10,6 +10,10 @@ from OpenSSL import SSL from mitmproxy import version +def remote_debug(host='localhost', port=4444): + import remote_pdb + remote_pdb.RemotePdb(host, port).set_trace() + def dump_system_info(): mitmproxy_version = version.get_dev_version() -- cgit v1.2.3 From 01b40e2c36267db67b19ff6267b5ae3fc8d0ad24 Mon Sep 17 00:00:00 2001 From: Henrique Date: Sat, 23 Nov 2019 16:31:03 -0500 Subject: Fix to create the confdir in case it doesn't exist --- mitmproxy/tools/_main.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/mitmproxy/tools/_main.py b/mitmproxy/tools/_main.py index 0163e8d3..581498ff 100644 --- a/mitmproxy/tools/_main.py +++ b/mitmproxy/tools/_main.py @@ -69,22 +69,35 @@ def run( debug.register_info_dumpers() opts = options.Options() - master = master_cls(opts) - parser = make_parser(opts) - # To make migration from 2.x to 3.0 bearable. - if "-R" in sys.argv and sys.argv[sys.argv.index("-R") + 1].startswith("http"): - print("-R is used for specifying replacements.\n" - "To use mitmproxy in reverse mode please use --mode reverse:SPEC instead") - try: args = parser.parse_args(arguments) except SystemExit: arg_check.check() sys.exit(1) + + try: opts.set(*args.setoptions, defer=True) + opts.confdir = os.path.expanduser(opts.confdir) + if not os.path.isdir(opts.confdir): + os.makedirs(opts.confdir) + + except exceptions.OptionsError as e: + print("%s: %s" % (sys.argv[0], e), file=sys.stderr) + sys.exit(1) + + master = master_cls(opts) + + # To make migration from 2.x to 3.0 bearable. + if "-R" in sys.argv and sys.argv[sys.argv.index("-R") + 1].startswith("http"): + print("-R is used for specifying replacements.\n" + "To use mitmproxy in reverse mode please use --mode reverse:SPEC instead") + + + try: + optmanager.load_paths( opts, os.path.join(opts.confdir, "config.yaml"), -- cgit v1.2.3 From 6d67286bd1e68b12fb1151cc6790e3a4bdadbe57 Mon Sep 17 00:00:00 2001 From: Henrique Date: Sat, 23 Nov 2019 16:31:21 -0500 Subject: Fix to check if command_history file exists prior to trying to read it --- mitmproxy/addons/command_history.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index 2c348d1e..113af213 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -7,6 +7,7 @@ import mitmproxy.options import mitmproxy.types from mitmproxy import command +from mitmproxy import ctx from mitmproxy.tools.console.commander.commander import CommandBuffer @@ -22,8 +23,10 @@ class CommandHistory: self.filtered_commands: typing.Deque[str] = collections.deque() self.filter_active: bool = True - _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history') - _history_lines = open(_command_history_path, 'r').readlines() + _command_history_path = os.path.join(os.path.expanduser(ctx.options.confdir), 'command_history') + _history_lines = [] + if os.path.exists(_command_history_path): + _history_lines = open(_command_history_path, 'r').readlines() self.command_history_file = open(_command_history_path, 'w') -- cgit v1.2.3 From 1d43abcb289823107bd305ed2485af0c3986a270 Mon Sep 17 00:00:00 2001 From: Henrique Date: Sat, 23 Nov 2019 16:36:52 -0500 Subject: Making the linter happy --- mitmproxy/addons/command_history.py | 10 ++-------- mitmproxy/command.py | 2 +- mitmproxy/tools/_main.py | 3 --- mitmproxy/tools/console/commander/commander.py | 4 +--- mitmproxy/utils/debug.py | 1 + 5 files changed, 5 insertions(+), 15 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index 113af213..9b1fd7f1 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,14 +1,9 @@ import collections -import copy import os import typing -import mitmproxy.options -import mitmproxy.types - from mitmproxy import command from mitmproxy import ctx -from mitmproxy.tools.console.commander.commander import CommandBuffer class CommandHistory: @@ -66,13 +61,12 @@ class CommandHistory: if self.index == -1: ret = '' elif self.index < self.last_index: - self.index = self.index + 1 + self.index = self.index + 1 ret = self.saved_commands[self.index] else: self.index = -1 ret = '' - return ret @command.command("command_history.prev") @@ -112,7 +106,7 @@ class CommandHistory: self.filtered_index = 0 else: self.filter = command - _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] + _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] self.filtered_commands = collections.deque(_filtered_commands) if command not in self.filtered_commands: diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 23c39594..48f9051e 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -100,7 +100,7 @@ class Command: def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments: try: bound_arguments = self.signature.bind(*args) - except TypeError as v: + except TypeError: expected = f'Expected: {str(self.signature.parameters)}' received = f'Received: {str(args)}' raise exceptions.CommandError(f"Command argument mismatch: \n\t{expected}\n\t{received}") diff --git a/mitmproxy/tools/_main.py b/mitmproxy/tools/_main.py index 581498ff..3e6933b9 100644 --- a/mitmproxy/tools/_main.py +++ b/mitmproxy/tools/_main.py @@ -77,7 +77,6 @@ def run( arg_check.check() sys.exit(1) - try: opts.set(*args.setoptions, defer=True) opts.confdir = os.path.expanduser(opts.confdir) @@ -95,9 +94,7 @@ def run( print("-R is used for specifying replacements.\n" "To use mitmproxy in reverse mode please use --mode reverse:SPEC instead") - try: - optmanager.load_paths( opts, os.path.join(opts.confdir, "config.yaml"), diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index c59c7f83..99533cfa 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -1,8 +1,5 @@ import abc -import collections -import copy import typing -import os import urwid from urwid.text_layout import calc_coords @@ -14,6 +11,7 @@ import mitmproxy.types from mitmproxy import command_lexer + class Completer: @abc.abstractmethod def cycle(self, forward: bool = True) -> str: diff --git a/mitmproxy/utils/debug.py b/mitmproxy/utils/debug.py index e36a50a4..282db927 100644 --- a/mitmproxy/utils/debug.py +++ b/mitmproxy/utils/debug.py @@ -10,6 +10,7 @@ from OpenSSL import SSL from mitmproxy import version + def remote_debug(host='localhost', port=4444): import remote_pdb remote_pdb.RemotePdb(host, port).set_trace() -- cgit v1.2.3 From 7b386d5393a68715e70a9ea6d2936c8b09104f86 Mon Sep 17 00:00:00 2001 From: Henrique Date: Sun, 24 Nov 2019 20:13:25 -0500 Subject: Fixed the logic according to some tests, added new tests --- mitmproxy/addons/command_history.py | 131 +++----- mitmproxy/tools/console/commander/commander.py | 30 +- mitmproxy/tools/console/statusbar.py | 2 +- test/mitmproxy/tools/console/test_commander.py | 412 +++++++++++++++---------- 4 files changed, 314 insertions(+), 261 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index 9b1fd7f1..c97cab8f 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -8,15 +8,11 @@ from mitmproxy import ctx class CommandHistory: def __init__(self, size: int = 300) -> None: - self.saved_commands: typing.Deque[str] = collections.deque( - maxlen=size - ) - self.index: int = 0 + self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size) - self.filter: str = '' - self.filtered_index: int = 0 self.filtered_commands: typing.Deque[str] = collections.deque() - self.filter_active: bool = True + self.current_index: int = -1 + self.filter_str: str = '' _command_history_path = os.path.join(os.path.expanduser(ctx.options.confdir), 'command_history') _history_lines = [] @@ -26,11 +22,7 @@ class CommandHistory: self.command_history_file = open(_command_history_path, 'w') for l in _history_lines: - self.add_command(l.strip(), True) - - @property - def last_index(self): - return len(self.saved_commands) - 1 + self.add_command(l.strip()) @property def last_filtered_index(self): @@ -39,111 +31,72 @@ class CommandHistory: @command.command("command_history.clear") def clear_history(self): self.saved_commands.clear() - self.index = 0 + self.filtered_commands.clear() self.command_history_file.truncate(0) self.command_history_file.seek(0) self.command_history_file.flush() - self.filter = '' - self.filtered_index = 0 - self.filtered_commands.clear() - self.filter_active = True + self.restart() + + @command.command("command_history.cancel") + def restart(self) -> None: + self.filtered_commands = self.saved_commands.copy() + self.current_index = -1 @command.command("command_history.next") def get_next(self) -> str: - if self.last_index == -1: + + if self.current_index == -1 or self.current_index == self.last_filtered_index: + self.current_index = -1 return '' + elif self.current_index < self.last_filtered_index: + self.current_index += 1 - if self.filter != '': - if self.filtered_index < self.last_filtered_index: - self.filtered_index = self.filtered_index + 1 - ret = self.filtered_commands[self.filtered_index] - else: - if self.index == -1: - ret = '' - elif self.index < self.last_index: - self.index = self.index + 1 - ret = self.saved_commands[self.index] - else: - self.index = -1 - ret = '' + ret = self.filtered_commands[self.current_index] return ret @command.command("command_history.prev") def get_prev(self) -> str: - if self.last_index == -1: - return '' - if self.filter != '': - if self.filtered_index > 0: - self.filtered_index = self.filtered_index - 1 - ret = self.filtered_commands[self.filtered_index] - else: - if self.index == -1: - self.index = self.last_index - elif self.index > 0: - self.index = self.index - 1 + if self.current_index == -1: + if self.last_filtered_index >= 0: + self.current_index = self.last_filtered_index + else: + return '' - ret = self.saved_commands[self.index] + elif self.current_index > 0: + self.current_index -= 1 + + ret = self.filtered_commands[self.current_index] return ret @command.command("command_history.filter") def set_filter(self, command: str) -> None: - """ - This is used when the user starts typing part of a command - and then press the "up" arrow. This way, the results returned are - only for the command that the user started typing - """ - if command.strip() == '': - return - - if self.filter != '': - last_filtered_command = self.filtered_commands[-1] - if command == last_filtered_command: - self.filter = '' - self.filtered_commands = [] - self.filtered_index = 0 - else: - self.filter = command - _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] - self.filtered_commands = collections.deque(_filtered_commands) - - if command not in self.filtered_commands: - self.filtered_commands.append(command) + self.filter_str = command - self.filtered_index = self.last_filtered_index + _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] + self.filtered_commands = collections.deque(_filtered_commands) - # No commands found, so act like no filter was added - if len(self.filtered_commands) == 1: - self.add_command(command) - self.filter = '' + if command and command not in self.filtered_commands: + self.filtered_commands.append(command) - @command.command("command_history.cancel") - def restart(self) -> None: - self.index = -1 - self.filter = '' - self.filtered_commands = [] - self.filtered_index = 0 + self.current_index = -1 @command.command("command_history.add") - def add_command(self, command: str, execution: bool = False) -> None: + def add_command(self, command: str) -> None: if command.strip() == '': return - if execution: - if command in self.saved_commands: - self.saved_commands.remove(command) + if command in self.saved_commands: + self.saved_commands.remove(command) - self.saved_commands.append(command) + self.saved_commands.append(command) - _history_str = "\n".join(self.saved_commands) - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.write(_history_str) - self.command_history_file.flush() + _history_str = "\n".join(self.saved_commands) + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.write(_history_str) + self.command_history_file.flush() - self.restart() - else: - if command not in self.saved_commands: - self.saved_commands.append(command) + self.restart() diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index 99533cfa..ac313290 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -153,26 +153,46 @@ class CommandEdit(urwid.WidgetWrap): def __init__(self, master: mitmproxy.master.Master, text: str) -> None: super().__init__(urwid.Text(self.leader)) self.master = master + self.active_filter = False + self.filter_str = '' self.cbuf = CommandBuffer(master, text) self.update() def keypress(self, size, key) -> None: if key == "backspace": self.cbuf.backspace() + if self.cbuf.text == '': + self.active_filter = False + self.master.commands.execute("command_history.filter ''") + self.filter_str = '' elif key == "left": self.cbuf.left() elif key == "right": self.cbuf.right() elif key == "up": - _cmd = command_lexer.quote(self.cbuf.text) - self.master.commands.execute("command_history.filter %s" % _cmd) + if self.active_filter is False: + self.active_filter = True + self.filter_str = self.cbuf.text + _cmd = command_lexer.quote(self.cbuf.text) + self.master.commands.execute("command_history.filter %s" % _cmd) + cmd = self.master.commands.execute("command_history.prev") self.cbuf = CommandBuffer(self.master, cmd) elif key == "down": - _cmd = command_lexer.quote(self.cbuf.text) - self.master.commands.execute("command_history.filter %s" % _cmd) + prev_cmd = self.cbuf.text cmd = self.master.commands.execute("command_history.next") - self.cbuf = CommandBuffer(self.master, cmd) + + if cmd == '': + if prev_cmd == self.filter_str: + self.cbuf = CommandBuffer(self.master, prev_cmd) + else: + self.active_filter = False + self.master.commands.execute("command_history.filter ''") + self.filter_str = '' + self.cbuf = CommandBuffer(self.master, '') + else: + self.cbuf = CommandBuffer(self.master, cmd) + elif key == "shift tab": self.cbuf.cycle_completion(False) elif key == "tab": diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 6d040d92..39141b97 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -141,7 +141,7 @@ class ActionBar(urwid.WidgetWrap): self.prompt_execute(k) elif k == "enter": cmd = command_lexer.quote(self._w.cbuf.text) - self.master.commands.execute(f"command_history.add {cmd} true") + self.master.commands.execute(f"command_history.add {cmd}") self.prompt_execute(self._w.get_edit_text()) else: if common.is_keypress(k): diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 5e11166e..b95fe707 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,9 +1,34 @@ +import os import pytest +import shutil +import uuid +from mitmproxy import options +from mitmproxy.addons import command_history from mitmproxy.test import taddons from mitmproxy.tools.console.commander import commander +@pytest.fixture(autouse=True) +def tctx(): + # This runs before each test + dir_id = str(uuid.uuid4()) + confdir = os.path.expanduser(f"~/.mitmproxy-test-suite-{dir_id}") + if not os.path.exists(confdir): + os.makedirs(confdir) + + opts = options.Options() + opts.set(*[f"confdir={confdir}"]) + tctx = taddons.context(options=opts) + ch = command_history.CommandHistory() + tctx.master.addons.add(ch) + + yield tctx + + # This runs after each test + shutil.rmtree(confdir) + + class TestListCompleter: def test_cycle(self): tests = [ @@ -23,181 +48,237 @@ class TestListCompleter: ["b", "ba", "bb", "b"] ], ] - for start, options, cycle in tests: - c = commander.ListCompleter(start, options) + for start, opts, cycle in tests: + c = commander.ListCompleter(start, opts) for expected in cycle: assert c.cycle() == expected class TestCommandEdit: - def test_open_command_bar(self): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - edit = commander.CommandEdit(tctx.master, '', history) - try: - edit.update() - except IndexError: - pytest.faied("Unexpected IndexError") + def test_open_command_bar(self, tctx): + edit = commander.CommandEdit(tctx.master, '') - def test_insert(self): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - edit = commander.CommandEdit(tctx.master, '', history) - edit.keypress(1, 'a') - assert edit.get_edit_text() == 'a' - - # Don't let users type a space before starting a command - # as a usability feature - history = commander.CommandHistory(tctx.master, size=3) - edit = commander.CommandEdit(tctx.master, '', history) - edit.keypress(1, ' ') - assert edit.get_edit_text() == '' + try: + edit.update() + except IndexError: + pytest.faied("Unexpected IndexError") - def test_backspace(self): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - edit = commander.CommandEdit(tctx.master, '', history) - edit.keypress(1, 'a') - edit.keypress(1, 'b') - assert edit.get_edit_text() == 'ab' - edit.keypress(1, 'backspace') - assert edit.get_edit_text() == 'a' + def test_insert(self, tctx): + edit = commander.CommandEdit(tctx.master, '') + edit.keypress(1, 'a') + assert edit.get_edit_text() == 'a' - def test_left(self): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - edit = commander.CommandEdit(tctx.master, '', history) - edit.keypress(1, 'a') - assert edit.cbuf.cursor == 1 - edit.keypress(1, 'left') - assert edit.cbuf.cursor == 0 + # Don't let users type a space before starting a command + # as a usability feature + edit = commander.CommandEdit(tctx.master, '') + edit.keypress(1, ' ') + assert edit.get_edit_text() == '' - # Do it again to make sure it won't go negative - edit.keypress(1, 'left') - assert edit.cbuf.cursor == 0 + def test_backspace(self, tctx): + edit = commander.CommandEdit(tctx.master, '') - def test_right(self): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - edit = commander.CommandEdit(tctx.master, '', history) - edit.keypress(1, 'a') - assert edit.cbuf.cursor == 1 - - # Make sure cursor won't go past the text - edit.keypress(1, 'right') - assert edit.cbuf.cursor == 1 - - # Make sure cursor goes left and then back right - edit.keypress(1, 'left') - assert edit.cbuf.cursor == 0 - edit.keypress(1, 'right') - assert edit.cbuf.cursor == 1 - - def test_up_and_down(self): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - history.clear_history() - edit = commander.CommandEdit(tctx.master, '', history) - - buf = commander.CommandBuffer(tctx.master, 'cmd1') - history.add_command(buf) - buf = commander.CommandBuffer(tctx.master, 'cmd2') - history.add_command(buf) - - edit.keypress(1, 'up') - assert edit.get_edit_text() == 'cmd2' - edit.keypress(1, 'up') - assert edit.get_edit_text() == 'cmd1' - edit.keypress(1, 'up') - assert edit.get_edit_text() == 'cmd1' - - history = commander.CommandHistory(tctx.master, size=5) - history.clear_history() - edit = commander.CommandEdit(tctx.master, '', history) - edit.keypress(1, 'a') - edit.keypress(1, 'b') - edit.keypress(1, 'c') - assert edit.get_edit_text() == 'abc' - edit.keypress(1, 'up') - assert edit.get_edit_text() == '' - edit.keypress(1, 'down') - assert edit.get_edit_text() == 'abc' - edit.keypress(1, 'down') - assert edit.get_edit_text() == 'abc' - - history = commander.CommandHistory(tctx.master, size=5) - edit = commander.CommandEdit(tctx.master, '', history) - buf = commander.CommandBuffer(tctx.master, 'cmd3') - history.add_command(buf) - edit.keypress(1, 'z') - edit.keypress(1, 'up') - assert edit.get_edit_text() == 'cmd3' - edit.keypress(1, 'down') - assert edit.get_edit_text() == 'z' - - -class TestCommandHistory: - def fill_history(self, commands): - with taddons.context() as tctx: - history = commander.CommandHistory(tctx.master, size=3) - history.clear_history() - for c in commands: - cbuf = commander.CommandBuffer(tctx.master, c) - history.add_command(cbuf) - return history, tctx.master - - def test_add_command(self): - commands = ["command1", "command2"] - history, tctx_master = self.fill_history(commands) - - saved_commands = [buf.text for buf in history.saved_commands] - assert saved_commands == [""] + commands - - # The history size is only 3. So, we forget the first - # one command, when adding fourth command - cbuf = commander.CommandBuffer(tctx_master, "command3") - history.add_command(cbuf) - saved_commands = [buf.text for buf in history.saved_commands] - assert saved_commands == commands + ["command3"] - - # Commands with the same text are not repeated in the history one by one - history.add_command(cbuf) - saved_commands = [buf.text for buf in history.saved_commands] - assert saved_commands == commands + ["command3"] - - # adding command in execution mode sets index at the beginning of the history - # and replace the last command buffer if it is empty or has the same text - cbuf = commander.CommandBuffer(tctx_master, "") - history.add_command(cbuf) - history.index = 0 - cbuf = commander.CommandBuffer(tctx_master, "command4") - history.add_command(cbuf, True) - assert history.index == history.last_index - saved_commands = [buf.text for buf in history.saved_commands] - assert saved_commands == ["command2", "command3", "command4"] - - def test_get_next(self): - commands = ["command1", "command2"] - history, tctx_master = self.fill_history(commands) - - history.index = -1 - expected_items = ["", "command1", "command2"] - for i in range(3): - assert history.get_next().text == expected_items[i] - # We are at the last item of the history - assert history.get_next().text == expected_items[-1] - - def test_get_prev(self): - commands = ["command1", "command2"] - history, tctx_master = self.fill_history(commands) - - expected_items = ["command2", "command1", ""] - history.index = history.last_index + 1 - for i in range(3): - assert history.get_prev().text == expected_items[i] - # We are at the first item of the history - assert history.get_prev().text == expected_items[-1] + edit.keypress(1, 'a') + edit.keypress(1, 'b') + assert edit.get_edit_text() == 'ab' + + edit.keypress(1, 'backspace') + assert edit.get_edit_text() == 'a' + + def test_left(self, tctx): + edit = commander.CommandEdit(tctx.master, '') + + edit.keypress(1, 'a') + assert edit.cbuf.cursor == 1 + + edit.keypress(1, 'left') + assert edit.cbuf.cursor == 0 + + # Do it again to make sure it won't go negative + edit.keypress(1, 'left') + assert edit.cbuf.cursor == 0 + + def test_right(self, tctx): + edit = commander.CommandEdit(tctx.master, '') + + edit.keypress(1, 'a') + assert edit.cbuf.cursor == 1 + + # Make sure cursor won't go past the text + edit.keypress(1, 'right') + assert edit.cbuf.cursor == 1 + + # Make sure cursor goes left and then back right + edit.keypress(1, 'left') + assert edit.cbuf.cursor == 0 + + edit.keypress(1, 'right') + assert edit.cbuf.cursor == 1 + + def test_up_and_down(self, tctx): + edit = commander.CommandEdit(tctx.master, '') + + tctx.master.commands.execute('command_history.clear') + tctx.master.commands.execute('command_history.add "cmd1"') + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit = commander.CommandEdit(tctx.master, '') + + tctx.master.commands.execute('command_history.clear') + tctx.master.commands.execute('command_history.add "cmd1"') + tctx.master.commands.execute('command_history.add "cmd2"') + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'a') + edit.keypress(1, 'b') + edit.keypress(1, 'c') + assert edit.get_edit_text() == 'abc' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'abc' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'abc' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'abc' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'abc' + + edit = commander.CommandEdit(tctx.master, '') + tctx.master.commands.execute('command_history.add "cmd3"') + + edit.keypress(1, 'z') + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'z' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'z' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'z' + + edit.keypress(1, 'backspace') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'c') + assert edit.get_edit_text() == 'c' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'backspace') + assert edit.get_edit_text() == '' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'up') + assert edit.get_edit_text() == 'cmd1' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd2' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == 'cmd3' + + edit.keypress(1, 'down') + assert edit.get_edit_text() == '' class TestCommandBuffer: @@ -258,8 +339,7 @@ class TestCommandBuffer: cb.cursor = len(cb.text) cb.cycle_completion() - ch = commander.CommandHistory(tctx.master, 30) - ce = commander.CommandEdit(tctx.master, "se", ch) + ce = commander.CommandEdit(tctx.master, "se") ce.keypress(1, 'tab') ce.update() ret = ce.cbuf.render() -- cgit v1.2.3 From 6a7f27ff152fc05f9bbe847625e92dc1ecad3653 Mon Sep 17 00:00:00 2001 From: Henrique Date: Sun, 24 Nov 2019 20:21:14 -0500 Subject: Make `mypy` happy --- mitmproxy/addons/command_history.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index c97cab8f..3df2fe5d 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -15,7 +15,7 @@ class CommandHistory: self.filter_str: str = '' _command_history_path = os.path.join(os.path.expanduser(ctx.options.confdir), 'command_history') - _history_lines = [] + _history_lines: typing.List[str] = [] if os.path.exists(_command_history_path): _history_lines = open(_command_history_path, 'r').readlines() -- cgit v1.2.3 From 0d29804ab852893e4e41c415de1ce3cc08ffcfae Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 08:56:44 -0500 Subject: Moved confdir check logic into the addon --- mitmproxy/addons/command_history.py | 6 +++++- mitmproxy/tools/_main.py | 14 ++------------ 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index 3df2fe5d..c66af2be 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -14,7 +14,11 @@ class CommandHistory: self.current_index: int = -1 self.filter_str: str = '' - _command_history_path = os.path.join(os.path.expanduser(ctx.options.confdir), 'command_history') + _command_history_dir = os.path.expanduser(ctx.options.confdir) + if not os.path.exists(_command_history_dir): + os.makedirs(_command_history_dir) + + _command_history_path = os.path.join(_command_history_dir, 'command_history') _history_lines: typing.List[str] = [] if os.path.exists(_command_history_path): _history_lines = open(_command_history_path, 'r').readlines() diff --git a/mitmproxy/tools/_main.py b/mitmproxy/tools/_main.py index 3e6933b9..77f8e675 100644 --- a/mitmproxy/tools/_main.py +++ b/mitmproxy/tools/_main.py @@ -69,6 +69,7 @@ def run( debug.register_info_dumpers() opts = options.Options() + master = master_cls(opts) parser = make_parser(opts) try: @@ -77,24 +78,13 @@ def run( arg_check.check() sys.exit(1) - try: - opts.set(*args.setoptions, defer=True) - opts.confdir = os.path.expanduser(opts.confdir) - if not os.path.isdir(opts.confdir): - os.makedirs(opts.confdir) - - except exceptions.OptionsError as e: - print("%s: %s" % (sys.argv[0], e), file=sys.stderr) - sys.exit(1) - - master = master_cls(opts) - # To make migration from 2.x to 3.0 bearable. if "-R" in sys.argv and sys.argv[sys.argv.index("-R") + 1].startswith("http"): print("-R is used for specifying replacements.\n" "To use mitmproxy in reverse mode please use --mode reverse:SPEC instead") try: + opts.set(*args.setoptions, defer=True) optmanager.load_paths( opts, os.path.join(opts.confdir, "config.yaml"), -- cgit v1.2.3 From 1b5337e4d44ec0b3d3dc34d8bac66aa0ffff480a Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 09:17:40 -0500 Subject: Making windows happy --- mitmproxy/addons/command_history.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index c66af2be..8c1ef052 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,6 +1,7 @@ import collections import os import typing +import atexit from mitmproxy import command from mitmproxy import ctx @@ -28,6 +29,12 @@ class CommandHistory: for l in _history_lines: self.add_command(l.strip()) + atexit.register(self.cleanup) + + def cleanup(self): + if self.command_history_file: + self.command_history_file.close() + @property def last_filtered_index(self): return len(self.filtered_commands) - 1 -- cgit v1.2.3 From 640bec24e5e056b8f5963b890d6764f434ce5b3c Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 10:23:51 -0500 Subject: Oops, forgot to add the tests for the CommandHistory addon --- test/mitmproxy/addons/test_command_history.py | 196 ++++++++++++++++++++++++++ 1 file changed, 196 insertions(+) create mode 100644 test/mitmproxy/addons/test_command_history.py diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py new file mode 100644 index 00000000..898692ec --- /dev/null +++ b/test/mitmproxy/addons/test_command_history.py @@ -0,0 +1,196 @@ +import os +import pytest +import shutil +import uuid + +from mitmproxy import options +from mitmproxy.addons import command_history +from mitmproxy.test import taddons + + +@pytest.fixture(autouse=True) +def tctx(): + # This runs before each test + dir_id = str(uuid.uuid4()) + confdir = os.path.expanduser(f"~/.mitmproxy-test-suite-{dir_id}") + if not os.path.exists(confdir): + os.makedirs(confdir) + + opts = options.Options() + opts.set(*[f"confdir={confdir}"]) + tctx = taddons.context(options=opts) + ch = command_history.CommandHistory() + tctx.master.addons.add(ch) + + yield tctx + + # This runs after each test + ch.command_history_file.close() # Makes windows happy + shutil.rmtree(confdir) + + +class TestCommandHistory: + def test_existing_command_history(self, tctx): + commands = ['cmd1', 'cmd2', 'cmd3'] + confdir = tctx.options.confdir + f = open(os.path.join(confdir, 'command_history'), 'w') + f.write("\n".join(commands)) + f.close() + + history = command_history.CommandHistory() + + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + + def test_add_command(self, tctx): + history = command_history.CommandHistory(3) + + history.add_command('cmd1') + history.add_command('cmd2') + + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd1', 'cmd2'] + + history.add_command('') + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd1', 'cmd2'] + + # The history size is only 3. So, we forget the first + # one command, when adding fourth command + history.add_command('cmd3') + history.add_command('cmd4') + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd2', 'cmd3', 'cmd4'] + + history.add_command('') + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd2', 'cmd3', 'cmd4'] + + # Commands with the same text are not repeated in the history one by one + history.add_command('cmd3') + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd2', 'cmd4', 'cmd3'] + + history.add_command('cmd2') + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == ['cmd4', 'cmd3', 'cmd2'] + + def test_get_next_and_prev(self, tctx): + history = command_history.CommandHistory(5) + + history.add_command('cmd1') + + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_next() == '' + assert history.get_next() == '' + + history.add_command('cmd2') + + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == 'cmd2' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_next() == 'cmd2' + assert history.get_next() == '' + assert history.get_next() == '' + + history.add_command('cmd3') + + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == 'cmd3' + assert history.get_prev() == 'cmd2' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_next() == 'cmd2' + assert history.get_next() == 'cmd3' + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == 'cmd3' + assert history.get_prev() == 'cmd2' + + history.add_command('cmd4') + + assert history.get_prev() == 'cmd4' + assert history.get_prev() == 'cmd3' + assert history.get_prev() == 'cmd2' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_next() == 'cmd2' + assert history.get_next() == 'cmd3' + assert history.get_next() == 'cmd4' + assert history.get_next() == '' + assert history.get_next() == '' + + history.add_command('cmd5') + history.add_command('cmd6') + + assert history.get_next() == '' + assert history.get_prev() == 'cmd6' + assert history.get_prev() == 'cmd5' + assert history.get_prev() == 'cmd4' + assert history.get_next() == 'cmd5' + assert history.get_prev() == 'cmd4' + assert history.get_prev() == 'cmd3' + assert history.get_prev() == 'cmd2' + assert history.get_next() == 'cmd3' + assert history.get_prev() == 'cmd2' + assert history.get_prev() == 'cmd2' + assert history.get_next() == 'cmd3' + assert history.get_next() == 'cmd4' + assert history.get_next() == 'cmd5' + assert history.get_next() == 'cmd6' + assert history.get_next() == '' + assert history.get_next() == '' + + def test_clear(self, tctx): + history = command_history.CommandHistory(3) + + history.add_command('cmd1') + history.add_command('cmd2') + history.clear_history() + + saved_commands = [cmd for cmd in history.saved_commands] + assert saved_commands == [] + + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == '' + assert history.get_prev() == '' + + def test_filter(self, tctx): + history = command_history.CommandHistory(3) + + history.add_command('cmd1') + history.add_command('cmd2') + history.add_command('abc') + history.set_filter('c') + + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == 'c' + assert history.get_prev() == 'cmd2' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_next() == 'cmd2' + assert history.get_next() == 'c' + assert history.get_next() == '' + assert history.get_next() == '' + + history.set_filter('') + + assert history.get_next() == '' + assert history.get_next() == '' + assert history.get_prev() == 'abc' + assert history.get_prev() == 'cmd2' + assert history.get_prev() == 'cmd1' + assert history.get_prev() == 'cmd1' + assert history.get_next() == 'cmd2' + assert history.get_next() == 'abc' + assert history.get_next() == '' + assert history.get_next() == '' -- cgit v1.2.3 From ed7f0b4b39f622b13fce4d3c4ef154231fe07954 Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 10:24:46 -0500 Subject: Making windows happy --- test/mitmproxy/tools/console/test_commander.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index b95fe707..06e676c0 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -26,6 +26,7 @@ def tctx(): yield tctx # This runs after each test + ch.command_history_file.close() shutil.rmtree(confdir) -- cgit v1.2.3 From 1c8abaed78028f4a14044b06430ef86e7e36b6c3 Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 10:39:36 -0500 Subject: Make windows happy --- test/mitmproxy/addons/test_command_history.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index 898692ec..847c273d 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -42,6 +42,8 @@ class TestCommandHistory: saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + history.command_history_file.close() + def test_add_command(self, tctx): history = command_history.CommandHistory(3) -- cgit v1.2.3 From 5b582a76a862b0ad31e048e1e5f3f66df09062bc Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 10:48:42 -0500 Subject: Make windows happy once again --- test/mitmproxy/addons/test_command_history.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index 847c273d..405b8323 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -77,6 +77,8 @@ class TestCommandHistory: saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd4', 'cmd3', 'cmd2'] + history.command_history_file.close() + def test_get_next_and_prev(self, tctx): history = command_history.CommandHistory(5) @@ -150,6 +152,8 @@ class TestCommandHistory: assert history.get_next() == '' assert history.get_next() == '' + history.command_history_file.close() + def test_clear(self, tctx): history = command_history.CommandHistory(3) @@ -165,6 +169,8 @@ class TestCommandHistory: assert history.get_prev() == '' assert history.get_prev() == '' + history.command_history_file.close() + def test_filter(self, tctx): history = command_history.CommandHistory(3) @@ -196,3 +202,5 @@ class TestCommandHistory: assert history.get_next() == 'abc' assert history.get_next() == '' assert history.get_next() == '' + + history.command_history_file.close() -- cgit v1.2.3 From 4464648c38021bb3fcbac6f978bf40e9636dad4c Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 13:08:09 -0500 Subject: Logic to handle multiple instances using CommandHistory. --- mitmproxy/addons/command_history.py | 30 +++++++--- test/mitmproxy/addons/test_command_history.py | 82 +++++++++++++++++++++++++-- 2 files changed, 99 insertions(+), 13 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index 8c1ef052..fb27e805 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,7 +1,7 @@ +import atexit import collections import os import typing -import atexit from mitmproxy import command from mitmproxy import ctx @@ -19,12 +19,12 @@ class CommandHistory: if not os.path.exists(_command_history_dir): os.makedirs(_command_history_dir) - _command_history_path = os.path.join(_command_history_dir, 'command_history') + self.command_history_path = os.path.join(_command_history_dir, 'command_history') _history_lines: typing.List[str] = [] - if os.path.exists(_command_history_path): - _history_lines = open(_command_history_path, 'r').readlines() + if os.path.exists(self.command_history_path): + _history_lines = open(self.command_history_path, 'r').readlines() - self.command_history_file = open(_command_history_path, 'w') + self.command_history_file = open(self.command_history_path, 'w') for l in _history_lines: self.add_command(l.strip()) @@ -32,7 +32,8 @@ class CommandHistory: atexit.register(self.cleanup) def cleanup(self): - if self.command_history_file: + self._reload_saved_commands() + if self.command_history_file and not self.command_history_file.closed: self.command_history_file.close() @property @@ -99,9 +100,10 @@ class CommandHistory: if command.strip() == '': return + self._reload_saved_commands() + if command in self.saved_commands: self.saved_commands.remove(command) - self.saved_commands.append(command) _history_str = "\n".join(self.saved_commands) @@ -111,3 +113,17 @@ class CommandHistory: self.command_history_file.flush() self.restart() + + def _reload_saved_commands(self): + # First read all commands from the file to merge anything that may + # have come from a different instance of the mitmproxy or sister tools + if not os.path.exists(self.command_history_path): + return + + _history_lines = open(self.command_history_path, 'r').readlines() + self.saved_commands.clear() + for l in _history_lines: + l = l.strip() + if l in self.saved_commands: + self.saved_commands.remove(l) + self.saved_commands.append(l.strip()) diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index 405b8323..d22e1fb1 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -25,7 +25,7 @@ def tctx(): yield tctx # This runs after each test - ch.command_history_file.close() # Makes windows happy + ch.cleanup() shutil.rmtree(confdir) @@ -42,7 +42,7 @@ class TestCommandHistory: saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] - history.command_history_file.close() + history.cleanup() def test_add_command(self, tctx): history = command_history.CommandHistory(3) @@ -77,7 +77,7 @@ class TestCommandHistory: saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd4', 'cmd3', 'cmd2'] - history.command_history_file.close() + history.cleanup() def test_get_next_and_prev(self, tctx): history = command_history.CommandHistory(5) @@ -152,7 +152,7 @@ class TestCommandHistory: assert history.get_next() == '' assert history.get_next() == '' - history.command_history_file.close() + history.cleanup() def test_clear(self, tctx): history = command_history.CommandHistory(3) @@ -169,7 +169,7 @@ class TestCommandHistory: assert history.get_prev() == '' assert history.get_prev() == '' - history.command_history_file.close() + history.cleanup() def test_filter(self, tctx): history = command_history.CommandHistory(3) @@ -203,4 +203,74 @@ class TestCommandHistory: assert history.get_next() == '' assert history.get_next() == '' - history.command_history_file.close() + history.cleanup() + + def test_multiple_instances(self, tctx): + + instances = [ + command_history.CommandHistory(10), + command_history.CommandHistory(10), + command_history.CommandHistory(10) + ] + + for i in instances: + saved_commands = [cmd for cmd in i.saved_commands] + assert saved_commands == [] + + instances[0].add_command('cmd1') + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1'] + + # These instances haven't yet added a new command, so they haven't + # yet reloaded their commands from the command file. + # This is expected, because if the user is filtering a command on + # another window, we don't want to interfere with that + saved_commands = [cmd for cmd in instances[1].saved_commands] + assert saved_commands == [] + saved_commands = [cmd for cmd in instances[2].saved_commands] + assert saved_commands == [] + + # Since the second instanced added a new command, its list of + # saved commands has been updated to have the commands from the + # first instance + its own commands + instances[1].add_command('cmd2') + saved_commands = [cmd for cmd in instances[1].saved_commands] + assert saved_commands == ['cmd1', 'cmd2'] + + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1'] + + # Third instance is still empty as it has not yet ran any command + saved_commands = [cmd for cmd in instances[2].saved_commands] + assert saved_commands == [] + + instances[2].add_command('cmd3') + saved_commands = [cmd for cmd in instances[2].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + + instances[0].add_command('cmd4') + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] + + instances.append(command_history.CommandHistory(10)) + saved_commands = [cmd for cmd in instances[3].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] + + instances[0].add_command('cmd_before_close') + instances.pop(0) + + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1', 'cmd2'] + + instances[0].add_command('new_cmd') + saved_commands = [cmd for cmd in instances[0].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] + + instances.pop(0) + instances.pop(0) + instances.pop(0) + + _path = os.path.join(tctx.options.confdir, 'command_history') + lines = open(_path, 'r').readlines() + saved_commands = [cmd.strip() for cmd in lines] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] -- cgit v1.2.3 From 1948a270234395a437a2605020f8a9f60b04d617 Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 13:13:50 -0500 Subject: Fixing it back to what it was --- mitmproxy/tools/_main.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/mitmproxy/tools/_main.py b/mitmproxy/tools/_main.py index 77f8e675..b98bbe90 100644 --- a/mitmproxy/tools/_main.py +++ b/mitmproxy/tools/_main.py @@ -70,19 +70,20 @@ def run( opts = options.Options() master = master_cls(opts) + parser = make_parser(opts) + # To make migration from 2.x to 3.0 bearable. + if "-R" in sys.argv and sys.argv[sys.argv.index("-R") + 1].startswith("http"): + print("-R is used for specifying replacements.\n" + "To use mitmproxy in reverse mode please use --mode reverse:SPEC instead") + try: args = parser.parse_args(arguments) except SystemExit: arg_check.check() sys.exit(1) - # To make migration from 2.x to 3.0 bearable. - if "-R" in sys.argv and sys.argv[sys.argv.index("-R") + 1].startswith("http"): - print("-R is used for specifying replacements.\n" - "To use mitmproxy in reverse mode please use --mode reverse:SPEC instead") - try: opts.set(*args.setoptions, defer=True) optmanager.load_paths( -- cgit v1.2.3 From 0f0fc15c16ca4615d7182fd8d07f914428ea0838 Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 14:12:36 -0500 Subject: Removing the remotepdb stuff. --- mitmproxy/utils/debug.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/mitmproxy/utils/debug.py b/mitmproxy/utils/debug.py index 282db927..c9ffb614 100644 --- a/mitmproxy/utils/debug.py +++ b/mitmproxy/utils/debug.py @@ -11,11 +11,6 @@ from OpenSSL import SSL from mitmproxy import version -def remote_debug(host='localhost', port=4444): - import remote_pdb - remote_pdb.RemotePdb(host, port).set_trace() - - def dump_system_info(): mitmproxy_version = version.get_dev_version() -- cgit v1.2.3 From 68b016e180ec1475391f2e8389ae1c708692c499 Mon Sep 17 00:00:00 2001 From: Henrique Date: Mon, 25 Nov 2019 14:37:49 -0500 Subject: Addressing comments from review --- mitmproxy/addons/command_history.py | 69 ++++++++++++++++---------- test/mitmproxy/addons/test_command_history.py | 48 ++++++++++++++---- test/mitmproxy/tools/console/test_commander.py | 15 ++---- 3 files changed, 86 insertions(+), 46 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index fb27e805..ad4c2346 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -10,31 +10,17 @@ from mitmproxy import ctx class CommandHistory: def __init__(self, size: int = 300) -> None: self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size) + self.is_configured = False self.filtered_commands: typing.Deque[str] = collections.deque() self.current_index: int = -1 self.filter_str: str = '' - - _command_history_dir = os.path.expanduser(ctx.options.confdir) - if not os.path.exists(_command_history_dir): - os.makedirs(_command_history_dir) - - self.command_history_path = os.path.join(_command_history_dir, 'command_history') - _history_lines: typing.List[str] = [] - if os.path.exists(self.command_history_path): - _history_lines = open(self.command_history_path, 'r').readlines() - - self.command_history_file = open(self.command_history_path, 'w') - - for l in _history_lines: - self.add_command(l.strip()) + self.command_history_path: str = '' atexit.register(self.cleanup) def cleanup(self): - self._reload_saved_commands() - if self.command_history_file and not self.command_history_file.closed: - self.command_history_file.close() + self._sync_saved_commands() @property def last_filtered_index(self): @@ -44,9 +30,13 @@ class CommandHistory: def clear_history(self): self.saved_commands.clear() self.filtered_commands.clear() - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.flush() + + with open(self.command_history_path, 'w') as f: + f.truncate(0) + f.seek(0) + f.flush() + f.close() + self.restart() @command.command("command_history.cancel") @@ -100,30 +90,55 @@ class CommandHistory: if command.strip() == '': return - self._reload_saved_commands() + self._sync_saved_commands() if command in self.saved_commands: self.saved_commands.remove(command) self.saved_commands.append(command) _history_str = "\n".join(self.saved_commands) - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.write(_history_str) - self.command_history_file.flush() + with open(self.command_history_path, 'w') as f: + f.truncate(0) + f.seek(0) + f.write(_history_str) + f.flush() + f.close() self.restart() - def _reload_saved_commands(self): + def _sync_saved_commands(self): # First read all commands from the file to merge anything that may # have come from a different instance of the mitmproxy or sister tools if not os.path.exists(self.command_history_path): return - _history_lines = open(self.command_history_path, 'r').readlines() + with open(self.command_history_path, 'r') as f: + _history_lines = f.readlines() + f.close() + self.saved_commands.clear() for l in _history_lines: l = l.strip() if l in self.saved_commands: self.saved_commands.remove(l) self.saved_commands.append(l.strip()) + + def configure(self, updated: typing.Set[str]): + if self.is_configured: + return + + _command_history_dir = os.path.expanduser(ctx.options.confdir) + if not os.path.exists(_command_history_dir): + os.makedirs(_command_history_dir) + + self.command_history_path = os.path.join(_command_history_dir, 'command_history') + _history_lines: typing.List[str] = [] + if os.path.exists(self.command_history_path): + with open(self.command_history_path, 'r') as f: + _history_lines = f.readlines() + f.close() + + for l in _history_lines: + self.add_command(l.strip()) + + self.is_configured = True diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index d22e1fb1..026ce53e 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -1,7 +1,5 @@ import os import pytest -import shutil -import uuid from mitmproxy import options from mitmproxy.addons import command_history @@ -9,24 +7,22 @@ from mitmproxy.test import taddons @pytest.fixture(autouse=True) -def tctx(): +def tctx(tmpdir): # This runs before each test - dir_id = str(uuid.uuid4()) - confdir = os.path.expanduser(f"~/.mitmproxy-test-suite-{dir_id}") - if not os.path.exists(confdir): - os.makedirs(confdir) + dir_name = tmpdir.mkdir('mitmproxy').dirname + confdir = dir_name opts = options.Options() opts.set(*[f"confdir={confdir}"]) tctx = taddons.context(options=opts) ch = command_history.CommandHistory() tctx.master.addons.add(ch) + ch.configure([]) yield tctx # This runs after each test ch.cleanup() - shutil.rmtree(confdir) class TestCommandHistory: @@ -38,6 +34,7 @@ class TestCommandHistory: f.close() history = command_history.CommandHistory() + history.configure([]) saved_commands = [cmd for cmd in history.saved_commands] assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] @@ -46,6 +43,7 @@ class TestCommandHistory: def test_add_command(self, tctx): history = command_history.CommandHistory(3) + history.configure([]) history.add_command('cmd1') history.add_command('cmd2') @@ -81,6 +79,7 @@ class TestCommandHistory: def test_get_next_and_prev(self, tctx): history = command_history.CommandHistory(5) + history.configure([]) history.add_command('cmd1') @@ -156,6 +155,7 @@ class TestCommandHistory: def test_clear(self, tctx): history = command_history.CommandHistory(3) + history.configure([]) history.add_command('cmd1') history.add_command('cmd2') @@ -173,6 +173,7 @@ class TestCommandHistory: def test_filter(self, tctx): history = command_history.CommandHistory(3) + history.configure([]) history.add_command('cmd1') history.add_command('cmd2') @@ -206,7 +207,6 @@ class TestCommandHistory: history.cleanup() def test_multiple_instances(self, tctx): - instances = [ command_history.CommandHistory(10), command_history.CommandHistory(10), @@ -214,6 +214,7 @@ class TestCommandHistory: ] for i in instances: + i.configure([]) saved_commands = [cmd for cmd in i.saved_commands] assert saved_commands == [] @@ -253,6 +254,7 @@ class TestCommandHistory: assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] instances.append(command_history.CommandHistory(10)) + instances[3].configure([]) saved_commands = [cmd for cmd in instances[3].saved_commands] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] @@ -274,3 +276,31 @@ class TestCommandHistory: lines = open(_path, 'r').readlines() saved_commands = [cmd.strip() for cmd in lines] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] + + instances = [ + command_history.CommandHistory(10), + command_history.CommandHistory(10) + ] + + for i in instances: + i.configure([]) + i.clear_history() + saved_commands = [cmd for cmd in i.saved_commands] + assert saved_commands == [] + + instances[0].add_command('cmd1') + instances[0].add_command('cmd2') + instances[1].add_command('cmd3') + instances[1].add_command('cmd4') + instances[1].add_command('cmd5') + + saved_commands = [cmd for cmd in instances[1].saved_commands] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5'] + + instances.pop() + instances.pop() + + _path = os.path.join(tctx.options.confdir, 'command_history') + lines = open(_path, 'r').readlines() + saved_commands = [cmd.strip() for cmd in lines] + assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5'] diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 06e676c0..4fa10eb8 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -1,7 +1,4 @@ -import os import pytest -import shutil -import uuid from mitmproxy import options from mitmproxy.addons import command_history @@ -10,24 +7,22 @@ from mitmproxy.tools.console.commander import commander @pytest.fixture(autouse=True) -def tctx(): +def tctx(tmpdir): # This runs before each test - dir_id = str(uuid.uuid4()) - confdir = os.path.expanduser(f"~/.mitmproxy-test-suite-{dir_id}") - if not os.path.exists(confdir): - os.makedirs(confdir) + dir_name = tmpdir.mkdir('mitmproxy').dirname + confdir = dir_name opts = options.Options() opts.set(*[f"confdir={confdir}"]) tctx = taddons.context(options=opts) ch = command_history.CommandHistory() tctx.master.addons.add(ch) + ch.configure([]) yield tctx # This runs after each test - ch.command_history_file.close() - shutil.rmtree(confdir) + ch.cleanup() class TestListCompleter: -- cgit v1.2.3 From 06ef7350f52af34954ecde2431c34c31d177d0c7 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Tue, 26 Nov 2019 02:42:47 +0100 Subject: simplify command history addon --- mitmproxy/addons/command_history.py | 191 +++++++++---------------- mitmproxy/tools/console/commander/commander.py | 13 +- mitmproxy/tools/console/statusbar.py | 7 +- 3 files changed, 73 insertions(+), 138 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index ad4c2346..f86a6196 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -1,6 +1,5 @@ -import atexit -import collections import os +import pathlib import typing from mitmproxy import command @@ -8,137 +7,77 @@ from mitmproxy import ctx class CommandHistory: - def __init__(self, size: int = 300) -> None: - self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size) - self.is_configured = False + VACUUM_SIZE = 1024 - self.filtered_commands: typing.Deque[str] = collections.deque() - self.current_index: int = -1 - self.filter_str: str = '' - self.command_history_path: str = '' + def __init__(self) -> None: + self.history: typing.List[str] = [] + self.filtered_history: typing.List[str] = [""] + self.current_index: int = 0 - atexit.register(self.cleanup) - - def cleanup(self): - self._sync_saved_commands() + def load(self, loader): + loader.add_option( + "command_history", bool, True, + """Persist command history between mitmproxy invocations.""" + ) @property - def last_filtered_index(self): - return len(self.filtered_commands) - 1 - - @command.command("command_history.clear") - def clear_history(self): - self.saved_commands.clear() - self.filtered_commands.clear() - - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.flush() - f.close() - - self.restart() - - @command.command("command_history.cancel") - def restart(self) -> None: - self.filtered_commands = self.saved_commands.copy() - self.current_index = -1 - - @command.command("command_history.next") - def get_next(self) -> str: - - if self.current_index == -1 or self.current_index == self.last_filtered_index: - self.current_index = -1 - return '' - elif self.current_index < self.last_filtered_index: - self.current_index += 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.prev") - def get_prev(self) -> str: - - if self.current_index == -1: - if self.last_filtered_index >= 0: - self.current_index = self.last_filtered_index - else: - return '' - - elif self.current_index > 0: - self.current_index -= 1 - - ret = self.filtered_commands[self.current_index] - - return ret - - @command.command("command_history.filter") - def set_filter(self, command: str) -> None: - self.filter_str = command - - _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] - self.filtered_commands = collections.deque(_filtered_commands) - - if command and command not in self.filtered_commands: - self.filtered_commands.append(command) - - self.current_index = -1 - - @command.command("command_history.add") + def history_file(self) -> pathlib.Path: + return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history" + + def running(self): + # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with + # confdir or command_history as updated. + self.configure("command_history") + + def configure(self, updated): + if "command_history" in updated or "confdir" in updated: + if ctx.options.command_history and self.history_file.is_file(): + self.history = self.history_file.read_text().splitlines() + + def done(self): + if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE: + # vacuum history so that it doesn't grow indefinitely. + history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n" + self.history_file.write_text(history_str) + + @command.command("commands.history.add") def add_command(self, command: str) -> None: - if command.strip() == '': + if not command.strip(): return - self._sync_saved_commands() - - if command in self.saved_commands: - self.saved_commands.remove(command) - self.saved_commands.append(command) - - _history_str = "\n".join(self.saved_commands) - with open(self.command_history_path, 'w') as f: - f.truncate(0) - f.seek(0) - f.write(_history_str) - f.flush() - f.close() - - self.restart() - - def _sync_saved_commands(self): - # First read all commands from the file to merge anything that may - # have come from a different instance of the mitmproxy or sister tools - if not os.path.exists(self.command_history_path): - return + self.history.append(command) + if ctx.options.command_history: + with self.history_file.open("a") as f: + f.write(f"{command}\n") - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() + @command.command("commands.history.get") + def get_history(self) -> typing.Sequence[str]: + """Get the entire command history.""" + return self.history.copy() - self.saved_commands.clear() - for l in _history_lines: - l = l.strip() - if l in self.saved_commands: - self.saved_commands.remove(l) - self.saved_commands.append(l.strip()) - - def configure(self, updated: typing.Set[str]): - if self.is_configured: - return - - _command_history_dir = os.path.expanduser(ctx.options.confdir) - if not os.path.exists(_command_history_dir): - os.makedirs(_command_history_dir) - - self.command_history_path = os.path.join(_command_history_dir, 'command_history') - _history_lines: typing.List[str] = [] - if os.path.exists(self.command_history_path): - with open(self.command_history_path, 'r') as f: - _history_lines = f.readlines() - f.close() - - for l in _history_lines: - self.add_command(l.strip()) + @command.command("commands.history.clear") + def clear_history(self): + self.history_file.unlink() + self.history = [] + + # Functionality to provide a filtered list that can be iterated through. + + @command.command("commands.history.filter") + def set_filter(self, prefix: str) -> None: + self.filtered_history = [ + cmd + for cmd in self.history + if cmd.startswith(prefix) + ] + self.filtered_history.append(prefix) + self.current_index = len(self.filtered_history) - 1 + + @command.command("commands.history.next") + def get_next(self) -> str: + self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1) + return self.filtered_history[self.current_index] - self.is_configured = True + @command.command("commands.history.prev") + def get_prev(self) -> str: + self.current_index = max(0, self.current_index - 1) + return self.filtered_history[self.current_index] diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index ac313290..a5bd65f9 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -9,8 +9,6 @@ import mitmproxy.flow import mitmproxy.master import mitmproxy.types -from mitmproxy import command_lexer - class Completer: @abc.abstractmethod @@ -163,7 +161,7 @@ class CommandEdit(urwid.WidgetWrap): self.cbuf.backspace() if self.cbuf.text == '': self.active_filter = False - self.master.commands.execute("command_history.filter ''") + self.master.commands.call("commands.history.filter", "") self.filter_str = '' elif key == "left": self.cbuf.left() @@ -173,21 +171,20 @@ class CommandEdit(urwid.WidgetWrap): if self.active_filter is False: self.active_filter = True self.filter_str = self.cbuf.text - _cmd = command_lexer.quote(self.cbuf.text) - self.master.commands.execute("command_history.filter %s" % _cmd) + self.master.commands.call("commands.history.filter", self.cbuf.text) - cmd = self.master.commands.execute("command_history.prev") + cmd = self.master.commands.execute("commands.history.prev") self.cbuf = CommandBuffer(self.master, cmd) elif key == "down": prev_cmd = self.cbuf.text - cmd = self.master.commands.execute("command_history.next") + cmd = self.master.commands.execute("commands.history.next") if cmd == '': if prev_cmd == self.filter_str: self.cbuf = CommandBuffer(self.master, prev_cmd) else: self.active_filter = False - self.master.commands.execute("command_history.filter ''") + self.master.commands.call("commands.history.filter", "") self.filter_str = '' self.cbuf = CommandBuffer(self.master, '') else: diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 39141b97..43b81d22 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -132,7 +132,6 @@ class ActionBar(urwid.WidgetWrap): def keypress(self, size, k): if self.prompting: if k == "esc": - self.master.commands.execute('command_history.cancel') self.prompt_done() elif self.onekey: if k == "enter": @@ -140,9 +139,9 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - cmd = command_lexer.quote(self._w.cbuf.text) - self.master.commands.execute(f"command_history.add {cmd}") - self.prompt_execute(self._w.get_edit_text()) + text = self._w.get_edit_text() + self.prompt_execute(text) + self.master.commands.call("commands.history.add", text) else: if common.is_keypress(k): self._w.keypress(size, k) -- cgit v1.2.3 From 819d5e631757f33a751eab8491b39eaacdb0c0c9 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Tue, 26 Nov 2019 02:43:09 +0100 Subject: command history: start adjusting tests --- test/mitmproxy/addons/test_command_history.py | 103 +++++++------------------ test/mitmproxy/tools/console/test_commander.py | 12 +-- 2 files changed, 32 insertions(+), 83 deletions(-) diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index 026ce53e..e38b4061 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -1,81 +1,30 @@ import os -import pytest -from mitmproxy import options from mitmproxy.addons import command_history from mitmproxy.test import taddons -@pytest.fixture(autouse=True) -def tctx(tmpdir): - # This runs before each test - dir_name = tmpdir.mkdir('mitmproxy').dirname - confdir = dir_name - - opts = options.Options() - opts.set(*[f"confdir={confdir}"]) - tctx = taddons.context(options=opts) - ch = command_history.CommandHistory() - tctx.master.addons.add(ch) - ch.configure([]) - - yield tctx - - # This runs after each test - ch.cleanup() - - class TestCommandHistory: - def test_existing_command_history(self, tctx): + def test_load_from_file(self, tmpdir): commands = ['cmd1', 'cmd2', 'cmd3'] - confdir = tctx.options.confdir - f = open(os.path.join(confdir, 'command_history'), 'w') - f.write("\n".join(commands)) - f.close() - - history = command_history.CommandHistory() - history.configure([]) - - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + with open(tmpdir.join('command_history'), 'w') as f: + f.write("\n".join(commands)) - history.cleanup() + ch = command_history.CommandHistory() + with taddons.context(ch) as tctx: + tctx.options.confdir = str(tmpdir) + assert ch.history == commands - def test_add_command(self, tctx): - history = command_history.CommandHistory(3) - history.configure([]) + def test_add_command(self): + history = command_history.CommandHistory() history.add_command('cmd1') history.add_command('cmd2') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd1', 'cmd2'] - - history.add_command('') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd1', 'cmd2'] - - # The history size is only 3. So, we forget the first - # one command, when adding fourth command - history.add_command('cmd3') - history.add_command('cmd4') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd2', 'cmd3', 'cmd4'] + assert history.history == ['cmd1', 'cmd2'] history.add_command('') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd2', 'cmd3', 'cmd4'] - - # Commands with the same text are not repeated in the history one by one - history.add_command('cmd3') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd2', 'cmd4', 'cmd3'] - - history.add_command('cmd2') - saved_commands = [cmd for cmd in history.saved_commands] - assert saved_commands == ['cmd4', 'cmd3', 'cmd2'] - - history.cleanup() + assert history.history == ['cmd1', 'cmd2'] def test_get_next_and_prev(self, tctx): history = command_history.CommandHistory(5) @@ -161,7 +110,7 @@ class TestCommandHistory: history.add_command('cmd2') history.clear_history() - saved_commands = [cmd for cmd in history.saved_commands] + saved_commands = [cmd for cmd in history.history] assert saved_commands == [] assert history.get_next() == '' @@ -215,57 +164,57 @@ class TestCommandHistory: for i in instances: i.configure([]) - saved_commands = [cmd for cmd in i.saved_commands] + saved_commands = [cmd for cmd in i.history] assert saved_commands == [] instances[0].add_command('cmd1') - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1'] # These instances haven't yet added a new command, so they haven't # yet reloaded their commands from the command file. # This is expected, because if the user is filtering a command on # another window, we don't want to interfere with that - saved_commands = [cmd for cmd in instances[1].saved_commands] + saved_commands = [cmd for cmd in instances[1].history] assert saved_commands == [] - saved_commands = [cmd for cmd in instances[2].saved_commands] + saved_commands = [cmd for cmd in instances[2].history] assert saved_commands == [] # Since the second instanced added a new command, its list of # saved commands has been updated to have the commands from the # first instance + its own commands instances[1].add_command('cmd2') - saved_commands = [cmd for cmd in instances[1].saved_commands] + saved_commands = [cmd for cmd in instances[1].history] assert saved_commands == ['cmd1', 'cmd2'] - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1'] # Third instance is still empty as it has not yet ran any command - saved_commands = [cmd for cmd in instances[2].saved_commands] + saved_commands = [cmd for cmd in instances[2].history] assert saved_commands == [] instances[2].add_command('cmd3') - saved_commands = [cmd for cmd in instances[2].saved_commands] + saved_commands = [cmd for cmd in instances[2].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] instances[0].add_command('cmd4') - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] instances.append(command_history.CommandHistory(10)) instances[3].configure([]) - saved_commands = [cmd for cmd in instances[3].saved_commands] + saved_commands = [cmd for cmd in instances[3].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] instances[0].add_command('cmd_before_close') instances.pop(0) - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1', 'cmd2'] instances[0].add_command('new_cmd') - saved_commands = [cmd for cmd in instances[0].saved_commands] + saved_commands = [cmd for cmd in instances[0].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] instances.pop(0) @@ -285,7 +234,7 @@ class TestCommandHistory: for i in instances: i.configure([]) i.clear_history() - saved_commands = [cmd for cmd in i.saved_commands] + saved_commands = [cmd for cmd in i.history] assert saved_commands == [] instances[0].add_command('cmd1') @@ -294,7 +243,7 @@ class TestCommandHistory: instances[1].add_command('cmd4') instances[1].add_command('cmd5') - saved_commands = [cmd for cmd in instances[1].saved_commands] + saved_commands = [cmd for cmd in instances[1].history] assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5'] instances.pop() diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index 4fa10eb8..aa859092 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -114,8 +114,8 @@ class TestCommandEdit: def test_up_and_down(self, tctx): edit = commander.CommandEdit(tctx.master, '') - tctx.master.commands.execute('command_history.clear') - tctx.master.commands.execute('command_history.add "cmd1"') + tctx.master.commands.execute('commands.history.clear') + tctx.master.commands.execute('commands.history.add "cmd1"') edit.keypress(1, 'up') assert edit.get_edit_text() == 'cmd1' @@ -131,9 +131,9 @@ class TestCommandEdit: edit = commander.CommandEdit(tctx.master, '') - tctx.master.commands.execute('command_history.clear') - tctx.master.commands.execute('command_history.add "cmd1"') - tctx.master.commands.execute('command_history.add "cmd2"') + tctx.master.commands.execute('commands.history.clear') + tctx.master.commands.execute('commands.history.add "cmd1"') + tctx.master.commands.execute('commands.history.add "cmd2"') edit.keypress(1, 'up') assert edit.get_edit_text() == 'cmd2' @@ -168,7 +168,7 @@ class TestCommandEdit: assert edit.get_edit_text() == 'abc' edit = commander.CommandEdit(tctx.master, '') - tctx.master.commands.execute('command_history.add "cmd3"') + tctx.master.commands.execute('commands.history.add "cmd3"') edit.keypress(1, 'z') edit.keypress(1, 'up') -- cgit v1.2.3 From 8eb173b44e6c9fe093e58428dc6dd8d812b80318 Mon Sep 17 00:00:00 2001 From: Henrique Date: Wed, 27 Nov 2019 09:21:30 -0500 Subject: Fixed small bugs on command_history and tests --- mitmproxy/addons/command_history.py | 10 +- test/mitmproxy/addons/test_command_history.py | 320 +++++++++++++------------- 2 files changed, 173 insertions(+), 157 deletions(-) diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py index f86a6196..2c1bf887 100644 --- a/mitmproxy/addons/command_history.py +++ b/mitmproxy/addons/command_history.py @@ -33,11 +33,12 @@ class CommandHistory: if "command_history" in updated or "confdir" in updated: if ctx.options.command_history and self.history_file.is_file(): self.history = self.history_file.read_text().splitlines() + self.set_filter('') def done(self): if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE: # vacuum history so that it doesn't grow indefinitely. - history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n" + history_str = "\n".join(self.history[-self.VACUUM_SIZE / 2:]) + "\n" self.history_file.write_text(history_str) @command.command("commands.history.add") @@ -49,6 +50,9 @@ class CommandHistory: if ctx.options.command_history: with self.history_file.open("a") as f: f.write(f"{command}\n") + f.close() + + self.set_filter('') @command.command("commands.history.get") def get_history(self) -> typing.Sequence[str]: @@ -57,8 +61,10 @@ class CommandHistory: @command.command("commands.history.clear") def clear_history(self): - self.history_file.unlink() + if self.history_file.exists(): + self.history_file.unlink() self.history = [] + self.set_filter('') # Functionality to provide a filtered list that can be iterated through. diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py index e38b4061..df20fba7 100644 --- a/test/mitmproxy/addons/test_command_history.py +++ b/test/mitmproxy/addons/test_command_history.py @@ -26,200 +26,210 @@ class TestCommandHistory: history.add_command('') assert history.history == ['cmd1', 'cmd2'] - def test_get_next_and_prev(self, tctx): - history = command_history.CommandHistory(5) - history.configure([]) + def test_get_next_and_prev(self, tmpdir): + ch = command_history.CommandHistory() - history.add_command('cmd1') + with taddons.context(ch) as tctx: + tctx.options.confdir = str(tmpdir) - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_next() == '' - assert history.get_next() == '' + ch.add_command('cmd1') + + assert ch.get_next() == '' + assert ch.get_next() == '' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == '' + assert ch.get_next() == '' + + ch.add_command('cmd2') + + assert ch.get_next() == '' + assert ch.get_next() == '' + assert ch.get_prev() == 'cmd2' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == 'cmd2' + assert ch.get_next() == '' + assert ch.get_next() == '' + + ch.add_command('cmd3') + + assert ch.get_next() == '' + assert ch.get_next() == '' + assert ch.get_prev() == 'cmd3' + assert ch.get_prev() == 'cmd2' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == 'cmd2' + assert ch.get_next() == 'cmd3' + assert ch.get_next() == '' + assert ch.get_next() == '' + assert ch.get_prev() == 'cmd3' + assert ch.get_prev() == 'cmd2' + + ch.add_command('cmd4') + + assert ch.get_prev() == 'cmd4' + assert ch.get_prev() == 'cmd3' + assert ch.get_prev() == 'cmd2' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == 'cmd2' + assert ch.get_next() == 'cmd3' + assert ch.get_next() == 'cmd4' + assert ch.get_next() == '' + assert ch.get_next() == '' + + ch.add_command('cmd5') + ch.add_command('cmd6') + + assert ch.get_next() == '' + assert ch.get_prev() == 'cmd6' + assert ch.get_prev() == 'cmd5' + assert ch.get_prev() == 'cmd4' + assert ch.get_next() == 'cmd5' + assert ch.get_prev() == 'cmd4' + assert ch.get_prev() == 'cmd3' + assert ch.get_prev() == 'cmd2' + assert ch.get_next() == 'cmd3' + assert ch.get_prev() == 'cmd2' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == 'cmd2' + assert ch.get_next() == 'cmd3' + assert ch.get_next() == 'cmd4' + assert ch.get_next() == 'cmd5' + assert ch.get_next() == 'cmd6' + assert ch.get_next() == '' + assert ch.get_next() == '' + + ch.clear_history() + + def test_clear(self, tmpdir): + ch = command_history.CommandHistory() - history.add_command('cmd2') + with taddons.context(ch) as tctx: + tctx.options.confdir = str(tmpdir) + ch.add_command('cmd1') + ch.add_command('cmd2') + ch.clear_history() - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == 'cmd2' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_next() == 'cmd2' - assert history.get_next() == '' - assert history.get_next() == '' - - history.add_command('cmd3') - - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == 'cmd3' - assert history.get_prev() == 'cmd2' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_next() == 'cmd2' - assert history.get_next() == 'cmd3' - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == 'cmd3' - assert history.get_prev() == 'cmd2' - - history.add_command('cmd4') - - assert history.get_prev() == 'cmd4' - assert history.get_prev() == 'cmd3' - assert history.get_prev() == 'cmd2' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_next() == 'cmd2' - assert history.get_next() == 'cmd3' - assert history.get_next() == 'cmd4' - assert history.get_next() == '' - assert history.get_next() == '' - - history.add_command('cmd5') - history.add_command('cmd6') - - assert history.get_next() == '' - assert history.get_prev() == 'cmd6' - assert history.get_prev() == 'cmd5' - assert history.get_prev() == 'cmd4' - assert history.get_next() == 'cmd5' - assert history.get_prev() == 'cmd4' - assert history.get_prev() == 'cmd3' - assert history.get_prev() == 'cmd2' - assert history.get_next() == 'cmd3' - assert history.get_prev() == 'cmd2' - assert history.get_prev() == 'cmd2' - assert history.get_next() == 'cmd3' - assert history.get_next() == 'cmd4' - assert history.get_next() == 'cmd5' - assert history.get_next() == 'cmd6' - assert history.get_next() == '' - assert history.get_next() == '' - - history.cleanup() - - def test_clear(self, tctx): - history = command_history.CommandHistory(3) - history.configure([]) + saved_commands = ch.get_history() + assert saved_commands == [] - history.add_command('cmd1') - history.add_command('cmd2') - history.clear_history() + assert ch.get_next() == '' + assert ch.get_next() == '' + assert ch.get_prev() == '' + assert ch.get_prev() == '' - saved_commands = [cmd for cmd in history.history] - assert saved_commands == [] + ch.clear_history() - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == '' - assert history.get_prev() == '' + def test_filter(self, tmpdir): + ch = command_history.CommandHistory() - history.cleanup() + with taddons.context(ch) as tctx: + tctx.options.confdir = str(tmpdir) - def test_filter(self, tctx): - history = command_history.CommandHistory(3) - history.configure([]) + ch.add_command('cmd1') + ch.add_command('cmd2') + ch.add_command('abc') + ch.set_filter('c') + + assert ch.get_next() == 'c' + assert ch.get_next() == 'c' + assert ch.get_prev() == 'cmd2' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == 'cmd2' + assert ch.get_next() == 'c' + assert ch.get_next() == 'c' + + ch.set_filter('') + + assert ch.get_next() == '' + assert ch.get_next() == '' + assert ch.get_prev() == 'abc' + assert ch.get_prev() == 'cmd2' + assert ch.get_prev() == 'cmd1' + assert ch.get_prev() == 'cmd1' + assert ch.get_next() == 'cmd2' + assert ch.get_next() == 'abc' + assert ch.get_next() == '' + assert ch.get_next() == '' + + ch.clear_history() + + def test_multiple_instances(self, tmpdir): + ch = command_history.CommandHistory() + with taddons.context(ch) as tctx: + tctx.options.confdir = str(tmpdir) - history.add_command('cmd1') - history.add_command('cmd2') - history.add_command('abc') - history.set_filter('c') - - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == 'c' - assert history.get_prev() == 'cmd2' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_next() == 'cmd2' - assert history.get_next() == 'c' - assert history.get_next() == '' - assert history.get_next() == '' - - history.set_filter('') - - assert history.get_next() == '' - assert history.get_next() == '' - assert history.get_prev() == 'abc' - assert history.get_prev() == 'cmd2' - assert history.get_prev() == 'cmd1' - assert history.get_prev() == 'cmd1' - assert history.get_next() == 'cmd2' - assert history.get_next() == 'abc' - assert history.get_next() == '' - assert history.get_next() == '' - - history.cleanup() - - def test_multiple_instances(self, tctx): instances = [ - command_history.CommandHistory(10), - command_history.CommandHistory(10), - command_history.CommandHistory(10) + command_history.CommandHistory(), + command_history.CommandHistory(), + command_history.CommandHistory() ] for i in instances: - i.configure([]) - saved_commands = [cmd for cmd in i.history] + i.configure('command_history') + saved_commands = i.get_history() assert saved_commands == [] instances[0].add_command('cmd1') - saved_commands = [cmd for cmd in instances[0].history] + saved_commands = instances[0].get_history() assert saved_commands == ['cmd1'] # These instances haven't yet added a new command, so they haven't # yet reloaded their commands from the command file. # This is expected, because if the user is filtering a command on # another window, we don't want to interfere with that - saved_commands = [cmd for cmd in instances[1].history] + saved_commands = instances[1].get_history() assert saved_commands == [] - saved_commands = [cmd for cmd in instances[2].history] + saved_commands = instances[2].get_history() assert saved_commands == [] # Since the second instanced added a new command, its list of # saved commands has been updated to have the commands from the # first instance + its own commands instances[1].add_command('cmd2') - saved_commands = [cmd for cmd in instances[1].history] - assert saved_commands == ['cmd1', 'cmd2'] + saved_commands = instances[1].get_history() + assert saved_commands == ['cmd2'] - saved_commands = [cmd for cmd in instances[0].history] + saved_commands = instances[0].get_history() assert saved_commands == ['cmd1'] # Third instance is still empty as it has not yet ran any command - saved_commands = [cmd for cmd in instances[2].history] + saved_commands = instances[2].get_history() assert saved_commands == [] instances[2].add_command('cmd3') - saved_commands = [cmd for cmd in instances[2].history] - assert saved_commands == ['cmd1', 'cmd2', 'cmd3'] + saved_commands = instances[2].get_history() + assert saved_commands == ['cmd3'] instances[0].add_command('cmd4') - saved_commands = [cmd for cmd in instances[0].history] - assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] + saved_commands = instances[0].get_history() + assert saved_commands == ['cmd1', 'cmd4'] - instances.append(command_history.CommandHistory(10)) - instances[3].configure([]) - saved_commands = [cmd for cmd in instances[3].history] + instances.append(command_history.CommandHistory()) + instances[3].configure('command_history') + saved_commands = instances[3].get_history() assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4'] instances[0].add_command('cmd_before_close') - instances.pop(0) + instances.pop(0).done() - saved_commands = [cmd for cmd in instances[0].history] - assert saved_commands == ['cmd1', 'cmd2'] + saved_commands = instances[0].get_history() + assert saved_commands == ['cmd2'] instances[0].add_command('new_cmd') - saved_commands = [cmd for cmd in instances[0].history] - assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] + saved_commands = instances[0].get_history() + assert saved_commands == ['cmd2', 'new_cmd'] - instances.pop(0) - instances.pop(0) - instances.pop(0) + instances.pop(0).done() + instances.pop(0).done() + instances.pop(0).done() _path = os.path.join(tctx.options.confdir, 'command_history') lines = open(_path, 'r').readlines() @@ -227,14 +237,14 @@ class TestCommandHistory: assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd'] instances = [ - command_history.CommandHistory(10), - command_history.CommandHistory(10) + command_history.CommandHistory(), + command_history.CommandHistory() ] for i in instances: - i.configure([]) + i.configure('command_history') i.clear_history() - saved_commands = [cmd for cmd in i.history] + saved_commands = i.get_history() assert saved_commands == [] instances[0].add_command('cmd1') @@ -243,11 +253,11 @@ class TestCommandHistory: instances[1].add_command('cmd4') instances[1].add_command('cmd5') - saved_commands = [cmd for cmd in instances[1].history] - assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5'] + saved_commands = instances[1].get_history() + assert saved_commands == ['cmd3', 'cmd4', 'cmd5'] - instances.pop() - instances.pop() + instances.pop().done() + instances.pop().done() _path = os.path.join(tctx.options.confdir, 'command_history') lines = open(_path, 'r').readlines() -- cgit v1.2.3 From 2177eb9e35a252f20ea5bc2d5a9abee29b7601ff Mon Sep 17 00:00:00 2001 From: Henrique Date: Wed, 27 Nov 2019 09:27:38 -0500 Subject: Fixed small issue --- test/mitmproxy/tools/console/test_commander.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py index aa859092..839f81e5 100644 --- a/test/mitmproxy/tools/console/test_commander.py +++ b/test/mitmproxy/tools/console/test_commander.py @@ -17,12 +17,12 @@ def tctx(tmpdir): tctx = taddons.context(options=opts) ch = command_history.CommandHistory() tctx.master.addons.add(ch) - ch.configure([]) + ch.configure('command_history') yield tctx # This runs after each test - ch.cleanup() + ch.clear_history() class TestListCompleter: -- cgit v1.2.3 From d42e8b2fe6cad23d3aca38823439a39d33adc8cb Mon Sep 17 00:00:00 2001 From: Henrique Date: Wed, 27 Nov 2019 09:44:47 -0500 Subject: Making linter happy --- mitmproxy/tools/console/statusbar.py | 1 - 1 file changed, 1 deletion(-) diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 43b81d22..7bd92d01 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -4,7 +4,6 @@ from typing import Optional import urwid import mitmproxy.tools.console.master # noqa -from mitmproxy import command_lexer from mitmproxy.tools.console import common from mitmproxy.tools.console import signals from mitmproxy.tools.console import commandexecutor -- cgit v1.2.3