diff options
Diffstat (limited to 'examples/complex/webscanner_helper/watchdog.py')
-rw-r--r-- | examples/complex/webscanner_helper/watchdog.py | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/examples/complex/webscanner_helper/watchdog.py b/examples/complex/webscanner_helper/watchdog.py new file mode 100644 index 00000000..867d2196 --- /dev/null +++ b/examples/complex/webscanner_helper/watchdog.py @@ -0,0 +1,71 @@ +import pathlib +import time +import typing +import logging +from datetime import datetime + +import mitmproxy.connections +import mitmproxy.http +from mitmproxy.addons.export import curl_command, raw +from mitmproxy.exceptions import HttpSyntaxException + +logger = logging.getLogger(__name__) + + +class WatchdogAddon(): + """ The Watchdog Add-on can be used in combination with web application scanners in oder to check if the device + under test responds correctls to the scanner's responses. + + The Watchdog Add-on checks if the device under test responds correctly to the scanner's responses. + If the Watchdog sees that the DUT is no longer responding correctly, an multiprocessing event is set. + This information can be used to restart the device under test if necessary. + """ + + def __init__(self, event, outdir: pathlib.Path, timeout=None): + """Initializes the Watchdog. + + Args: + event: multiprocessing.Event that will be set if the watchdog is triggered. + outdir: path to a directory in which the triggering requests will be saved (curl and raw). + timeout_conn: float that specifies the timeout for the server connection + """ + self.error_event = event + self.flow_dir = outdir + if self.flow_dir.exists() and not self.flow_dir.is_dir(): + raise RuntimeError("Watchtdog output path must be a directory.") + elif not self.flow_dir.exists(): + self.flow_dir.mkdir(parents=True) + self.last_trigger: typing.Union[None, float] = None + self.timeout: typing.Union[None, float] = timeout + + def serverconnect(self, conn: mitmproxy.connections.ServerConnection): + if self.timeout is not None: + conn.settimeout(self.timeout) + + @classmethod + def not_in_timeout(cls, last_triggered, timeout): + """Checks if current error lies not in timeout after last trigger (potential reset of connection).""" + return last_triggered is None or timeout is None or (time.time() - last_triggered > timeout) + + def error(self, flow): + """ Checks if the watchdog will be triggered. + + Only triggers watchdog for timeouts after last reset and if flow.error is set (shows that error is a server + error). Ignores HttpSyntaxException Errors since this can be triggered on purpose by web application scanner. + + Args: + flow: mitmproxy.http.flow + """ + if (self.not_in_timeout(self.last_trigger, self.timeout) + and flow.error is not None and not isinstance(flow.error, HttpSyntaxException)): + + self.last_trigger = time.time() + logger.error(f"Watchdog triggered! Cause: {flow}") + self.error_event.set() + + # save the request which might have caused the problem + if flow.request: + with (self.flow_dir / f"{datetime.utcnow().isoformat()}.curl").open("w") as f: + f.write(curl_command(flow)) + with (self.flow_dir / f"{datetime.utcnow().isoformat()}.raw").open("wb") as f: + f.write(raw(flow)) |