aboutsummaryrefslogtreecommitdiffstats
path: root/test/mitmproxy/test_controller.py
blob: 953d37e12b38082462d026a0d15298cbfaf32d59 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import asyncio
import queue
import pytest

from mitmproxy.exceptions import Kill, ControlException
from mitmproxy import controller
from mitmproxy.test import taddons
import mitmproxy.ctx


@pytest.mark.asyncio
async def test_master():
    """Check that a message logged via ``mitmproxy.ctx`` reaches an addon's ``log`` hook.

    The addon sets the master's ``should_exit`` event from inside its hook, so
    finding the event set after the log entry has been awaited shows that the
    hook ran.
    """

    class tAddon:
        def log(self, _):
            # ``ctx`` is the closure variable bound by the ``with`` statement below.
            ctx.master.should_exit.set()

    with taddons.context(tAddon()) as ctx:
        master = ctx.master
        assert not master.should_exit.is_set()

        async def emit_log():
            mitmproxy.ctx.log("test")

        # Schedule the log call, then wait until the master has seen it.
        asyncio.ensure_future(emit_log())
        logged = await master.await_log("test")
        assert logged
        assert master.should_exit.is_set()


class TestReply:
    """Tests for the state transitions and queue hand-off of ``controller.Reply``."""

    @staticmethod
    def _take_and_commit(reply):
        """Call ``take()`` and then ``commit()`` on *reply*."""
        reply.take()
        reply.commit()

    def test_simple(self):
        """A sent value appears on the queue only once the reply is committed."""
        r = controller.Reply(42)
        assert r.state == "start"

        r.send("foo")
        assert r.value == "foo"

        r.take()
        assert r.state == "taken"

        # Nothing has been queued yet: the value is only handed over by commit().
        with pytest.raises(queue.Empty):
            r.q.get_nowait()
        r.commit()
        assert r.state == "committed"
        assert r.q.get() == "foo"

    def test_kill(self):
        """``kill()`` results in ``Kill`` being delivered on the queue."""
        r = controller.Reply(43)
        r.kill()
        self._take_and_commit(r)
        assert r.q.get() == Kill

    def test_ack(self):
        """``ack()`` delivers the object the reply was constructed with."""
        r = controller.Reply(44)
        r.ack()
        self._take_and_commit(r)
        assert r.q.get() == 44

    def test_reply_none(self):
        """``None`` can be sent explicitly and is delivered as-is."""
        r = controller.Reply(45)
        r.send(None)
        self._take_and_commit(r)
        assert r.q.get() is None

    def test_commit_no_reply(self):
        """Committing before any value was set raises; after ``ack()`` it succeeds."""
        r = controller.Reply(46)
        r.take()
        with pytest.raises(ControlException):
            r.commit()
        r.ack()
        r.commit()

    def test_double_send(self):
        """A second ``send()`` on the same reply raises ``ControlException``."""
        r = controller.Reply(47)
        r.send(1)
        with pytest.raises(ControlException):
            r.send(2)
        self._take_and_commit(r)

    def test_state_transitions(self):
        """``take``/``commit``/``ack`` succeed only from their accepted states."""
        states = {"start", "taken", "committed"}
        accept = {
            "take": {"start"},
            "commit": {"taken"},
            "ack": {"start", "taken"},
        }
        for method_name, allowed in accept.items():
            for initial in states:
                r = controller.Reply(48)
                r._state = initial
                if method_name == "commit":
                    # commit() raises without a value (see test_commit_no_reply),
                    # so provide one to isolate the state check.
                    r.value = 49
                transition = getattr(r, method_name)
                if initial not in allowed:
                    with pytest.raises(ControlException):
                        transition()
                else:
                    transition()
                r._state = "committed"  # hide warnings on deletion

    def test_del(self):
        """Calling ``__del__`` on a reply still in its initial state raises."""
        r = controller.Reply(47)
        with pytest.raises(ControlException):
            r.__del__()
        # Complete the reply afterwards so it ends up committed.
        r.ack()
        self._take_and_commit(r)


class TestDummyReply:
    """Tests for ``controller.DummyReply``, including its reset behaviour."""

    def test_simple(self):
        """The full ack/take/commit/mark_reset/reset cycle can be run repeatedly."""
        dummy = controller.DummyReply()
        lifecycle = ("ack", "take", "commit", "mark_reset", "reset")
        for _ in range(2):
            for step in lifecycle:
                getattr(dummy, step)()
        assert dummy.state == "start"

    def test_reset(self):
        """``mark_reset()`` raises before commit; ``reset()`` returns to "start"."""
        dummy = controller.DummyReply()
        dummy.ack()
        dummy.take()
        # Marking for reset is rejected while the reply is taken but uncommitted.
        with pytest.raises(ControlException):
            dummy.mark_reset()
        dummy.commit()
        dummy.mark_reset()
        # mark_reset() alone leaves the state untouched; reset() changes it.
        assert dummy.state == "committed"
        dummy.reset()
        assert dummy.state == "start"

    def test_del(self):
        """``__del__`` on a fresh, unanswered ``DummyReply`` does not raise."""
        dummy = controller.DummyReply()
        dummy.__del__()