blob: ad4c234682f9425ed72b7c95514ea3b76217aeb9 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
import atexit
import collections
import os
import typing
from mitmproxy import command
from mitmproxy import ctx
class CommandHistory:
def __init__(self, size: int = 300) -> None:
self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)
self.is_configured = False
self.filtered_commands: typing.Deque[str] = collections.deque()
self.current_index: int = -1
self.filter_str: str = ''
self.command_history_path: str = ''
atexit.register(self.cleanup)
def cleanup(self):
self._sync_saved_commands()
@property
def last_filtered_index(self):
return len(self.filtered_commands) - 1
@command.command("command_history.clear")
def clear_history(self):
self.saved_commands.clear()
self.filtered_commands.clear()
with open(self.command_history_path, 'w') as f:
f.truncate(0)
f.seek(0)
f.flush()
f.close()
self.restart()
@command.command("command_history.cancel")
def restart(self) -> None:
self.filtered_commands = self.saved_commands.copy()
self.current_index = -1
@command.command("command_history.next")
def get_next(self) -> str:
if self.current_index == -1 or self.current_index == self.last_filtered_index:
self.current_index = -1
return ''
elif self.current_index < self.last_filtered_index:
self.current_index += 1
ret = self.filtered_commands[self.current_index]
return ret
@command.command("command_history.prev")
def get_prev(self) -> str:
if self.current_index == -1:
if self.last_filtered_index >= 0:
self.current_index = self.last_filtered_index
else:
return ''
elif self.current_index > 0:
self.current_index -= 1
ret = self.filtered_commands[self.current_index]
return ret
@command.command("command_history.filter")
def set_filter(self, command: str) -> None:
self.filter_str = command
_filtered_commands = [c for c in self.saved_commands if c.startswith(command)]
self.filtered_commands = collections.deque(_filtered_commands)
if command and command not in self.filtered_commands:
self.filtered_commands.append(command)
self.current_index = -1
@command.command("command_history.add")
def add_command(self, command: str) -> None:
if command.strip() == '':
return
self._sync_saved_commands()
if command in self.saved_commands:
self.saved_commands.remove(command)
self.saved_commands.append(command)
_history_str = "\n".join(self.saved_commands)
with open(self.command_history_path, 'w') as f:
f.truncate(0)
f.seek(0)
f.write(_history_str)
f.flush()
f.close()
self.restart()
def _sync_saved_commands(self):
# First read all commands from the file to merge anything that may
# have come from a different instance of the mitmproxy or sister tools
if not os.path.exists(self.command_history_path):
return
with open(self.command_history_path, 'r') as f:
_history_lines = f.readlines()
f.close()
self.saved_commands.clear()
for l in _history_lines:
l = l.strip()
if l in self.saved_commands:
self.saved_commands.remove(l)
self.saved_commands.append(l.strip())
def configure(self, updated: typing.Set[str]):
if self.is_configured:
return
_command_history_dir = os.path.expanduser(ctx.options.confdir)
if not os.path.exists(_command_history_dir):
os.makedirs(_command_history_dir)
self.command_history_path = os.path.join(_command_history_dir, 'command_history')
_history_lines: typing.List[str] = []
if os.path.exists(self.command_history_path):
with open(self.command_history_path, 'r') as f:
_history_lines = f.readlines()
f.close()
for l in _history_lines:
self.add_command(l.strip())
self.is_configured = True
|