aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/controller.py
blob: 63117ef03129f55d2f69bc9ead93e71678dadacf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import queue
from mitmproxy import exceptions


class Channel:
    """
        The only way for the proxy server to communicate with the master
        is to use the channel it has been given.
    """
    def __init__(self, q, should_exit):
        self.q = q
        self.should_exit = should_exit

    def ask(self, mtype, m):
        """
        Decorate a message with a reply attribute, and send it to the master.
        Then wait for a response.

        Raises:
            exceptions.Kill: All connections should be closed immediately.
        """
        m.reply = Reply(m)
        self.q.put((mtype, m))
        while not self.should_exit.is_set():
            try:
                # The timeout is here so we can handle a should_exit event.
                g = m.reply.q.get(timeout=0.5)
            except queue.Empty:  # pragma: no cover
                continue
            if g == exceptions.Kill:
                raise exceptions.Kill()
            return g
        m.reply._state = "committed"  # suppress error message in __del__
        raise exceptions.Kill()

    def tell(self, mtype, m):
        """
        Decorate a message with a dummy reply attribute, send it to the master,
        then return immediately.
        """
        m.reply = DummyReply()
        self.q.put((mtype, m))


NO_REPLY = object()  # special object we can distinguish from a valid "None" reply.


class Reply:
    """
    Messages sent through a channel are decorated with a "reply" attribute. This
    object is used to respond to the message through the return channel.
    """
    def __init__(self, obj):
        self.obj = obj
        self.q = queue.Queue()  # type: queue.Queue

        self._state = "start"  # "start" -> "taken" -> "committed"

        # Holds the reply value. May change before things are actually commited.
        self.value = NO_REPLY

    @property
    def state(self):
        """
        The state the reply is currently in. A normal reply object goes
        sequentially through the following lifecycle:

            1. start: Initial State.
            2. taken: The reply object has been taken to be commited.
            3. committed: The reply has been sent back to the requesting party.

        This attribute is read-only and can only be modified by calling one of
        state transition functions.
        """
        return self._state

    @property
    def has_message(self):
        return self.value != NO_REPLY

    def take(self):
        """
        Scripts or other parties make "take" a reply out of a normal flow.
        For example, intercepted flows are taken out so that the connection thread does not proceed.
        """
        if self.state != "start":
            raise exceptions.ControlException(
                "Reply is {}, but expected it to be start.".format(self.state)
            )
        self._state = "taken"

    def commit(self):
        """
        Ultimately, messages are commited. This is done either automatically by
        if the message is not taken or manually by the entity which called
        .take().
        """
        if self.state != "taken":
            raise exceptions.ControlException(
                "Reply is {}, but expected it to be taken.".format(self.state)
            )
        if not self.has_message:
            raise exceptions.ControlException("There is no reply message.")
        self._state = "committed"
        self.q.put(self.value)

    def ack(self, force=False):
        if self.state not in {"start", "taken"}:
            raise exceptions.ControlException(
                "Reply is {}, but expected it to be start or taken.".format(self.state)
            )
        self.send(self.obj, force)

    def kill(self, force=False):
        self.send(exceptions.Kill, force)

    def send(self, msg, force=False):
        if self.has_message and not force:
            raise exceptions.ControlException("There is already a reply message.")
        self.value = msg

    def __del__(self):
        if self.state != "committed":
            # This will be ignored by the interpreter, but emit a warning
            raise exceptions.ControlException("Uncommitted reply: %s" % self.obj)


class DummyReply(Reply):
    """
    A reply object that is not connected to anything. In contrast to regular
    Reply objects, DummyReply objects are reset to "start" at the end of an
    handler so that they can be used multiple times. Useful when we need an
    object to seem like it has a channel, and during testing.
    """
    def __init__(self):
        super().__init__(None)
        self._should_reset = False

    def mark_reset(self):
        if self.state != "committed":
            raise exceptions.ControlException("Uncommitted reply: %s" % self.obj)
        self._should_reset = True

    def reset(self):
        if self._should_reset:
            self._state = "start"
            self.value = NO_REPLY

    def __del__(self):
        pass