aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy
diff options
context:
space:
mode:
authorHenrique <typoon@gmail.com>2019-11-23 15:31:00 -0500
committerHenrique <typoon@gmail.com>2019-11-23 15:31:00 -0500
commita866b424fe60928fb5336f1fa146326424763ca5 (patch)
treec2cdbebf181c3f15601daf472929210a59449f52 /mitmproxy
parent16b55f9476373347a3c2553e070497b383288360 (diff)
downloadmitmproxy-a866b424fe60928fb5336f1fa146326424763ca5.tar.gz
mitmproxy-a866b424fe60928fb5336f1fa146326424763ca5.tar.bz2
mitmproxy-a866b424fe60928fb5336f1fa146326424763ca5.zip
Moved command history to an addon and added a new feature:
* If you start typing a command and press "up" only commands starting with that string will be returned
Diffstat (limited to 'mitmproxy')
-rw-r--r--mitmproxy/addons/__init__.py2
-rw-r--r--mitmproxy/addons/command_history.py152
-rw-r--r--mitmproxy/command.py4
-rw-r--r--mitmproxy/tools/console/commander/commander.py89
-rw-r--r--mitmproxy/tools/console/statusbar.py11
-rw-r--r--mitmproxy/utils/debug.py4
6 files changed, 176 insertions, 86 deletions
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py
index 838fba9b..ee238938 100644
--- a/mitmproxy/addons/__init__.py
+++ b/mitmproxy/addons/__init__.py
@@ -4,6 +4,7 @@ from mitmproxy.addons import block
from mitmproxy.addons import browser
from mitmproxy.addons import check_ca
from mitmproxy.addons import clientplayback
+from mitmproxy.addons import command_history
from mitmproxy.addons import core
from mitmproxy.addons import cut
from mitmproxy.addons import disable_h2c
@@ -30,6 +31,7 @@ def default_addons():
anticomp.AntiComp(),
check_ca.CheckCA(),
clientplayback.ClientPlayback(),
+ command_history.CommandHistory(),
cut.Cut(),
disable_h2c.DisableH2C(),
export.Export(),
diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py
new file mode 100644
index 00000000..2c348d1e
--- /dev/null
+++ b/mitmproxy/addons/command_history.py
@@ -0,0 +1,152 @@
+import collections
+import copy
+import os
+import typing
+
+import mitmproxy.options
+import mitmproxy.types
+
+from mitmproxy import command
+from mitmproxy.tools.console.commander.commander import CommandBuffer
+
+
+class CommandHistory:
+ def __init__(self, size: int = 300) -> None:
+ self.saved_commands: typing.Deque[str] = collections.deque(
+ maxlen=size
+ )
+ self.index: int = 0
+
+ self.filter: str = ''
+ self.filtered_index: int = 0
+ self.filtered_commands: typing.Deque[str] = collections.deque()
+ self.filter_active: bool = True
+
+ _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history')
+ _history_lines = open(_command_history_path, 'r').readlines()
+
+ self.command_history_file = open(_command_history_path, 'w')
+
+ for l in _history_lines:
+ self.add_command(l.strip(), True)
+
+ @property
+ def last_index(self):
+ return len(self.saved_commands) - 1
+
+ @property
+ def last_filtered_index(self):
+ return len(self.filtered_commands) - 1
+
+ @command.command("command_history.clear")
+ def clear_history(self):
+ self.saved_commands.clear()
+ self.index = 0
+ self.command_history_file.truncate(0)
+ self.command_history_file.seek(0)
+ self.command_history_file.flush()
+ self.filter = ''
+ self.filtered_index = 0
+ self.filtered_commands.clear()
+ self.filter_active = True
+
+ @command.command("command_history.next")
+ def get_next(self) -> str:
+ if self.last_index == -1:
+ return ''
+
+ if self.filter != '':
+ if self.filtered_index < self.last_filtered_index:
+ self.filtered_index = self.filtered_index + 1
+ ret = self.filtered_commands[self.filtered_index]
+ else:
+ if self.index == -1:
+ ret = ''
+ elif self.index < self.last_index:
+ self.index = self.index + 1
+ ret = self.saved_commands[self.index]
+ else:
+ self.index = -1
+ ret = ''
+
+
+ return ret
+
+ @command.command("command_history.prev")
+ def get_prev(self) -> str:
+ if self.last_index == -1:
+ return ''
+
+ if self.filter != '':
+ if self.filtered_index > 0:
+ self.filtered_index = self.filtered_index - 1
+ ret = self.filtered_commands[self.filtered_index]
+ else:
+ if self.index == -1:
+ self.index = self.last_index
+ elif self.index > 0:
+ self.index = self.index - 1
+
+ ret = self.saved_commands[self.index]
+
+ return ret
+
+ @command.command("command_history.filter")
+ def set_filter(self, command: str) -> None:
+ """
+ This is used when the user starts typing part of a command
+ and then press the "up" arrow. This way, the results returned are
+ only for the command that the user started typing
+ """
+ if command.strip() == '':
+ return
+
+ if self.filter != '':
+ last_filtered_command = self.filtered_commands[-1]
+ if command == last_filtered_command:
+ self.filter = ''
+ self.filtered_commands = []
+ self.filtered_index = 0
+ else:
+ self.filter = command
+ _filtered_commands = [c for c in self.saved_commands if c.startswith(command)]
+ self.filtered_commands = collections.deque(_filtered_commands)
+
+ if command not in self.filtered_commands:
+ self.filtered_commands.append(command)
+
+ self.filtered_index = self.last_filtered_index
+
+ # No commands found, so act like no filter was added
+ if len(self.filtered_commands) == 1:
+ self.add_command(command)
+ self.filter = ''
+
+ @command.command("command_history.cancel")
+ def restart(self) -> None:
+ self.index = -1
+ self.filter = ''
+ self.filtered_commands = []
+ self.filtered_index = 0
+
+ @command.command("command_history.add")
+ def add_command(self, command: str, execution: bool = False) -> None:
+ if command.strip() == '':
+ return
+
+ if execution:
+ if command in self.saved_commands:
+ self.saved_commands.remove(command)
+
+ self.saved_commands.append(command)
+
+ _history_str = "\n".join(self.saved_commands)
+ self.command_history_file.truncate(0)
+ self.command_history_file.seek(0)
+ self.command_history_file.write(_history_str)
+ self.command_history_file.flush()
+
+ self.restart()
+ else:
+ if command not in self.saved_commands:
+ self.saved_commands.append(command)
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index 6977ff91..23c39594 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -101,7 +101,9 @@ class Command:
try:
bound_arguments = self.signature.bind(*args)
except TypeError as v:
- raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}")
+ expected = f'Expected: {str(self.signature.parameters)}'
+ received = f'Received: {str(args)}'
+ raise exceptions.CommandError(f"Command argument mismatch: \n\t{expected}\n\t{received}")
for name, value in bound_arguments.arguments.items():
convert_to = self.signature.parameters[name].annotation
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index d661d530..c59c7f83 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -12,6 +12,7 @@ import mitmproxy.flow
import mitmproxy.master
import mitmproxy.types
+from mitmproxy import command_lexer
class Completer:
@abc.abstractmethod
@@ -148,88 +149,13 @@ class CommandBuffer:
self.completion = None
-# TODO: This class should be a Singleton
-class CommandHistory:
- def __init__(self, master: mitmproxy.master.Master, size: int = 300) -> None:
- self.saved_commands: collections.deque = collections.deque(
- [CommandBuffer(master, "")],
- maxlen=size
- )
- self.index: int = 0
- self.size: int = size
- self.master: mitmproxy.master.Master = master
-
- _command_history_path = os.path.join(os.path.expanduser(mitmproxy.options.CONF_DIR), 'command_history')
- if os.path.exists(_command_history_path):
- with open(_command_history_path, 'r') as f:
- for l in f.readlines():
- cbuf = CommandBuffer(master, l.strip())
- self.add_command(cbuf)
- f.close()
-
- self.command_history_file = open(_command_history_path, 'w')
-
- @property
- def last_index(self):
- return len(self.saved_commands) - 1
-
- def clear_history(self):
- """
- Needed for test suite.
- TODO: Maybe create a command to clear the history?
- """
- self.saved_commands: collections.deque = collections.deque(
- [CommandBuffer(self.master, "")],
- maxlen=self.size
- )
-
- self.index = 0
- self.command_history_file.truncate(0)
- self.command_history_file.seek(0)
- self.command_history_file.flush()
-
- def get_next(self) -> typing.Optional[CommandBuffer]:
- if self.index < self.last_index:
- self.index = self.index + 1
- return self.saved_commands[self.index]
-
- def get_prev(self) -> typing.Optional[CommandBuffer]:
- if self.index > 0:
- self.index = self.index - 1
- return self.saved_commands[self.index]
-
- def add_command(self, command: CommandBuffer, execution: bool = False) -> None:
- if self.index == self.last_index or execution:
- last_item = self.saved_commands[-1]
- last_item_empty = not last_item.text
- if last_item.text == command.text or (last_item_empty and execution):
- self.saved_commands[-1] = copy.copy(command)
- else:
- self.saved_commands.append(command)
- if not execution and self.index < self.last_index:
- self.index += 1
- if execution:
- self.index = self.last_index
-
- # This prevents the constructor from trying to overwrite the file
- # that it is currently reading
- if hasattr(self, 'command_history_file'):
- _history_str = "\n".join([c.text for c in self.saved_commands])
- self.command_history_file.truncate(0)
- self.command_history_file.seek(0)
- self.command_history_file.write(_history_str)
- self.command_history_file.flush()
-
-
class CommandEdit(urwid.WidgetWrap):
leader = ": "
- def __init__(self, master: mitmproxy.master.Master,
- text: str, history: CommandHistory) -> None:
+ def __init__(self, master: mitmproxy.master.Master, text: str) -> None:
super().__init__(urwid.Text(self.leader))
self.master = master
self.cbuf = CommandBuffer(master, text)
- self.history = history
self.update()
def keypress(self, size, key) -> None:
@@ -240,10 +166,15 @@ class CommandEdit(urwid.WidgetWrap):
elif key == "right":
self.cbuf.right()
elif key == "up":
- self.history.add_command(self.cbuf)
- self.cbuf = self.history.get_prev() or self.cbuf
+ _cmd = command_lexer.quote(self.cbuf.text)
+ self.master.commands.execute("command_history.filter %s" % _cmd)
+ cmd = self.master.commands.execute("command_history.prev")
+ self.cbuf = CommandBuffer(self.master, cmd)
elif key == "down":
- self.cbuf = self.history.get_next() or self.cbuf
+ _cmd = command_lexer.quote(self.cbuf.text)
+ self.master.commands.execute("command_history.filter %s" % _cmd)
+ cmd = self.master.commands.execute("command_history.next")
+ self.cbuf = CommandBuffer(self.master, cmd)
elif key == "shift tab":
self.cbuf.cycle_completion(False)
elif key == "tab":
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 43f5170d..6d040d92 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -3,10 +3,11 @@ from typing import Optional
import urwid
+import mitmproxy.tools.console.master # noqa
+from mitmproxy import command_lexer
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import commandexecutor
-import mitmproxy.tools.console.master # noqa
from mitmproxy.tools.console.commander import commander
@@ -43,8 +44,6 @@ class ActionBar(urwid.WidgetWrap):
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
signals.status_prompt_command.connect(self.sig_prompt_command)
- self.command_history = commander.CommandHistory(master)
-
self.prompting = None
self.onekey = False
@@ -104,7 +103,6 @@ class ActionBar(urwid.WidgetWrap):
self._w = commander.CommandEdit(
self.master,
partial,
- self.command_history,
)
if cursor is not None:
self._w.cbuf.cursor = cursor
@@ -134,7 +132,7 @@ class ActionBar(urwid.WidgetWrap):
def keypress(self, size, k):
if self.prompting:
if k == "esc":
- self.command_history.index = self.command_history.last_index
+ self.master.commands.execute('command_history.cancel')
self.prompt_done()
elif self.onekey:
if k == "enter":
@@ -142,7 +140,8 @@ class ActionBar(urwid.WidgetWrap):
elif k in self.onekey:
self.prompt_execute(k)
elif k == "enter":
- self.command_history.add_command(self._w.cbuf, True)
+ cmd = command_lexer.quote(self._w.cbuf.text)
+ self.master.commands.execute(f"command_history.add {cmd} true")
self.prompt_execute(self._w.get_edit_text())
else:
if common.is_keypress(k):
diff --git a/mitmproxy/utils/debug.py b/mitmproxy/utils/debug.py
index c9ffb614..e36a50a4 100644
--- a/mitmproxy/utils/debug.py
+++ b/mitmproxy/utils/debug.py
@@ -10,6 +10,10 @@ from OpenSSL import SSL
from mitmproxy import version
+def remote_debug(host='localhost', port=4444):
+ import remote_pdb
+ remote_pdb.RemotePdb(host, port).set_trace()
+
def dump_system_info():
mitmproxy_version = version.get_dev_version()