blob: fb27e80519da35caa2df18bc1bd9d7482961c5aa (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
import atexit
import collections
import os
import typing
from mitmproxy import command
from mitmproxy import ctx
class CommandHistory:
    """Addon that keeps a bounded, de-duplicated, file-backed command history.

    The history lives in ``saved_commands`` (a deque capped at ``size``
    entries) and is mirrored to a ``command_history`` file inside the
    configured ``confdir``.  ``filtered_commands`` is the view that
    ``get_prev``/``get_next`` walk through; ``current_index`` is the cursor
    into that view, with ``-1`` meaning "not currently browsing".
    """

    def __init__(self, size: int = 300) -> None:
        """Load any existing history file and open it for rewriting.

        Args:
            size: Maximum number of commands retained in ``saved_commands``.
        """
        self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
        self.filtered_commands: typing.Deque[str] = collections.deque()
        self.current_index: int = -1
        self.filter_str: str = ''

        history_dir = os.path.expanduser(ctx.options.confdir)
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(history_dir, exist_ok=True)

        self.command_history_path = os.path.join(history_dir, 'command_history')

        # The previous contents must be read *before* the file is reopened in
        # 'w' mode below, because that open truncates it.
        try:
            with open(self.command_history_path, 'r') as previous:
                stored_lines: typing.List[str] = previous.readlines()
        except FileNotFoundError:
            stored_lines = []

        self.command_history_file = open(self.command_history_path, 'w')

        # Merge everything in one pass and write the file once, instead of
        # re-reading and rewriting the whole file for every stored line.
        self._merge_lines(stored_lines)
        self._write_history_file()
        self.restart()

        atexit.register(self.cleanup)

    def cleanup(self) -> None:
        """Reload the history from disk and close the history file if open."""
        self._reload_saved_commands()
        if self.command_history_file and not self.command_history_file.closed:
            self.command_history_file.close()

    @property
    def last_filtered_index(self) -> int:
        """Index of the last entry in ``filtered_commands`` (-1 when empty)."""
        return len(self.filtered_commands) - 1

    @command.command("command_history.clear")
    def clear_history(self) -> None:
        """Drop every command from memory and empty the history file."""
        self.saved_commands.clear()
        self.filtered_commands.clear()
        # saved_commands is empty here, so this just truncates the file.
        self._write_history_file()
        self.restart()

    @command.command("command_history.cancel")
    def restart(self) -> None:
        """Reset browsing: unfiltered view, cursor back to "not browsing"."""
        self.filtered_commands = self.saved_commands.copy()
        self.current_index = -1

    @command.command("command_history.next")
    def get_next(self) -> str:
        """Move the cursor one entry forward and return that command.

        Returns ``''`` and resets the cursor to -1 when browsing has not
        started or the cursor is already on the last entry.
        """
        last = self.last_filtered_index
        if self.current_index in (-1, last):
            self.current_index = -1
            return ''
        if self.current_index < last:
            self.current_index += 1
        return self.filtered_commands[self.current_index]

    @command.command("command_history.prev")
    def get_prev(self) -> str:
        """Move the cursor one entry backward and return that command.

        When browsing has not started, the cursor jumps to the last entry,
        or ``''`` is returned if there are no entries.  Once the cursor is
        on the first entry it stays there.
        """
        if self.current_index == -1:
            if self.last_filtered_index < 0:
                return ''
            self.current_index = self.last_filtered_index
        elif self.current_index > 0:
            self.current_index -= 1
        return self.filtered_commands[self.current_index]

    @command.command("command_history.filter")
    def set_filter(self, command: str) -> None:
        """Restrict browsing to saved commands that start with ``command``.

        A non-empty ``command`` that is not itself among the matches is
        appended to the filtered view.  The cursor is reset to -1.
        """
        self.filter_str = command
        self.filtered_commands = collections.deque(
            c for c in self.saved_commands if c.startswith(command)
        )
        if command and command not in self.filtered_commands:
            self.filtered_commands.append(command)
        self.current_index = -1

    @command.command("command_history.add")
    def add_command(self, command: str) -> None:
        """Append ``command`` as the newest history entry and persist it.

        Blank commands are ignored.  If the command is already present, the
        older occurrence is removed so it appears only once, at the end.
        """
        if command.strip() == '':
            return

        self._reload_saved_commands()

        if command in self.saved_commands:
            self.saved_commands.remove(command)
        self.saved_commands.append(command)

        self._write_history_file()
        self.restart()

    def _reload_saved_commands(self) -> None:
        """Replace ``saved_commands`` with the current contents of the file.

        Does nothing (and keeps the in-memory history) if the file is missing.
        """
        # First read all commands from the file to merge anything that may
        # have come from a different instance of the mitmproxy or sister tools
        try:
            with open(self.command_history_path, 'r') as history:
                lines = history.readlines()
        except FileNotFoundError:
            return

        self.saved_commands.clear()
        self._merge_lines(lines)

    def _merge_lines(self, lines: typing.Iterable[str]) -> None:
        """Append each stripped, non-blank line to ``saved_commands``, de-duplicated."""
        for raw in lines:
            entry = raw.strip()
            if not entry:
                # add_command never stores blank commands, so don't load them.
                continue
            if entry in self.saved_commands:
                self.saved_commands.remove(entry)
            self.saved_commands.append(entry)

    def _write_history_file(self) -> None:
        """Rewrite the history file with ``saved_commands``, one per line."""
        self.command_history_file.truncate(0)
        self.command_history_file.seek(0)
        self.command_history_file.write("\n".join(self.saved_commands))
        self.command_history_file.flush()
|