aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/addons/command_history.py
blob: fb27e80519da35caa2df18bc1bd9d7482961c5aa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import atexit
import collections
import os
import typing

from mitmproxy import command
from mitmproxy import ctx


class CommandHistory:
    """Keep a bounded, de-duplicated command history backed by a file.

    The history lives in ``self.saved_commands`` (oldest first, newest last)
    and is mirrored to a ``command_history`` file inside the configured
    ``confdir``. ``self.filtered_commands`` is the view that the
    ``command_history.next`` / ``command_history.prev`` commands walk through;
    ``self.current_index`` is the cursor into that view, with ``-1`` meaning
    "nothing selected".
    """

    def __init__(self, size: int = 300) -> None:
        """Load any existing history file and open it for rewriting.

        Args:
            size: Maximum number of commands kept in ``saved_commands``.
        """
        self.saved_commands: typing.Deque[str] = collections.deque(maxlen=size)

        self.filtered_commands: typing.Deque[str] = collections.deque()
        self.current_index: int = -1
        self.filter_str: str = ''

        _command_history_dir = os.path.expanduser(ctx.options.confdir)
        # exist_ok avoids a crash if the directory appears between a separate
        # existence check and the creation call.
        os.makedirs(_command_history_dir, exist_ok=True)

        self.command_history_path = os.path.join(_command_history_dir, 'command_history')
        # The previous content must be read *before* the file is opened with
        # 'w' below, because that open truncates it.
        _history_lines = self._read_history_lines() or []

        self.command_history_file = open(self.command_history_path, 'w')

        # Re-adding every line rewrites the (now empty) file and de-duplicates
        # the entries on the way.
        for line in _history_lines:
            self.add_command(line.strip())

        atexit.register(self.cleanup)

    def cleanup(self):
        """Reload the history from disk and close the history file handle."""
        self._reload_saved_commands()
        if self.command_history_file and not self.command_history_file.closed:
            self.command_history_file.close()

    @property
    def last_filtered_index(self):
        """Index of the last entry in ``filtered_commands`` (-1 when empty)."""
        return len(self.filtered_commands) - 1

    @command.command("command_history.clear")
    def clear_history(self):
        """Drop all commands from memory and empty the history file."""
        self.saved_commands.clear()
        self.filtered_commands.clear()
        self.command_history_file.truncate(0)
        self.command_history_file.seek(0)
        self.command_history_file.flush()
        self.restart()

    @command.command("command_history.cancel")
    def restart(self) -> None:
        """Reset the filtered view to all saved commands and clear the cursor."""
        self.filtered_commands = self.saved_commands.copy()
        self.current_index = -1

    @command.command("command_history.next")
    def get_next(self) -> str:
        """Move the cursor one entry forward and return that command.

        Returns:
            The command at the new cursor position, or ``''`` when nothing is
            selected or the cursor is already on the last entry (in which
            case the cursor is reset to -1).
        """
        if self.current_index == -1 or self.current_index == self.last_filtered_index:
            self.current_index = -1
            return ''
        elif self.current_index < self.last_filtered_index:
            self.current_index += 1

        return self.filtered_commands[self.current_index]

    @command.command("command_history.prev")
    def get_prev(self) -> str:
        """Move the cursor one entry backward and return that command.

        When nothing is selected the cursor jumps to the last entry; when it
        is already on the first entry it stays there.

        Returns:
            The command at the new cursor position, or ``''`` when the
            filtered view is empty.
        """
        if self.current_index == -1:
            if self.last_filtered_index >= 0:
                self.current_index = self.last_filtered_index
            else:
                return ''

        elif self.current_index > 0:
            self.current_index -= 1

        return self.filtered_commands[self.current_index]

    @command.command("command_history.filter")
    def set_filter(self, command: str) -> None:
        """Restrict the filtered view to saved commands starting with *command*.

        A non-empty *command* that is not itself among the matches is appended
        as the last entry of the view. The cursor is reset to -1.
        """
        self.filter_str = command

        self.filtered_commands = collections.deque(
            c for c in self.saved_commands if c.startswith(command)
        )

        if command and command not in self.filtered_commands:
            self.filtered_commands.append(command)

        self.current_index = -1

    @command.command("command_history.add")
    def add_command(self, command: str) -> None:
        """Append *command* as the newest entry and rewrite the history file.

        Blank commands are ignored. A command that is already present is moved
        to the newest position instead of being stored twice.
        """
        if command.strip() == '':
            return

        self._reload_saved_commands()

        if command in self.saved_commands:
            self.saved_commands.remove(command)
        self.saved_commands.append(command)

        _history_str = "\n".join(self.saved_commands)
        self.command_history_file.truncate(0)
        self.command_history_file.seek(0)
        self.command_history_file.write(_history_str)
        self.command_history_file.flush()

        self.restart()

    def _read_history_lines(self) -> typing.Optional[typing.List[str]]:
        """Return the raw lines of the history file, or None if it is missing.

        The file is opened in a ``with`` block so the read handle is closed
        right away instead of being left to the garbage collector.
        """
        try:
            with open(self.command_history_path, 'r') as history_file:
                return history_file.readlines()
        except FileNotFoundError:
            return None

    def _reload_saved_commands(self):
        """Replace ``saved_commands`` with the current content of the file.

        Does nothing when the history file does not exist.
        """
        # First read all commands from the file to merge anything that may
        # have come from a different instance of the mitmproxy or sister tools
        _history_lines = self._read_history_lines()
        if _history_lines is None:
            return

        self.saved_commands.clear()
        for line in _history_lines:
            line = line.strip()
            # add_command() refuses blank commands, so blank lines in the file
            # must not turn into empty history entries either.
            if not line:
                continue
            if line in self.saved_commands:
                self.saved_commands.remove(line)
            self.saved_commands.append(line)