aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2019-12-12 15:12:37 +0100
committerGitHub <noreply@github.com>2019-12-12 15:12:37 +0100
commita58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3 (patch)
treef7ed8885641f1d4623c932f05a0531bec8c119d8 /mitmproxy
parentd62835f6cd62cd0445c029f67e82afe32a82814a (diff)
parentd42e8b2fe6cad23d3aca38823439a39d33adc8cb (diff)
downloadmitmproxy-a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3.tar.gz
mitmproxy-a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3.tar.bz2
mitmproxy-a58b8c9cdbfea7bb77d36e646ce14e6f4f1d64f3.zip
Merge pull request #3724 from typoon/command-history-file
Save user executed commands to a history file
Diffstat (limited to 'mitmproxy')
-rw-r--r--mitmproxy/addons/__init__.py2
-rw-r--r--mitmproxy/addons/command_history.py89
-rw-r--r--mitmproxy/command.py6
-rw-r--r--mitmproxy/tools/_main.py1
-rw-r--r--mitmproxy/tools/console/commander/commander.py72
-rw-r--r--mitmproxy/tools/console/statusbar.py11
6 files changed, 126 insertions, 55 deletions
diff --git a/mitmproxy/addons/__init__.py b/mitmproxy/addons/__init__.py
index 838fba9b..ee238938 100644
--- a/mitmproxy/addons/__init__.py
+++ b/mitmproxy/addons/__init__.py
@@ -4,6 +4,7 @@ from mitmproxy.addons import block
from mitmproxy.addons import browser
from mitmproxy.addons import check_ca
from mitmproxy.addons import clientplayback
+from mitmproxy.addons import command_history
from mitmproxy.addons import core
from mitmproxy.addons import cut
from mitmproxy.addons import disable_h2c
@@ -30,6 +31,7 @@ def default_addons():
anticomp.AntiComp(),
check_ca.CheckCA(),
clientplayback.ClientPlayback(),
+ command_history.CommandHistory(),
cut.Cut(),
disable_h2c.DisableH2C(),
export.Export(),
diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py
new file mode 100644
index 00000000..2c1bf887
--- /dev/null
+++ b/mitmproxy/addons/command_history.py
@@ -0,0 +1,89 @@
+import os
+import pathlib
+import typing
+
+from mitmproxy import command
+from mitmproxy import ctx
+
+
+class CommandHistory:
+ VACUUM_SIZE = 1024
+
+ def __init__(self) -> None:
+ self.history: typing.List[str] = []
+ self.filtered_history: typing.List[str] = [""]
+ self.current_index: int = 0
+
+ def load(self, loader):
+ loader.add_option(
+ "command_history", bool, True,
+ """Persist command history between mitmproxy invocations."""
+ )
+
+ @property
+ def history_file(self) -> pathlib.Path:
+ return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history"
+
+ def running(self):
+ # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with
+ # confdir or command_history as updated.
+ self.configure("command_history")
+
+ def configure(self, updated):
+ if "command_history" in updated or "confdir" in updated:
+ if ctx.options.command_history and self.history_file.is_file():
+ self.history = self.history_file.read_text().splitlines()
+ self.set_filter('')
+
+ def done(self):
+ if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE:
+ # vacuum history so that it doesn't grow indefinitely.
+ history_str = "\n".join(self.history[-self.VACUUM_SIZE / 2:]) + "\n"
+ self.history_file.write_text(history_str)
+
+ @command.command("commands.history.add")
+ def add_command(self, command: str) -> None:
+ if not command.strip():
+ return
+
+ self.history.append(command)
+ if ctx.options.command_history:
+ with self.history_file.open("a") as f:
+ f.write(f"{command}\n")
+ f.close()
+
+ self.set_filter('')
+
+ @command.command("commands.history.get")
+ def get_history(self) -> typing.Sequence[str]:
+ """Get the entire command history."""
+ return self.history.copy()
+
+ @command.command("commands.history.clear")
+ def clear_history(self):
+ if self.history_file.exists():
+ self.history_file.unlink()
+ self.history = []
+ self.set_filter('')
+
+ # Functionality to provide a filtered list that can be iterated through.
+
+ @command.command("commands.history.filter")
+ def set_filter(self, prefix: str) -> None:
+ self.filtered_history = [
+ cmd
+ for cmd in self.history
+ if cmd.startswith(prefix)
+ ]
+ self.filtered_history.append(prefix)
+ self.current_index = len(self.filtered_history) - 1
+
+ @command.command("commands.history.next")
+ def get_next(self) -> str:
+ self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1)
+ return self.filtered_history[self.current_index]
+
+ @command.command("commands.history.prev")
+ def get_prev(self) -> str:
+ self.current_index = max(0, self.current_index - 1)
+ return self.filtered_history[self.current_index]
diff --git a/mitmproxy/command.py b/mitmproxy/command.py
index 6977ff91..48f9051e 100644
--- a/mitmproxy/command.py
+++ b/mitmproxy/command.py
@@ -100,8 +100,10 @@ class Command:
def prepare_args(self, args: typing.Sequence[str]) -> inspect.BoundArguments:
try:
bound_arguments = self.signature.bind(*args)
- except TypeError as v:
- raise exceptions.CommandError(f"Command argument mismatch: {v.args[0]}")
+ except TypeError:
+ expected = f'Expected: {str(self.signature.parameters)}'
+ received = f'Received: {str(args)}'
+ raise exceptions.CommandError(f"Command argument mismatch: \n\t{expected}\n\t{received}")
for name, value in bound_arguments.arguments.items():
convert_to = self.signature.parameters[name].annotation
diff --git a/mitmproxy/tools/_main.py b/mitmproxy/tools/_main.py
index 0163e8d3..b98bbe90 100644
--- a/mitmproxy/tools/_main.py
+++ b/mitmproxy/tools/_main.py
@@ -83,6 +83,7 @@ def run(
except SystemExit:
arg_check.check()
sys.exit(1)
+
try:
opts.set(*args.setoptions, defer=True)
optmanager.load_paths(
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index 02eda3bf..0feae28e 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -1,6 +1,4 @@
import abc
-import collections
-import copy
import typing
import urwid
@@ -158,53 +156,15 @@ class CommandBuffer:
self.completion = None
-class CommandHistory:
- def __init__(self, master: mitmproxy.master.Master, size: int = 30) -> None:
- self.saved_commands: collections.deque = collections.deque(
- [CommandBuffer(master, "")],
- maxlen=size
- )
- self.index: int = 0
-
- @property
- def last_index(self):
- return len(self.saved_commands) - 1
-
- def get_next(self) -> typing.Optional[CommandBuffer]:
- if self.index < self.last_index:
- self.index = self.index + 1
- return self.saved_commands[self.index]
- return None
-
- def get_prev(self) -> typing.Optional[CommandBuffer]:
- if self.index > 0:
- self.index = self.index - 1
- return self.saved_commands[self.index]
- return None
-
- def add_command(self, command: CommandBuffer, execution: bool = False) -> None:
- if self.index == self.last_index or execution:
- last_item = self.saved_commands[-1]
- last_item_empty = not last_item.text
- if last_item.text == command.text or (last_item_empty and execution):
- self.saved_commands[-1] = copy.copy(command)
- else:
- self.saved_commands.append(command)
- if not execution and self.index < self.last_index:
- self.index += 1
- if execution:
- self.index = self.last_index
-
-
class CommandEdit(urwid.WidgetWrap):
leader = ": "
- def __init__(self, master: mitmproxy.master.Master,
- text: str, history: CommandHistory) -> None:
+ def __init__(self, master: mitmproxy.master.Master, text: str) -> None:
super().__init__(urwid.Text(self.leader))
self.master = master
+ self.active_filter = False
+ self.filter_str = ''
self.cbuf = CommandBuffer(master, text)
- self.history = history
self.update()
def keypress(self, size, key) -> None:
@@ -236,15 +196,35 @@ class CommandEdit(urwid.WidgetWrap):
self.cbuf.cursor = cursor_pos
elif key == "backspace":
self.cbuf.backspace()
+ if self.cbuf.text == '':
+ self.active_filter = False
+ self.master.commands.call("commands.history.filter", "")
+ self.filter_str = ''
elif key == "left" or key == "ctrl b":
self.cbuf.left()
elif key == "right" or key == "ctrl f":
self.cbuf.right()
elif key == "up" or key == "ctrl p":
- self.history.add_command(self.cbuf)
- self.cbuf = self.history.get_prev() or self.cbuf
+ if self.active_filter is False:
+ self.active_filter = True
+ self.filter_str = self.cbuf.text
+ self.master.commands.call("commands.history.filter", self.cbuf.text)
+ cmd = self.master.commands.execute("commands.history.prev")
+ self.cbuf = CommandBuffer(self.master, cmd)
elif key == "down" or key == "ctrl n":
- self.cbuf = self.history.get_next() or self.cbuf
+ prev_cmd = self.cbuf.text
+ cmd = self.master.commands.execute("commands.history.next")
+
+ if cmd == '':
+ if prev_cmd == self.filter_str:
+ self.cbuf = CommandBuffer(self.master, prev_cmd)
+ else:
+ self.active_filter = False
+ self.master.commands.call("commands.history.filter", "")
+ self.filter_str = ''
+ self.cbuf = CommandBuffer(self.master, '')
+ else:
+ self.cbuf = CommandBuffer(self.master, cmd)
elif key == "shift tab":
self.cbuf.cycle_completion(False)
elif key == "tab":
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 43f5170d..7bd92d01 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -3,10 +3,10 @@ from typing import Optional
import urwid
+import mitmproxy.tools.console.master # noqa
from mitmproxy.tools.console import common
from mitmproxy.tools.console import signals
from mitmproxy.tools.console import commandexecutor
-import mitmproxy.tools.console.master # noqa
from mitmproxy.tools.console.commander import commander
@@ -43,8 +43,6 @@ class ActionBar(urwid.WidgetWrap):
signals.status_prompt_onekey.connect(self.sig_prompt_onekey)
signals.status_prompt_command.connect(self.sig_prompt_command)
- self.command_history = commander.CommandHistory(master)
-
self.prompting = None
self.onekey = False
@@ -104,7 +102,6 @@ class ActionBar(urwid.WidgetWrap):
self._w = commander.CommandEdit(
self.master,
partial,
- self.command_history,
)
if cursor is not None:
self._w.cbuf.cursor = cursor
@@ -134,7 +131,6 @@ class ActionBar(urwid.WidgetWrap):
def keypress(self, size, k):
if self.prompting:
if k == "esc":
- self.command_history.index = self.command_history.last_index
self.prompt_done()
elif self.onekey:
if k == "enter":
@@ -142,8 +138,9 @@ class ActionBar(urwid.WidgetWrap):
elif k in self.onekey:
self.prompt_execute(k)
elif k == "enter":
- self.command_history.add_command(self._w.cbuf, True)
- self.prompt_execute(self._w.get_edit_text())
+ text = self._w.get_edit_text()
+ self.prompt_execute(text)
+ self.master.commands.call("commands.history.add", text)
else:
if common.is_keypress(k):
self._w.keypress(size, k)