aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenrique <typoon@gmail.com>2019-11-27 08:13:19 -0500
committerHenrique <typoon@gmail.com>2019-11-27 08:13:19 -0500
commit863d2fbcb20b3f94df2ef2cf2478e86a736b7b65 (patch)
tree945e191b5815c1f98ad429cc66ed789ae6e473b2
parent09c0162e8d1ee1480945725762557b8c8f02c2f2 (diff)
parent819d5e631757f33a751eab8491b39eaacdb0c0c9 (diff)
downloadmitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.tar.gz
mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.tar.bz2
mitmproxy-863d2fbcb20b3f94df2ef2cf2478e86a736b7b65.zip
Merge branch 'command-history-file' of github.com:typoon/mitmproxy into command-history-file
-rw-r--r--mitmproxy/addons/command_history.py191
-rw-r--r--mitmproxy/tools/console/commander/commander.py13
-rw-r--r--mitmproxy/tools/console/statusbar.py7
-rw-r--r--test/mitmproxy/addons/test_command_history.py103
-rw-r--r--test/mitmproxy/tools/console/test_commander.py12
5 files changed, 105 insertions, 221 deletions
diff --git a/mitmproxy/addons/command_history.py b/mitmproxy/addons/command_history.py
index ad4c2346..f86a6196 100644
--- a/mitmproxy/addons/command_history.py
+++ b/mitmproxy/addons/command_history.py
@@ -1,6 +1,5 @@
-import atexit
-import collections
import os
+import pathlib
import typing
from mitmproxy import command
@@ -8,137 +7,77 @@ from mitmproxy import ctx
class CommandHistory:
- def __init__(self, size: int = 300) -> None:
- self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
- self.is_configured = False
+ VACUUM_SIZE = 1024
- self.filtered_commands: typing.Deque[str] = collections.deque()
- self.current_index: int = -1
- self.filter_str: str = ''
- self.command_history_path: str = ''
+ def __init__(self) -> None:
+ self.history: typing.List[str] = []
+ self.filtered_history: typing.List[str] = [""]
+ self.current_index: int = 0
- atexit.register(self.cleanup)
-
- def cleanup(self):
- self._sync_saved_commands()
+ def load(self, loader):
+ loader.add_option(
+ "command_history", bool, True,
+ """Persist command history between mitmproxy invocations."""
+ )
@property
- def last_filtered_index(self):
- return len(self.filtered_commands) - 1
-
- @command.command("command_history.clear")
- def clear_history(self):
- self.saved_commands.clear()
- self.filtered_commands.clear()
-
- with open(self.command_history_path, 'w') as f:
- f.truncate(0)
- f.seek(0)
- f.flush()
- f.close()
-
- self.restart()
-
- @command.command("command_history.cancel")
- def restart(self) -> None:
- self.filtered_commands = self.saved_commands.copy()
- self.current_index = -1
-
- @command.command("command_history.next")
- def get_next(self) -> str:
-
- if self.current_index == -1 or self.current_index == self.last_filtered_index:
- self.current_index = -1
- return ''
- elif self.current_index < self.last_filtered_index:
- self.current_index += 1
-
- ret = self.filtered_commands[self.current_index]
-
- return ret
-
- @command.command("command_history.prev")
- def get_prev(self) -> str:
-
- if self.current_index == -1:
- if self.last_filtered_index >= 0:
- self.current_index = self.last_filtered_index
- else:
- return ''
-
- elif self.current_index > 0:
- self.current_index -= 1
-
- ret = self.filtered_commands[self.current_index]
-
- return ret
-
- @command.command("command_history.filter")
- def set_filter(self, command: str) -> None:
- self.filter_str = command
-
- _filtered_commands = [c for c in self.saved_commands if c.startswith(command)]
- self.filtered_commands = collections.deque(_filtered_commands)
-
- if command and command not in self.filtered_commands:
- self.filtered_commands.append(command)
-
- self.current_index = -1
-
- @command.command("command_history.add")
+ def history_file(self) -> pathlib.Path:
+ return pathlib.Path(os.path.expanduser(ctx.options.confdir)) / "command_history"
+
+ def running(self):
+ # FIXME: We have a weird bug where the contract for configure is not followed and it is never called with
+ # confdir or command_history as updated.
+ self.configure("command_history")
+
+ def configure(self, updated):
+ if "command_history" in updated or "confdir" in updated:
+ if ctx.options.command_history and self.history_file.is_file():
+ self.history = self.history_file.read_text().splitlines()
+
+ def done(self):
+ if ctx.options.command_history and len(self.history) > self.VACUUM_SIZE:
+ # vacuum history so that it doesn't grow indefinitely.
+ history_str = "\n".join(self.history[-self.VACUUM_SIZE/2:]) + "\n"
+ self.history_file.write_text(history_str)
+
+ @command.command("commands.history.add")
def add_command(self, command: str) -> None:
- if command.strip() == '':
+ if not command.strip():
return
- self._sync_saved_commands()
-
- if command in self.saved_commands:
- self.saved_commands.remove(command)
- self.saved_commands.append(command)
-
- _history_str = "\n".join(self.saved_commands)
- with open(self.command_history_path, 'w') as f:
- f.truncate(0)
- f.seek(0)
- f.write(_history_str)
- f.flush()
- f.close()
-
- self.restart()
-
- def _sync_saved_commands(self):
- # First read all commands from the file to merge anything that may
- # have come from a different instance of the mitmproxy or sister tools
- if not os.path.exists(self.command_history_path):
- return
+ self.history.append(command)
+ if ctx.options.command_history:
+ with self.history_file.open("a") as f:
+ f.write(f"{command}\n")
- with open(self.command_history_path, 'r') as f:
- _history_lines = f.readlines()
- f.close()
+ @command.command("commands.history.get")
+ def get_history(self) -> typing.Sequence[str]:
+ """Get the entire command history."""
+ return self.history.copy()
- self.saved_commands.clear()
- for l in _history_lines:
- l = l.strip()
- if l in self.saved_commands:
- self.saved_commands.remove(l)
- self.saved_commands.append(l.strip())
-
- def configure(self, updated: typing.Set[str]):
- if self.is_configured:
- return
-
- _command_history_dir = os.path.expanduser(ctx.options.confdir)
- if not os.path.exists(_command_history_dir):
- os.makedirs(_command_history_dir)
-
- self.command_history_path = os.path.join(_command_history_dir, 'command_history')
- _history_lines: typing.List[str] = []
- if os.path.exists(self.command_history_path):
- with open(self.command_history_path, 'r') as f:
- _history_lines = f.readlines()
- f.close()
-
- for l in _history_lines:
- self.add_command(l.strip())
+ @command.command("commands.history.clear")
+ def clear_history(self):
+ self.history_file.unlink()
+ self.history = []
+
+ # Functionality to provide a filtered list that can be iterated through.
+
+ @command.command("commands.history.filter")
+ def set_filter(self, prefix: str) -> None:
+ self.filtered_history = [
+ cmd
+ for cmd in self.history
+ if cmd.startswith(prefix)
+ ]
+ self.filtered_history.append(prefix)
+ self.current_index = len(self.filtered_history) - 1
+
+ @command.command("commands.history.next")
+ def get_next(self) -> str:
+ self.current_index = min(self.current_index + 1, len(self.filtered_history) - 1)
+ return self.filtered_history[self.current_index]
- self.is_configured = True
+ @command.command("commands.history.prev")
+ def get_prev(self) -> str:
+ self.current_index = max(0, self.current_index - 1)
+ return self.filtered_history[self.current_index]
diff --git a/mitmproxy/tools/console/commander/commander.py b/mitmproxy/tools/console/commander/commander.py
index 0f45aa1f..0feae28e 100644
--- a/mitmproxy/tools/console/commander/commander.py
+++ b/mitmproxy/tools/console/commander/commander.py
@@ -9,8 +9,6 @@ import mitmproxy.flow
import mitmproxy.master
import mitmproxy.types
-from mitmproxy import command_lexer
-
class Completer:
@abc.abstractmethod
@@ -200,7 +198,7 @@ class CommandEdit(urwid.WidgetWrap):
self.cbuf.backspace()
if self.cbuf.text == '':
self.active_filter = False
- self.master.commands.execute("command_history.filter ''")
+ self.master.commands.call("commands.history.filter", "")
self.filter_str = ''
elif key == "left" or key == "ctrl b":
self.cbuf.left()
@@ -210,20 +208,19 @@ class CommandEdit(urwid.WidgetWrap):
if self.active_filter is False:
self.active_filter = True
self.filter_str = self.cbuf.text
- _cmd = command_lexer.quote(self.cbuf.text)
- self.master.commands.execute("command_history.filter %s" % _cmd)
- cmd = self.master.commands.execute("command_history.prev")
+ self.master.commands.call("commands.history.filter", self.cbuf.text)
+ cmd = self.master.commands.execute("commands.history.prev")
self.cbuf = CommandBuffer(self.master, cmd)
elif key == "down" or key == "ctrl n":
prev_cmd = self.cbuf.text
- cmd = self.master.commands.execute("command_history.next")
+ cmd = self.master.commands.execute("commands.history.next")
if cmd == '':
if prev_cmd == self.filter_str:
self.cbuf = CommandBuffer(self.master, prev_cmd)
else:
self.active_filter = False
- self.master.commands.execute("command_history.filter ''")
+ self.master.commands.call("commands.history.filter", "")
self.filter_str = ''
self.cbuf = CommandBuffer(self.master, '')
else:
diff --git a/mitmproxy/tools/console/statusbar.py b/mitmproxy/tools/console/statusbar.py
index 39141b97..43b81d22 100644
--- a/mitmproxy/tools/console/statusbar.py
+++ b/mitmproxy/tools/console/statusbar.py
@@ -132,7 +132,6 @@ class ActionBar(urwid.WidgetWrap):
def keypress(self, size, k):
if self.prompting:
if k == "esc":
- self.master.commands.execute('command_history.cancel')
self.prompt_done()
elif self.onekey:
if k == "enter":
@@ -140,9 +139,9 @@ class ActionBar(urwid.WidgetWrap):
elif k in self.onekey:
self.prompt_execute(k)
elif k == "enter":
- cmd = command_lexer.quote(self._w.cbuf.text)
- self.master.commands.execute(f"command_history.add {cmd}")
- self.prompt_execute(self._w.get_edit_text())
+ text = self._w.get_edit_text()
+ self.prompt_execute(text)
+ self.master.commands.call("commands.history.add", text)
else:
if common.is_keypress(k):
self._w.keypress(size, k)
diff --git a/test/mitmproxy/addons/test_command_history.py b/test/mitmproxy/addons/test_command_history.py
index 026ce53e..e38b4061 100644
--- a/test/mitmproxy/addons/test_command_history.py
+++ b/test/mitmproxy/addons/test_command_history.py
@@ -1,81 +1,30 @@
import os
-import pytest
-from mitmproxy import options
from mitmproxy.addons import command_history
from mitmproxy.test import taddons
-@pytest.fixture(autouse=True)
-def tctx(tmpdir):
- # This runs before each test
- dir_name = tmpdir.mkdir('mitmproxy').dirname
- confdir = dir_name
-
- opts = options.Options()
- opts.set(*[f"confdir={confdir}"])
- tctx = taddons.context(options=opts)
- ch = command_history.CommandHistory()
- tctx.master.addons.add(ch)
- ch.configure([])
-
- yield tctx
-
- # This runs after each test
- ch.cleanup()
-
-
class TestCommandHistory:
- def test_existing_command_history(self, tctx):
+ def test_load_from_file(self, tmpdir):
commands = ['cmd1', 'cmd2', 'cmd3']
- confdir = tctx.options.confdir
- f = open(os.path.join(confdir, 'command_history'), 'w')
- f.write("\n".join(commands))
- f.close()
-
- history = command_history.CommandHistory()
- history.configure([])
-
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
+ with open(tmpdir.join('command_history'), 'w') as f:
+ f.write("\n".join(commands))
- history.cleanup()
+ ch = command_history.CommandHistory()
+ with taddons.context(ch) as tctx:
+ tctx.options.confdir = str(tmpdir)
+ assert ch.history == commands
- def test_add_command(self, tctx):
- history = command_history.CommandHistory(3)
- history.configure([])
+ def test_add_command(self):
+ history = command_history.CommandHistory()
history.add_command('cmd1')
history.add_command('cmd2')
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd1', 'cmd2']
-
- history.add_command('')
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd1', 'cmd2']
-
- # The history size is only 3. So, we forget the first
- # one command, when adding fourth command
- history.add_command('cmd3')
- history.add_command('cmd4')
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd2', 'cmd3', 'cmd4']
+ assert history.history == ['cmd1', 'cmd2']
history.add_command('')
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd2', 'cmd3', 'cmd4']
-
- # Commands with the same text are not repeated in the history one by one
- history.add_command('cmd3')
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd2', 'cmd4', 'cmd3']
-
- history.add_command('cmd2')
- saved_commands = [cmd for cmd in history.saved_commands]
- assert saved_commands == ['cmd4', 'cmd3', 'cmd2']
-
- history.cleanup()
+ assert history.history == ['cmd1', 'cmd2']
def test_get_next_and_prev(self, tctx):
history = command_history.CommandHistory(5)
@@ -161,7 +110,7 @@ class TestCommandHistory:
history.add_command('cmd2')
history.clear_history()
- saved_commands = [cmd for cmd in history.saved_commands]
+ saved_commands = [cmd for cmd in history.history]
assert saved_commands == []
assert history.get_next() == ''
@@ -215,57 +164,57 @@ class TestCommandHistory:
for i in instances:
i.configure([])
- saved_commands = [cmd for cmd in i.saved_commands]
+ saved_commands = [cmd for cmd in i.history]
assert saved_commands == []
instances[0].add_command('cmd1')
- saved_commands = [cmd for cmd in instances[0].saved_commands]
+ saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1']
# These instances haven't yet added a new command, so they haven't
# yet reloaded their commands from the command file.
# This is expected, because if the user is filtering a command on
# another window, we don't want to interfere with that
- saved_commands = [cmd for cmd in instances[1].saved_commands]
+ saved_commands = [cmd for cmd in instances[1].history]
assert saved_commands == []
- saved_commands = [cmd for cmd in instances[2].saved_commands]
+ saved_commands = [cmd for cmd in instances[2].history]
assert saved_commands == []
# Since the second instanced added a new command, its list of
# saved commands has been updated to have the commands from the
# first instance + its own commands
instances[1].add_command('cmd2')
- saved_commands = [cmd for cmd in instances[1].saved_commands]
+ saved_commands = [cmd for cmd in instances[1].history]
assert saved_commands == ['cmd1', 'cmd2']
- saved_commands = [cmd for cmd in instances[0].saved_commands]
+ saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1']
# Third instance is still empty as it has not yet ran any command
- saved_commands = [cmd for cmd in instances[2].saved_commands]
+ saved_commands = [cmd for cmd in instances[2].history]
assert saved_commands == []
instances[2].add_command('cmd3')
- saved_commands = [cmd for cmd in instances[2].saved_commands]
+ saved_commands = [cmd for cmd in instances[2].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3']
instances[0].add_command('cmd4')
- saved_commands = [cmd for cmd in instances[0].saved_commands]
+ saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
instances.append(command_history.CommandHistory(10))
instances[3].configure([])
- saved_commands = [cmd for cmd in instances[3].saved_commands]
+ saved_commands = [cmd for cmd in instances[3].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4']
instances[0].add_command('cmd_before_close')
instances.pop(0)
- saved_commands = [cmd for cmd in instances[0].saved_commands]
+ saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1', 'cmd2']
instances[0].add_command('new_cmd')
- saved_commands = [cmd for cmd in instances[0].saved_commands]
+ saved_commands = [cmd for cmd in instances[0].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd_before_close', 'new_cmd']
instances.pop(0)
@@ -285,7 +234,7 @@ class TestCommandHistory:
for i in instances:
i.configure([])
i.clear_history()
- saved_commands = [cmd for cmd in i.saved_commands]
+ saved_commands = [cmd for cmd in i.history]
assert saved_commands == []
instances[0].add_command('cmd1')
@@ -294,7 +243,7 @@ class TestCommandHistory:
instances[1].add_command('cmd4')
instances[1].add_command('cmd5')
- saved_commands = [cmd for cmd in instances[1].saved_commands]
+ saved_commands = [cmd for cmd in instances[1].history]
assert saved_commands == ['cmd1', 'cmd2', 'cmd3', 'cmd4', 'cmd5']
instances.pop()
diff --git a/test/mitmproxy/tools/console/test_commander.py b/test/mitmproxy/tools/console/test_commander.py
index 4fa10eb8..aa859092 100644
--- a/test/mitmproxy/tools/console/test_commander.py
+++ b/test/mitmproxy/tools/console/test_commander.py
@@ -114,8 +114,8 @@ class TestCommandEdit:
def test_up_and_down(self, tctx):
edit = commander.CommandEdit(tctx.master, '')
- tctx.master.commands.execute('command_history.clear')
- tctx.master.commands.execute('command_history.add "cmd1"')
+ tctx.master.commands.execute('commands.history.clear')
+ tctx.master.commands.execute('commands.history.add "cmd1"')
edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd1'
@@ -131,9 +131,9 @@ class TestCommandEdit:
edit = commander.CommandEdit(tctx.master, '')
- tctx.master.commands.execute('command_history.clear')
- tctx.master.commands.execute('command_history.add "cmd1"')
- tctx.master.commands.execute('command_history.add "cmd2"')
+ tctx.master.commands.execute('commands.history.clear')
+ tctx.master.commands.execute('commands.history.add "cmd1"')
+ tctx.master.commands.execute('commands.history.add "cmd2"')
edit.keypress(1, 'up')
assert edit.get_edit_text() == 'cmd2'
@@ -168,7 +168,7 @@ class TestCommandEdit:
assert edit.get_edit_text() == 'abc'
edit = commander.CommandEdit(tctx.master, '')
- tctx.master.commands.execute('command_history.add "cmd3"')
+ tctx.master.commands.execute('commands.history.add "cmd3"')
edit.keypress(1, 'z')
edit.keypress(1, 'up')