diff options
author | Henrique <typoon@gmail.com> | 2019-11-23 15:31:00 -0500 |
---|---|---|
committer | Henrique <typoon@gmail.com> | 2019-11-23 15:31:00 -0500 |
commit | a866b424fe60928fb5336f1fa146326424763ca5 (patch) | |
tree | c2cdbebf181c3f15601daf472929210a59449f52 /mitmproxy | |
parent | 16b55f9476373347a3c2553e070497b383288360 (diff) | |
download | mitmproxy-a866b424fe60928fb5336f1fa146326424763ca5.tar.gz mitmproxy-a866b424fe60928fb5336f1fa146326424763ca5.tar.bz2 mitmproxy-a866b424fe60928fb5336f1fa146326424763ca5.zip |
Moved command history to an addon and added a new feature:
* If you start typing a command and press "up" only commands starting
with that string will be returned
Diffstat (limited to 'mitmproxy')
-rw-r--r-- | mitmproxy/addons/__init__.py | 2 | ||||
-rw-r--r-- | mitmproxy/addons/command_history.py | 152 | ||||
-rw-r--r-- | mitmproxy/command.py | 4 | ||||
-rw-r--r-- | mitmproxy/tools/console/commander/commander.py | 89 | ||||
-rw-r--r-- | mitmproxy/tools/console/statusbar.py | 11 | ||||
-rw-r--r-- | mitmproxy/utils/debug.py | 4 |
6 files changed, 176 insertions, 86 deletions
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py index 838fba9b..ee238938 100644 --- a/mitmproxy/addons/__init__.py +++ b/mitmproxy/addons/__init__.py @@ -4,6 +4,7 @@ from mitmproxy.addons import block from mitmproxy.addons import browser from mitmproxy.addons import check_ca from mitmproxy.addons import clientplayback +from mitmproxy.addons import command_history from mitmproxy.addons import core from mitmproxy.addons import cut from mitmproxy.addons import disable_h2c @@ -30,6 +31,7 @@ def default_addons(): anticomp.AntiComp(), check_ca.CheckCA(), clientplayback.ClientPlayback(), + command_history.CommandHistory(), cut.Cut(), disable_h2c.DisableH2C(), export.Export(), diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py new file mode 100644 index 00000000..2c348d1e --- /dev/null +++ b/mitmproxy/addons/command_history.py @@ -0,0 +1,152 @@ +import collections +import copy +import os +import typing + +import mitmproxy.options +import mitmproxy.types + +from mitmproxy import command +from mitmproxy.tools.console.commander.commander import CommandBuffer + + +class CommandHistory: + def __init__(self, size: int = 300) -> None: + self.saved_commands: typing.Deque[str] = collections.deque( + maxlen=size + ) + self.index: int = 0 + + self.filter: str = '' + self.filtered_index: int = 0 + self.filtered_commands: typing.Deque[str] = collections.deque() + self.filter_active: bool = True + + _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history') + _history_lines = open(_command_history_path, 'r').readlines() + + self.command_history_file = open(_command_history_path, 'w') + + for l in _history_lines: + self.add_command(l.strip(), True) + + @property + def last_index(self): + return len(self.saved_commands) - 1 + + @property + def last_filtered_index(self): + return len(self.filtered_commands) - 1 + + @command.command("command_history.clear") + def clear_history(self): + self.saved_commands.clear() + self.index = 0 + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.flush() + self.filter = '' + self.filtered_index = 0 + self.filtered_commands.clear() + self.filter_active = True + + @command.command("command_history.next") + def get_next(self) -> str: + if self.last_index == -1: + return '' + + if self.filter != '': + if self.filtered_index < self.last_filtered_index: + self.filtered_index = self.filtered_index + 1 + ret = self.filtered_commands[self.filtered_index] + else: + if self.index == -1: + ret = '' + elif self.index < self.last_index: + self.index = self.index + 1 + ret = self.saved_commands[self.index] + else: + self.index = -1 + ret = '' + + + return ret + + @command.command("command_history.prev") + def get_prev(self) -> str: + if self.last_index == -1: + return '' + + if self.filter != '': + if self.filtered_index > 0: + self.filtered_index = self.filtered_index - 1 + ret = self.filtered_commands[self.filtered_index] + else: + if self.index == -1: + self.index = self.last_index + elif self.index > 0: + self.index = self.index - 1 + + ret = self.saved_commands[self.index] + + return ret + + @command.command("command_history.filter") + def set_filter(self, command: str) -> None: + """ + This is used when the user starts typing part of a command + and then press the "up" arrow. This way, the results returned are + only for the command that the user started typing + """ + if command.strip() == '': + return + + if self.filter != '': + last_filtered_command = self.filtered_commands[-1] + if command == last_filtered_command: + self.filter = '' + self.filtered_commands = [] + self.filtered_index = 0 + else: + self.filter = command + _filtered_commands = [c for c in self.saved_commands if c.startswith(command)] + self.filtered_commands = collections.deque(_filtered_commands) + + if command not in self.filtered_commands: + self.filtered_commands.append(command) + + self.filtered_index = self.last_filtered_index + + # No commands found, so act like no filter was added + if len(self.filtered_commands) == 1: + self.add_command(command) + self.filter = '' + + @command.command("command_history.cancel") + def restart(self) -> None: + self.index = -1 + self.filter = '' + self.filtered_commands = [] + self.filtered_index = 0 + + @command.command("command_history.add") + def add_command(self, command: str, execution: bool = False) -> None: + if command.strip() == '': + return + + if execution: + if command in self.saved_commands: + self.saved_commands.remove(command) + + self.saved_commands.append(command) + + _history_str = "\n".join(self.saved_commands) + self.command_history_file.truncate(0) + self.command_history_file.seek(0) + self.command_history_file.write(_history_str) + self.command_history_file.flush() + + self.restart() + else: + if command not in self.saved_commands: + self.saved_commands.append(command) diff --git a/mitmproxy/command.py b/mitmproxy/command.py index 6977ff91..23c39594 100644 --- a/mitmproxy/command.py +++ b/mitmproxy/command.py @@ -101,7 +101,9 @@ class Command: try: bound_arguments = self.signature.bind(*args) except TypeError as v: - raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}") + expected = f'Expected: {str(self.signature.parameters)}' + received = f'Received: {str(args)}' + raise exceptions.CommandError(f"Command argument mismatch: \n\t{expected}\n\t{received}") for name, value in bound_arguments.arguments.items(): convert_to = self.signature.parameters[name].annotation diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py index d661d530..c59c7f83 100644 --- a/mitmproxy/tools/console/commander/commander.py +++ b/mitmproxy/tools/console/commander/commander.py @@ -12,6 +12,7 @@ import mitmproxy.flow import mitmproxy.master import mitmproxy.types +from mitmproxy import command_lexer class Completer: @abc.abstractmethod @@ -148,88 +149,13 @@ class CommandBuffer: self.completion = None -# TODO: This class should be a Singleton -class CommandHistory: - def __init__(self, master: mitmproxy.master.Master, size: int = 300) -> None: - self.saved_commands: collections.deque = collections.deque( - [CommandBuffer(master, "")], - maxlen=size - ) - self.index: int = 0 - self.size: int = size - self.master: mitmproxy.master.Master = master - - _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history') - if os.path.exists(_command_history_path): - with open(_command_history_path, 'r') as f: - for l in f.readlines(): - cbuf = CommandBuffer(master, l.strip()) - self.add_command(cbuf) - f.close() - - self.command_history_file = open(_command_history_path, 'w') - - @property - def last_index(self): - return len(self.saved_commands) - 1 - - def clear_history(self): - """ - Needed for test suite. - TODO: Maybe create a command to clear the history? - """ - self.saved_commands: collections.deque = collections.deque( - [CommandBuffer(self.master, "")], - maxlen=self.size - ) - - self.index = 0 - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.flush() - - def get_next(self) -> typing.Optional[CommandBuffer]: - if self.index < self.last_index: - self.index = self.index + 1 - return self.saved_commands[self.index] - - def get_prev(self) -> typing.Optional[CommandBuffer]: - if self.index > 0: - self.index = self.index - 1 - return self.saved_commands[self.index] - - def add_command(self, command: CommandBuffer, execution: bool = False) -> None: - if self.index == self.last_index or execution: - last_item = self.saved_commands[-1] - last_item_empty = not last_item.text - if last_item.text == command.text or (last_item_empty and execution): - self.saved_commands[-1] = copy.copy(command) - else: - self.saved_commands.append(command) - if not execution and self.index < self.last_index: - self.index += 1 - if execution: - self.index = self.last_index - - # This prevents the constructor from trying to overwrite the file - # that it is currently reading - if hasattr(self, 'command_history_file'): - _history_str = "\n".join([c.text for c in self.saved_commands]) - self.command_history_file.truncate(0) - self.command_history_file.seek(0) - self.command_history_file.write(_history_str) - self.command_history_file.flush() - - class CommandEdit(urwid.WidgetWrap): leader = ": " - def __init__(self, master: mitmproxy.master.Master, - text: str, history: CommandHistory) -> None: + def __init__(self, master: mitmproxy.master.Master, text: str) -> None: super().__init__(urwid.Text(self.leader)) self.master = master self.cbuf = CommandBuffer(master, text) - self.history = history self.update() def keypress(self, size, key) -> None: @@ -240,10 +166,15 @@ class CommandEdit(urwid.WidgetWrap): elif key == "right": self.cbuf.right() elif key == "up": - self.history.add_command(self.cbuf) - self.cbuf = self.history.get_prev() or self.cbuf + _cmd = command_lexer.quote(self.cbuf.text) + self.master.commands.execute("command_history.filter %s" % _cmd) + cmd = self.master.commands.execute("command_history.prev") + self.cbuf = CommandBuffer(self.master, cmd) elif key == "down": - self.cbuf = self.history.get_next() or self.cbuf + _cmd = command_lexer.quote(self.cbuf.text) + self.master.commands.execute("command_history.filter %s" % _cmd) + cmd = self.master.commands.execute("command_history.next") + self.cbuf = CommandBuffer(self.master, cmd) elif key == "shift tab": self.cbuf.cycle_completion(False) elif key == "tab": diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py index 43f5170d..6d040d92 100644 --- a/mitmproxy/tools/console/statusbar.py +++ b/mitmproxy/tools/console/statusbar.py @@ -3,10 +3,11 @@ from typing import Optional import urwid +import mitmproxy.tools.console.master # noqa +from mitmproxy import command_lexer from mitmproxy.tools.console import common from mitmproxy.tools.console import signals from mitmproxy.tools.console import commandexecutor -import mitmproxy.tools.console.master # noqa from mitmproxy.tools.console.commander import commander @@ -43,8 +44,6 @@ class ActionBar(urwid.WidgetWrap): signals.status_prompt_onekey.connect(self.sig_prompt_onekey) signals.status_prompt_command.connect(self.sig_prompt_command) - self.command_history = commander.CommandHistory(master) - self.prompting = None self.onekey = False @@ -104,7 +103,6 @@ class ActionBar(urwid.WidgetWrap): self._w = commander.CommandEdit( self.master, partial, - self.command_history, ) if cursor is not None: self._w.cbuf.cursor = cursor @@ -134,7 +132,7 @@ class ActionBar(urwid.WidgetWrap): def keypress(self, size, k): if self.prompting: if k == "esc": - self.command_history.index = self.command_history.last_index + self.master.commands.execute('command_history.cancel') self.prompt_done() elif self.onekey: if k == "enter": @@ -142,7 +140,8 @@ class ActionBar(urwid.WidgetWrap): elif k in self.onekey: self.prompt_execute(k) elif k == "enter": - self.command_history.add_command(self._w.cbuf, True) + cmd = command_lexer.quote(self._w.cbuf.text) + self.master.commands.execute(f"command_history.add {cmd} true") self.prompt_execute(self._w.get_edit_text()) else: if common.is_keypress(k): diff --git a/mitmproxy/utils/debug.py b/mitmproxy/utils/debug.py index c9ffb614..e36a50a4 100644 --- a/mitmproxy/utils/debug.py +++ b/mitmproxy/utils/debug.py @@ -10,6 +10,10 @@ from OpenSSL import SSL from mitmproxy import version +def remote_debug(host='localhost', port=4444): + import remote_pdb + remote_pdb.RemotePdb(host, port).set_trace() + def dump_system_info(): mitmproxy_version = version.get_dev_version() |