aboutsummaryrefslogtreecommitdiffstats
path: root/examples/complex/webscanner_helper/watchdog.py
blob: 867d219681e4634785b7311ec9e03d7b4666105f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import pathlib
import time
import typing
import logging
from datetime import datetime

import mitmproxy.connections
import mitmproxy.http
from mitmproxy.addons.export import curl_command, raw
from mitmproxy.exceptions import HttpSyntaxException

# Module-level logger named after this module; WatchdogAddon.error() reports watchdog triggers through it.
logger = logging.getLogger(__name__)


class WatchdogAddon:
    """Watchdog add-on that signals when the device under test (DUT) stops responding correctly.

    Meant to be used in combination with web application scanners: whenever a flow reports an
    error outside of the grace period following the previous trigger, a (multiprocessing) event
    is set and the request that might have caused the problem is written to the output directory.
    The event can be used by the surrounding tooling, e.g. to restart the DUT.
    """

    def __init__(self, event, outdir: pathlib.Path, timeout: typing.Optional[float] = None):
        """Initializes the Watchdog.

        Args:
            event: multiprocessing.Event that will be set if the watchdog is triggered.
            outdir: path to a directory in which the triggering requests will be saved (curl and raw).
                The directory (including missing parents) is created if it does not exist yet.
            timeout: float that specifies the timeout for the server connection. The same value is
                used as the grace period after a trigger during which further errors are ignored.
                None disables both.

        Raises:
            RuntimeError: if outdir exists but is not a directory.
        """
        self.error_event = event
        self.flow_dir = outdir
        if self.flow_dir.exists() and not self.flow_dir.is_dir():
            raise RuntimeError("Watchtdog output path must be a directory.")
        # exist_ok avoids a FileExistsError if the directory appears between the check and the mkdir.
        self.flow_dir.mkdir(parents=True, exist_ok=True)
        # time.time() value of the most recent trigger, None as long as the watchdog never fired.
        self.last_trigger: typing.Optional[float] = None
        self.timeout: typing.Optional[float] = timeout

    def serverconnect(self, conn: "mitmproxy.connections.ServerConnection"):
        """Applies the configured timeout (if any) to a new server connection."""
        if self.timeout is not None:
            conn.settimeout(self.timeout)

    @classmethod
    def not_in_timeout(cls, last_triggered, timeout):
        """Checks if current error lies not in timeout after last trigger (potential reset of connection).

        Args:
            last_triggered: time.time() value of the previous trigger or None if there was none.
            timeout: length of the grace period in seconds or None if there is no grace period.

        Returns:
            True if there was no previous trigger, no timeout is configured or more than `timeout`
            seconds have passed since `last_triggered`; False otherwise.
        """
        return last_triggered is None or timeout is None or (time.time() - last_triggered > timeout)

    def error(self, flow):
        """ Checks if the watchdog will be triggered.

        Only triggers watchdog for timeouts after last reset and if flow.error is set (shows that error is a server
        error). Ignores HttpSyntaxException Errors since this can be triggered on purpose by web application scanner.

        When triggered, the trigger time is recorded, the event is set and, if the flow has a request,
        the request is saved as `<timestamp>.curl` and `<timestamp>.raw` in the output directory.

        Args:
            flow: mitmproxy.http.flow
        """
        if not self.not_in_timeout(self.last_trigger, self.timeout):
            # Still inside the grace period after the previous trigger.
            return
        # NOTE(review): this filter only matches if flow.error itself is an HttpSyntaxException
        # instance - confirm against the type mitmproxy actually stores in flow.error.
        if flow.error is None or isinstance(flow.error, HttpSyntaxException):
            return

        self.last_trigger = time.time()
        logger.error(f"Watchdog triggered! Cause: {flow}")
        self.error_event.set()

        # save the request which might have caused the problem
        if flow.request:
            # Take the timestamp once so that both dumps of the same flow share one basename.
            # NOTE(review): the ISO timestamp contains ':' which is not allowed in Windows file names.
            stamp = datetime.utcnow().isoformat()
            with (self.flow_dir / f"{stamp}.curl").open("w") as f:
                f.write(curl_command(flow))
            with (self.flow_dir / f"{stamp}.raw").open("wb") as f:
                f.write(raw(flow))