aboutsummaryrefslogtreecommitdiffstats
path: root/test/examples/webscanner_helper/test_watchdog.py
blob: 43e59310da4f76446a1d69e70bc4f1ee742857b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import time
from pathlib import Path
from unittest import mock

from mitmproxy.connections import ServerConnection
from mitmproxy.exceptions import HttpSyntaxException
from mitmproxy.test import tflow
from mitmproxy.test import tutils
import multiprocessing

from examples.complex.webscanner_helper.watchdog import WatchdogAddon, logger


class TestWatchdog:

    def test_init_file(self, tmpdir):
        tmpfile = tmpdir.join("tmpfile")
        with open(tmpfile, "w") as tfile:
            tfile.write("")
        event = multiprocessing.Event()
        try:
            WatchdogAddon(event, Path(tmpfile))
        except RuntimeError:
            assert True
        else:
            assert False

    def test_init_dir(self, tmpdir):
        event = multiprocessing.Event()
        mydir = tmpdir.join("mydir")
        assert not Path(mydir).exists()
        WatchdogAddon(event, Path(mydir))
        assert Path(mydir).exists()

    def test_serverconnect(self, tmpdir):
        event = multiprocessing.Event()
        w = WatchdogAddon(event, Path(tmpdir), timeout=10)
        with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout:
            w.serverconnect(ServerConnection("127.0.0.1"))
        mock_set_timeout.assert_called()

    def test_serverconnect_None(self, tmpdir):
        event = multiprocessing.Event()
        w = WatchdogAddon(event, Path(tmpdir))
        with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout:
            w.serverconnect(ServerConnection("127.0.0.1"))
        assert not mock_set_timeout.called

    def test_trigger(self, tmpdir):
        event = multiprocessing.Event()
        w = WatchdogAddon(event, Path(tmpdir))
        f = tflow.tflow(resp=tutils.tresp())
        f.error = "Test Error"

        with mock.patch.object(logger, 'error') as mock_error:
            open_mock = mock.mock_open()
            with mock.patch("pathlib.Path.open", open_mock, create=True):
                w.error(f)
            mock_error.assert_called()
            open_mock.assert_called()

    def test_trigger_http_synatx(self, tmpdir):
        event = multiprocessing.Event()
        w = WatchdogAddon(event, Path(tmpdir))
        f = tflow.tflow(resp=tutils.tresp())
        f.error = HttpSyntaxException()
        assert isinstance(f.error, HttpSyntaxException)

        with mock.patch.object(logger, 'error') as mock_error:
            open_mock = mock.mock_open()
            with mock.patch("pathlib.Path.open", open_mock, create=True):
                w.error(f)
            assert not mock_error.called
            assert not open_mock.called

    def test_timeout(self, tmpdir):
        event = multiprocessing.Event()
        w = WatchdogAddon(event, Path(tmpdir))

        assert w.not_in_timeout(None, None)
        assert w.not_in_timeout(time.time, None)
        with mock.patch('time.time', return_value=5):
            assert not w.not_in_timeout(3, 20)
            assert w.not_in_timeout(3, 1)