author    anneborcherding <55282902+anneborcherding@users.noreply.github.com>    2020-05-04 10:37:13 +0200
committer GitHub <noreply@github.com>    2020-05-04 10:37:13 +0200
commit    7fdcbb09e6034ab1f76724965cfdf45f3d775129 (patch)
tree      9adaa530173c70d374680a510402b958ad669277
parent    f4aa3ee11c01d5b8f260e57bfd7e084b7767c08e (diff)
added add-ons that enhance the performance of web application scanners. (#3961)
* added add-ons that enhance the performance of web application scanners.

Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com>
-rw-r--r--  examples/complex/webscanner_helper/__init__.py          0
-rw-r--r--  examples/complex/webscanner_helper/mapping.py           144
-rw-r--r--  examples/complex/webscanner_helper/urldict.py           90
-rw-r--r--  examples/complex/webscanner_helper/urlindex.py          168
-rw-r--r--  examples/complex/webscanner_helper/urlinjection.py      173
-rw-r--r--  examples/complex/webscanner_helper/watchdog.py          71
-rw-r--r--  test/examples/webscanner_helper/__init__.py             0
-rw-r--r--  test/examples/webscanner_helper/test_mapping.py         165
-rw-r--r--  test/examples/webscanner_helper/test_urldict.py         89
-rw-r--r--  test/examples/webscanner_helper/test_urlindex.py        234
-rw-r--r--  test/examples/webscanner_helper/test_urlinjection.py    111
-rw-r--r--  test/examples/webscanner_helper/test_watchdog.py        84
12 files changed, 1329 insertions(+), 0 deletions(-)
diff --git a/examples/complex/webscanner_helper/__init__.py b/examples/complex/webscanner_helper/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/examples/complex/webscanner_helper/__init__.py
diff --git a/examples/complex/webscanner_helper/mapping.py b/examples/complex/webscanner_helper/mapping.py
new file mode 100644
index 00000000..8c83bf77
--- /dev/null
+++ b/examples/complex/webscanner_helper/mapping.py
@@ -0,0 +1,144 @@
+import copy
+import logging
+import typing
+from typing import Dict
+
+from bs4 import BeautifulSoup
+
+from mitmproxy.http import HTTPFlow
+from examples.complex.webscanner_helper.urldict import URLDict
+
+NO_CONTENT = object()
+
+
+class MappingAddonConfig:
+ HTML_PARSER = "html.parser"
+
+
+class MappingAddon:
+ """ The mapping add-on can be used in combination with web application scanners to reduce their false positives.
+
+ Many web application scanners produce false positives caused by dynamically changing content of web applications
+ such as the current time or current measurements. When testing for injection vulnerabilities, web application
+    scanners are tricked into thinking they changed the content with the injected payload. In reality, the content of
+    the web application changed regardless of the scanner's input. When the mapping add-on is used to map the content
+ to a fixed value, these false positives can be avoided.
+ """
+
+ OPT_MAPPING_FILE = "mapping_file"
+    """File in which the URLs, CSS selectors, and mapped content are stored.
+
+    Elements will be replaced with the content given in this file. If the content is None, it will be set to the first
+    value seen.
+
+ Example:
+
+ {
+ "http://10.10.10.10": {
+ "body": "My Text"
+ },
+ "URL": {
+ "css selector": "Replace with this"
+ }
+ }
+ """
+
+ OPT_MAP_PERSISTENT = "map_persistent"
+ """Whether to store all new content in the configuration file."""
+
+ def __init__(self, filename: str, persistent: bool = False) -> None:
+ """ Initializes the mapping add-on
+
+ Args:
+            filename: str that provides the name of the file in which the URLs, CSS selectors, and mapped content
+                are stored.
+ persistent: bool that indicates whether to store all new content in the configuration file.
+
+ Example:
+ The file in which the mapping config is given should be in the following format:
+ {
+ "http://10.10.10.10": {
+ "body": "My Text"
+ },
+ "<URL>": {
+ "<css selector>": "Replace with this"
+ }
+ }
+ """
+ self.filename = filename
+ self.persistent = persistent
+ self.logger = logging.getLogger(self.__class__.__name__)
+ with open(filename, "r") as f:
+ self.mapping_templates = URLDict.load(f)
+
+ def load(self, loader):
+ loader.add_option(
+ self.OPT_MAPPING_FILE, str, "",
+ "File where replacement configuration is stored."
+ )
+ loader.add_option(
+ self.OPT_MAP_PERSISTENT, bool, False,
+ "Whether to store all new content in the configuration file."
+ )
+
+ def configure(self, updated):
+ if self.OPT_MAPPING_FILE in updated:
+ self.filename = updated[self.OPT_MAPPING_FILE]
+ with open(self.filename, "r") as f:
+ self.mapping_templates = URLDict.load(f)
+
+ if self.OPT_MAP_PERSISTENT in updated:
+ self.persistent = updated[self.OPT_MAP_PERSISTENT]
+
+ def replace(self, soup: BeautifulSoup, css_sel: str, replace: BeautifulSoup) -> None:
+ """Replaces the content of soup that matches the css selector with the given replace content."""
+ for content in soup.select(css_sel):
+ self.logger.debug(f"replace \"{content}\" with \"{replace}\"")
+ content.replace_with(copy.copy(replace))
+
+    def apply_template(self, soup: BeautifulSoup, template: Dict[str, str]) -> None:
+ """Applies the given mapping template to the given soup."""
+ for css_sel, replace in template.items():
+ mapped = soup.select(css_sel)
+ if not mapped:
+                self.logger.warning(f"Could not find \"{css_sel}\", cannot freeze anything.")
+ else:
+ self.replace(soup, css_sel, BeautifulSoup(replace, features=MappingAddonConfig.HTML_PARSER))
+
+ def response(self, flow: HTTPFlow) -> None:
+ """If a response is received, check if we should replace some content. """
+ try:
+ templates = self.mapping_templates[flow]
+ res = flow.response
+ if res is not None:
+ encoding = res.headers.get("content-encoding", "utf-8")
+ content_type = res.headers.get("content-type", "text/html")
+
+ if "text/html" in content_type and encoding == "utf-8":
+ content = BeautifulSoup(res.content, MappingAddonConfig.HTML_PARSER)
+ for template in templates:
+ self.apply_template(content, template)
+ res.content = content.encode(encoding)
+ else:
+ self.logger.warning(f"Unsupported content type '{content_type}' or content encoding '{encoding}'")
+ except KeyError:
+ pass
+
+ def done(self) -> None:
+ """Dumps all new content into the configuration file if self.persistent is set."""
+ if self.persistent:
+
+ # make sure that all items are strings and not soups.
+ def value_dumper(value):
+ store = {}
+ if value is None:
+ return "None"
+ try:
+ for css_sel, soup in value.items():
+ store[css_sel] = str(soup)
+                except AttributeError:  # value is not a mapping of CSS selectors to soups
+                    raise RuntimeError(value)
+ return store
+
+ with open(self.filename, "w") as f:
+            self.mapping_templates.dump(f, value_dumper)
\ No newline at end of file
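
A minimal usage sketch for the mapping add-on: mitmproxy loads add-ons from a script's
`addons` list, and the mapping file name "mapping.json" is an illustrative assumption.

    # mapping_example.py -- usage sketch; run with: mitmdump -s mapping_example.py
    from examples.complex.webscanner_helper.mapping import MappingAddon

    # "mapping.json" is assumed to exist in the working directory, e.g.:
    #   {"http://10.10.10.10": {"body": "My Text"}}
    # With persistent=True, newly seen content is written back to the file on shutdown.
    addons = [MappingAddon("mapping.json", persistent=True)]
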
diff --git a/examples/complex/webscanner_helper/urldict.py b/examples/complex/webscanner_helper/urldict.py
new file mode 100644
index 00000000..28e6b5e6
--- /dev/null
+++ b/examples/complex/webscanner_helper/urldict.py
@@ -0,0 +1,90 @@
+import itertools
+import json
+import typing
+from collections.abc import MutableMapping
+from typing import Any, Dict, Generator, List, TextIO, Callable
+
+from mitmproxy import flowfilter
+from mitmproxy.http import HTTPFlow
+
+
+def f_id(x):
+    """Identity function, used as the default value loader and dumper."""
+    return x
+
+
+class URLDict(MutableMapping):
+ """Data structure to store information using filters as keys."""
+ def __init__(self):
+ self.store: Dict[flowfilter.TFilter, Any] = {}
+
+ def __getitem__(self, key, *, count=0):
+ if count:
+            # materialize the slice so the emptiness check below works
+            ret = list(itertools.islice(self.get_generator(key), 0, count))
+ else:
+ ret = list(self.get_generator(key))
+
+ if ret:
+ return ret
+ else:
+ raise KeyError
+
+ def __setitem__(self, key: str, value):
+ fltr = flowfilter.parse(key)
+ if fltr:
+ self.store.__setitem__(fltr, value)
+ else:
+ raise ValueError("Not a valid filter")
+
+ def __delitem__(self, key):
+ self.store.__delitem__(key)
+
+ def __iter__(self):
+ return self.store.__iter__()
+
+ def __len__(self):
+ return self.store.__len__()
+
+    def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]:
+        """Yields all stored values whose filter matches the given flow."""
+ for fltr, value in self.store.items():
+ if flowfilter.match(fltr, flow):
+ yield value
+
+ def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]:
+ try:
+ return self.__getitem__(flow, count=count)
+ except KeyError:
+ return default
+
+ @classmethod
+ def _load(cls, json_obj, value_loader: Callable = f_id):
+ url_dict = cls()
+ for fltr, value in json_obj.items():
+ url_dict[fltr] = value_loader(value)
+ return url_dict
+
+ @classmethod
+ def load(cls, f: TextIO, value_loader: Callable = f_id):
+ json_obj = json.load(f)
+ return cls._load(json_obj, value_loader)
+
+ @classmethod
+ def loads(cls, json_str: str, value_loader: Callable = f_id):
+ json_obj = json.loads(json_str)
+ return cls._load(json_obj, value_loader)
+
+ def _dump(self, value_dumper: Callable = f_id) -> Dict:
+ dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {}
+ for fltr, value in self.store.items():
+ if hasattr(fltr, 'pattern'):
+ # cast necessary for mypy
+ dumped[typing.cast(Any, fltr).pattern] = value_dumper(value)
+ else:
+ dumped[str(fltr)] = value_dumper(value)
+ return dumped
+
+ def dump(self, f: TextIO, value_dumper: Callable = f_id):
+ json.dump(self._dump(value_dumper), f)
+
+ def dumps(self, value_dumper: Callable = f_id):
+ return json.dumps(self._dump(value_dumper))
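
A minimal sketch of the URLDict lookup semantics, using mitmproxy's test helpers to
build a flow (the default test request host is "address"):

    from examples.complex.webscanner_helper.urldict import URLDict
    from mitmproxy.test import tflow, tutils

    ud = URLDict()
    # Keys are mitmproxy filter expressions; values can be any payload.
    ud["~d address"] = {"body": "frozen text"}

    flow = tflow.tflow(resp=tutils.tresp())
    # __getitem__ returns a list of all stored values whose filter matches the flow.
    assert ud[flow] == [{"body": "frozen text"}]
    # get() returns the default instead of raising KeyError when nothing matches.
    assert ud.get(flow, default=[]) == [{"body": "frozen text"}]
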
diff --git a/examples/complex/webscanner_helper/urlindex.py b/examples/complex/webscanner_helper/urlindex.py
new file mode 100644
index 00000000..db8b1c56
--- /dev/null
+++ b/examples/complex/webscanner_helper/urlindex.py
@@ -0,0 +1,168 @@
+import abc
+import datetime
+import json
+import logging
+from pathlib import Path
+from typing import Type, Dict, Union, Optional
+
+from mitmproxy import flowfilter
+from mitmproxy.http import HTTPFlow
+
+logger = logging.getLogger(__name__)
+
+
+class UrlIndexWriter(abc.ABC):
+    """Abstract writer that persists seen URLs.
+
+    For example, these URLs can be injected into a web application to improve the crawling of web application scanners.
+    The injection can be done using the UrlInjectionAddon.
+    """
+
+ def __init__(self, filename: Path):
+ """Initializes the UrlIndexWriter.
+
+ Args:
+ filename: Path to file to which the URL index will be written.
+ """
+ self.filepath = filename
+
+ @abc.abstractmethod
+ def load(self):
+ """Load existing URL index."""
+ pass
+
+ @abc.abstractmethod
+ def add_url(self, flow: HTTPFlow):
+ """Add new URL to URL index."""
+ pass
+
+ @abc.abstractmethod
+ def save(self):
+ pass
+
+
+class SetEncoder(json.JSONEncoder):
+ def default(self, obj):
+ if isinstance(obj, set):
+ return list(obj)
+ return json.JSONEncoder.default(self, obj)
+
+
+class JSONUrlIndexWriter(UrlIndexWriter):
+ """Writes seen URLs as JSON."""
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.host_urls = {}
+
+ def load(self):
+ if self.filepath.exists():
+ with self.filepath.open("r") as f:
+ self.host_urls = json.load(f)
+            # convert the JSON lists of status codes back into sets
+            for host, paths in self.host_urls.items():
+                for path, methods in paths.items():
+                    self.host_urls[host][path] = {method: set(codes) for method, codes in methods.items()}
+
+ def add_url(self, flow: HTTPFlow):
+ req = flow.request
+ res = flow.response
+
+ if req is not None and res is not None:
+ urls = self.host_urls.setdefault(f"{req.scheme}://{req.host}:{req.port}", dict())
+ methods = urls.setdefault(req.path, {})
+ codes = methods.setdefault(req.method, set())
+ codes.add(res.status_code)
+
+ def save(self):
+ with self.filepath.open("w") as f:
+ json.dump(self.host_urls, f, cls=SetEncoder)
+
+
+class TextUrlIndexWriter(UrlIndexWriter):
+ """Writes seen URLs as text."""
+
+ def load(self):
+ pass
+
+ def add_url(self, flow: HTTPFlow):
+ res = flow.response
+ req = flow.request
+ if res is not None and req is not None:
+ with self.filepath.open("a+") as f:
+ f.write(f"{datetime.datetime.utcnow().isoformat()} STATUS: {res.status_code} METHOD: "
+ f"{req.method} URL:{req.url}\n")
+
+ def save(self):
+ pass
+
+
+WRITER: Dict[str, Type[UrlIndexWriter]] = {
+ "json": JSONUrlIndexWriter,
+ "text": TextUrlIndexWriter,
+}
+
+
+def filter_404(flow) -> bool:
+    """Filters out responses with status code 404."""
+ return flow.response.status_code != 404
+
+
+class UrlIndexAddon:
+ """Add-on to write seen URLs, either as JSON or as text.
+
+    For example, these URLs can be injected into a web application to improve the crawling of web application scanners.
+    The injection can be done using the UrlInjectionAddon.
+ """
+
+ index_filter: Optional[Union[str, flowfilter.TFilter]]
+ writer: UrlIndexWriter
+
+ OPT_FILEPATH = "URLINDEX_FILEPATH"
+ OPT_APPEND = "URLINDEX_APPEND"
+ OPT_INDEX_FILTER = "URLINDEX_FILTER"
+
+ def __init__(self, file_path: Union[str, Path], append: bool = True,
+ index_filter: Union[str, flowfilter.TFilter] = filter_404, index_format: str = "json"):
+ """ Initializes the urlindex add-on.
+
+ Args:
+ file_path: Path to file to which the URL index will be written. Can either be given as str or Path.
+            append: bool to decide whether to append new URLs to the given file (as opposed to overwriting the
+                contents of the file).
+            index_filter: A mitmproxy filter with which the seen URLs will be filtered before being written. Can
+                either be given as str or as flowfilter.TFilter.
+ index_format: The format of the URL index, can either be "json" or "text".
+ """
+
+ if isinstance(index_filter, str):
+ self.index_filter = flowfilter.parse(index_filter)
+ if self.index_filter is None:
+ raise ValueError("Invalid filter expression.")
+ else:
+ self.index_filter = index_filter
+
+ file_path = Path(file_path)
+ try:
+ self.writer = WRITER[index_format.lower()](file_path)
+ except KeyError:
+ raise ValueError(f"Format '{index_format}' is not supported.")
+
+ if not append and file_path.exists():
+ file_path.unlink()
+
+ self.writer.load()
+
+ def response(self, flow: HTTPFlow):
+        """Checks if the response should be included in the URL index based on the index_filter and adds it to the
+        index if appropriate.
+ """
+ if isinstance(self.index_filter, str) or self.index_filter is None:
+ raise ValueError("Invalid filter expression.")
+ else:
+ if self.index_filter(flow):
+ self.writer.add_url(flow)
+
+ def done(self):
+ """Writes the URL index."""
+ self.writer.save()
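
A minimal usage sketch for the URL index add-on; the output file name "url_index.json"
is an illustrative assumption.

    # urlindex_example.py -- usage sketch; run with: mitmdump -s urlindex_example.py
    from examples.complex.webscanner_helper.urlindex import UrlIndexAddon

    # Records every non-404 response (filter_404 is the default index filter).
    # Use index_format="text" for the plain-text writer instead of JSON.
    addons = [UrlIndexAddon("url_index.json")]
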
diff --git a/examples/complex/webscanner_helper/urlinjection.py b/examples/complex/webscanner_helper/urlinjection.py
new file mode 100644
index 00000000..b62eca2b
--- /dev/null
+++ b/examples/complex/webscanner_helper/urlinjection.py
@@ -0,0 +1,173 @@
+import abc
+import html
+import json
+import logging
+
+from mitmproxy import flowfilter
+from mitmproxy.http import HTTPFlow
+
+logger = logging.getLogger(__name__)
+
+
+class InjectionGenerator:
+    """Abstract base class for generators of the content used to inject the URL index."""
+ ENCODING = "UTF8"
+
+ @abc.abstractmethod
+ def inject(self, index, flow: HTTPFlow):
+ """Injects the given URL index into the given flow."""
+ pass
+
+
+class HTMLInjection(InjectionGenerator):
+    """Injects the URL index either by creating a new HTML page or by appending it to an existing page."""
+
+ def __init__(self, insert: bool = False):
+ """Initializes the HTMLInjection.
+
+ Args:
+            insert: boolean to decide whether to insert the URL index into an existing page (True) or to create a new
+                page containing the URL index (False).
+ """
+ self.insert = insert
+
+ @classmethod
+ def _form_html(cls, url):
+ return f"<form action=\"{url}\" method=\"POST\"></form>"
+
+ @classmethod
+ def _link_html(cls, url):
+ return f"<a href=\"{url}\">link to {url}</a>"
+
+ @classmethod
+ def index_html(cls, index):
+ link_htmls = []
+ for scheme_netloc, paths in index.items():
+ for path, methods in paths.items():
+ url = scheme_netloc + path
+ if "POST" in methods:
+ link_htmls.append(cls._form_html(url))
+
+ if "GET" in methods:
+ link_htmls.append(cls._link_html(url))
+        return "<br/>".join(link_htmls)
+
+ @classmethod
+ def landing_page(cls, index):
+ return (
+ "<head><meta charset=\"UTF-8\"></head><body>"
+ + cls.index_html(index)
+ + "</body>"
+ )
+
+ def inject(self, index, flow: HTTPFlow):
+ if flow.response is not None:
+ if flow.response.status_code != 404 and not self.insert:
+ logger.warning(
+ f"URL '{flow.request.url}' didn't return 404 status, "
+ f"index page would overwrite valid page.")
+ elif self.insert:
+ content = (flow.response
+ .content
+ .decode(self.ENCODING, "backslashreplace"))
+ if "</body>" in content:
+ content = content.replace("</body>", self.index_html(index) + "</body>")
+ else:
+ content += self.index_html(index)
+ flow.response.content = content.encode(self.ENCODING)
+ else:
+ flow.response.content = (self.landing_page(index)
+ .encode(self.ENCODING))
+
+
+class RobotsInjection(InjectionGenerator):
+ """Injects the URL index by creating a new robots.txt including the URLs."""
+
+ def __init__(self, directive="Allow"):
+ self.directive = directive
+
+ @classmethod
+ def robots_txt(cls, index, directive="Allow"):
+ lines = ["User-agent: *"]
+ for scheme_netloc, paths in index.items():
+ for path, methods in paths.items():
+ lines.append(directive + ": " + path)
+ return "\n".join(lines)
+
+ def inject(self, index, flow: HTTPFlow):
+ if flow.response is not None:
+ if flow.response.status_code != 404:
+ logger.warning(
+ f"URL '{flow.request.url}' didn't return 404 status, "
+ f"index page would overwrite valid page.")
+ else:
+ flow.response.content = self.robots_txt(index,
+ self.directive).encode(
+ self.ENCODING)
+
+
+class SitemapInjection(InjectionGenerator):
+ """Injects the URL index by creating a new sitemap including the URLs."""
+
+ @classmethod
+ def sitemap(cls, index):
+ lines = [
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?><urlset xmlns=\"http://www.sitemaps.org/schemas/sitemap/0.9\">"]
+ for scheme_netloc, paths in index.items():
+ for path, methods in paths.items():
+ url = scheme_netloc + path
+ lines.append(f"<url><loc>{html.escape(url)}</loc></url>")
+ lines.append("</urlset>")
+ return "\n".join(lines)
+
+ def inject(self, index, flow: HTTPFlow):
+ if flow.response is not None:
+ if flow.response.status_code != 404:
+ logger.warning(
+ f"URL '{flow.request.url}' didn't return 404 status, "
+ f"index page would overwrite valid page.")
+ else:
+ flow.response.content = self.sitemap(index).encode(self.ENCODING)
+
+
+class UrlInjectionAddon:
+ """ The UrlInjection add-on can be used in combination with web application scanners to improve their crawling
+ performance.
+
+    The given URLs will be injected into the web application. With this, web application scanners can find pages to
+    crawl much more easily. Depending on the injection generator, the URLs will be injected at different places of the
+    web application. It is possible to create a landing page which includes the URLs (HTMLInjection()), to inject the
+    URLs into an existing page (HTMLInjection(insert=True)), to create a robots.txt containing the URLs
+    (RobotsInjection()) or to create a sitemap.xml which includes the URLs (SitemapInjection()).
+    It is necessary that the web application scanner can find the newly created page containing the URL index. For
+    example, the newly created page can be set as the starting point for the web application scanner.
+    The URL index needed for the injection can be generated by the UrlIndexAddon.
+ """
+
+ def __init__(self, flt: str, url_index_file: str,
+ injection_gen: InjectionGenerator):
+        """Initializes the UrlInjection add-on.
+
+ Args:
+ flt: mitmproxy filter to decide on which pages the URLs will be injected (str).
+ url_index_file: Path to the file which includes the URL index in JSON format (e.g. generated by the UrlIndexAddon), given
+ as str.
+ injection_gen: InjectionGenerator that should be used to inject the URLs into the web application.
+ """
+ self.name = f"{self.__class__.__name__}-{injection_gen.__class__.__name__}-{self.__hash__()}"
+ self.flt = flowfilter.parse(flt)
+ self.injection_gen = injection_gen
+ with open(url_index_file, "r") as f:
+ self.url_store = json.load(f)
+
+ def response(self, flow: HTTPFlow):
+        """Checks if the response matches the filter and thus should be injected, and injects the URL index if
+        appropriate.
+ """
+ if flow.response is not None:
+ if self.flt is not None and self.flt(flow):
+ self.injection_gen.inject(self.url_store, flow)
+ flow.response.status_code = 200
+ flow.response.headers["content-type"] = "text/html"
+ logger.debug(f"Set status code to 200 and set content to logged "
+ f"urls. Method: {self.injection_gen}")
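
A minimal usage sketch for the URL injection add-on, assuming "url_index.json" was
previously written by the UrlIndexAddon; the filter expression is illustrative.

    # urlinjection_example.py -- usage sketch; run with: mitmdump -s urlinjection_example.py
    from examples.complex.webscanner_helper.urlinjection import HTMLInjection, UrlInjectionAddon

    # Requests matching the filter receive a generated landing page that links
    # all indexed URLs; the scanner can use this page as its starting point.
    addons = [
        UrlInjectionAddon("~u .*/index.html$", "url_index.json", HTMLInjection())
    ]
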
diff --git a/examples/complex/webscanner_helper/watchdog.py b/examples/complex/webscanner_helper/watchdog.py
new file mode 100644
index 00000000..867d2196
--- /dev/null
+++ b/examples/complex/webscanner_helper/watchdog.py
@@ -0,0 +1,71 @@
+import pathlib
+import time
+import typing
+import logging
+from datetime import datetime
+
+import mitmproxy.connections
+import mitmproxy.http
+from mitmproxy.addons.export import curl_command, raw
+from mitmproxy.exceptions import HttpSyntaxException
+
+logger = logging.getLogger(__name__)
+
+
+class WatchdogAddon:
+    """ The Watchdog add-on can be used in combination with web application scanners in order to check if the device
+    under test responds correctly to the scanner's requests.
+
+    If the watchdog sees that the device under test is no longer responding correctly, a multiprocessing event is set.
+    This information can be used to restart the device under test if necessary.
+    """
+
+ def __init__(self, event, outdir: pathlib.Path, timeout=None):
+ """Initializes the Watchdog.
+
+ Args:
+ event: multiprocessing.Event that will be set if the watchdog is triggered.
+ outdir: path to a directory in which the triggering requests will be saved (curl and raw).
+            timeout: float that specifies the timeout for the server connection in seconds.
+ """
+ self.error_event = event
+ self.flow_dir = outdir
+ if self.flow_dir.exists() and not self.flow_dir.is_dir():
+            raise RuntimeError("Watchdog output path must be a directory.")
+ elif not self.flow_dir.exists():
+ self.flow_dir.mkdir(parents=True)
+ self.last_trigger: typing.Union[None, float] = None
+ self.timeout: typing.Union[None, float] = timeout
+
+ def serverconnect(self, conn: mitmproxy.connections.ServerConnection):
+ if self.timeout is not None:
+ conn.settimeout(self.timeout)
+
+ @classmethod
+ def not_in_timeout(cls, last_triggered, timeout):
+        """Checks whether the current error occurred outside the timeout window after the last trigger (potential reset of connection)."""
+ return last_triggered is None or timeout is None or (time.time() - last_triggered > timeout)
+
+ def error(self, flow):
+ """ Checks if the watchdog will be triggered.
+
+        Only triggers the watchdog if the error occurred outside the timeout window after the last trigger and if
+        flow.error is set (which indicates a server error). Ignores HttpSyntaxException errors since these can be
+        triggered on purpose by the web application scanner.
+
+ Args:
+ flow: mitmproxy.http.flow
+ """
+ if (self.not_in_timeout(self.last_trigger, self.timeout)
+ and flow.error is not None and not isinstance(flow.error, HttpSyntaxException)):
+
+ self.last_trigger = time.time()
+ logger.error(f"Watchdog triggered! Cause: {flow}")
+ self.error_event.set()
+
+ # save the request which might have caused the problem
+            if flow.request:
+                # use a single timestamp so the .curl and .raw files share a name
+                timestamp = datetime.utcnow().isoformat()
+                with (self.flow_dir / f"{timestamp}.curl").open("w") as f:
+                    f.write(curl_command(flow))
+                with (self.flow_dir / f"{timestamp}.raw").open("wb") as f:
+                    f.write(raw(flow))
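
A minimal usage sketch for the watchdog add-on; the output directory name is
illustrative, and the event would typically be shared with a supervisor process that
restarts the device under test once it is set.

    # watchdog_example.py -- usage sketch; run with: mitmdump -s watchdog_example.py
    import multiprocessing
    import pathlib

    from examples.complex.webscanner_helper.watchdog import WatchdogAddon

    error_event = multiprocessing.Event()

    # timeout sets both the server connection timeout and the minimum interval
    # between two watchdog triggers; offending requests are saved to ./watchdog-out.
    addons = [WatchdogAddon(error_event, pathlib.Path("watchdog-out"), timeout=60)]
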
diff --git a/test/examples/webscanner_helper/__init__.py b/test/examples/webscanner_helper/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/examples/webscanner_helper/__init__.py
diff --git a/test/examples/webscanner_helper/test_mapping.py b/test/examples/webscanner_helper/test_mapping.py
new file mode 100644
index 00000000..e4d519fc
--- /dev/null
+++ b/test/examples/webscanner_helper/test_mapping.py
@@ -0,0 +1,165 @@
+from typing import TextIO, Callable
+from unittest import mock
+from unittest.mock import MagicMock
+
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.mapping import MappingAddon, MappingAddonConfig
+
+
+class TestConfig:
+
+ def test_config(self):
+ assert MappingAddonConfig.HTML_PARSER == "html.parser"
+
+
+url = "http://10.10.10.10"
+new_content = "My Text"
+mapping_content = f'{{"{url}": {{"body": "{new_content}"}}}}'
+
+
+class TestMappingAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ assert "My Text" in str(mapping.mapping_templates._dump())
+
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ loader = MagicMock()
+
+ mapping.load(loader)
+ assert 'mapping_file' in str(loader.add_option.call_args_list)
+ assert 'map_persistent' in str(loader.add_option.call_args_list)
+
+ def test_configure(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ new_filename = "My new filename"
+ updated = {str(mapping.OPT_MAPPING_FILE): new_filename, str(mapping.OPT_MAP_PERSISTENT): True}
+
+ open_mock = mock.mock_open(read_data="{}")
+ with mock.patch("builtins.open", open_mock):
+ mapping.configure(updated)
+ assert new_filename in str(open_mock.mock_calls)
+ assert mapping.filename == new_filename
+ assert mapping.persistent
+
+ def test_response_filtered(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"Test"
+ f.response.content = test_content
+
+ mapping.response(f)
+ assert f.response.content == test_content
+
+ def test_response(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<body> Test </body>"
+ f.response.content = test_content
+ f.request.url = url
+
+ mapping.response(f)
+ assert f.response.content.decode("utf-8") == new_content
+
+ def test_response_content_type(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<body> Test </body>"
+ f.response.content = test_content
+ f.request.url = url
+ f.response.headers.add("content-type", "content-type")
+
+ mapping.response(f)
+ assert f.response.content == test_content
+
+ def test_response_not_existing(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<title> Test </title>"
+ f.response.content = test_content
+ f.request.url = url
+ mapping.response(f)
+ assert f.response.content == test_content
+
+    def test_persistence_false(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+
+ open_mock = mock.mock_open(read_data="{}")
+ with mock.patch("builtins.open", open_mock):
+ mapping.done()
+ assert len(open_mock.mock_calls) == 0
+
+    def test_persistence_true(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile, persistent=True)
+
+ open_mock = mock.mock_open(read_data="{}")
+ with mock.patch("builtins.open", open_mock):
+ mapping.done()
+ with open(tmpfile, "r") as tfile:
+ results = tfile.read()
+ assert len(open_mock.mock_calls) != 0
+ assert results == mapping_content
+
+    def test_persistence_true_add_content(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile, persistent=True)
+
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<title> Test </title>"
+ f.response.content = test_content
+ f.request.url = url
+
+ mapping.response(f)
+ mapping.done()
+ with open(tmpfile, "r") as tfile:
+ results = tfile.read()
+ assert mapping_content in results
+
+ def mock_dump(self, f: TextIO, value_dumper: Callable):
+ assert value_dumper(None) == "None"
+ try:
+ value_dumper("Test")
+ except RuntimeError:
+ assert True
+ else:
+ assert False
+
+    def test_dump(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ mapping = MappingAddon(tmpfile, persistent=True)
+        with mock.patch('examples.complex.webscanner_helper.urldict.URLDict.dump', self.mock_dump):
+ mapping.done()
diff --git a/test/examples/webscanner_helper/test_urldict.py b/test/examples/webscanner_helper/test_urldict.py
new file mode 100644
index 00000000..7bd4fb01
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urldict.py
@@ -0,0 +1,89 @@
+from mitmproxy.test import tflow, tutils
+from examples.complex.webscanner_helper.urldict import URLDict
+
+url = "http://10.10.10.10"
+new_content_body = "New Body"
+new_content_title = "New Title"
+content = f'{{"body": "{new_content_body}", "title": "{new_content_title}"}}'
+url_error = "i~nvalid"
+input_file_content = f'{{"{url}": {content}}}'
+input_file_content_error = f'{{"{url_error}": {content}}}'
+
+
+class TestUrlDict:
+
+ def test_urldict_empty(self):
+ urldict = URLDict()
+ dump = urldict.dumps()
+ assert dump == '{}'
+
+ def test_urldict_loads(self):
+ urldict = URLDict.loads(input_file_content)
+ dump = urldict.dumps()
+ assert dump == input_file_content
+
+ def test_urldict_set_error(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content_error)
+ with open(tmpfile, "r") as tfile:
+ try:
+ URLDict.load(tfile)
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_urldict_get(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content)
+ with open(tmpfile, "r") as tfile:
+ urldict = URLDict.load(tfile)
+
+ f = tflow.tflow(resp=tutils.tresp())
+ f.request.url = url
+ selection = urldict[f]
+ assert "body" in selection[0]
+ assert new_content_body in selection[0]["body"]
+ assert "title" in selection[0]
+ assert new_content_title in selection[0]["title"]
+
+ selection_get = urldict.get(f)
+ assert "body" in selection_get[0]
+ assert new_content_body in selection_get[0]["body"]
+ assert "title" in selection_get[0]
+ assert new_content_title in selection_get[0]["title"]
+
+ try:
+ urldict["body"]
+ except KeyError:
+ assert True
+ else:
+ assert False
+
+ assert urldict.get("body", default="default") == "default"
+
+ def test_urldict_dumps(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content)
+ with open(tmpfile, "r") as tfile:
+ urldict = URLDict.load(tfile)
+
+ dump = urldict.dumps()
+ assert dump == input_file_content
+
+ def test_urldict_dump(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ outfile = tmpdir.join("outfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content)
+ with open(tmpfile, "r") as tfile:
+ urldict = URLDict.load(tfile)
+ with open(outfile, "w") as ofile:
+ urldict.dump(ofile)
+
+ with open(outfile, "r") as ofile:
+ output = ofile.read()
+ assert output == input_file_content
diff --git a/test/examples/webscanner_helper/test_urlindex.py b/test/examples/webscanner_helper/test_urlindex.py
new file mode 100644
index 00000000..0edd6cc0
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urlindex.py
@@ -0,0 +1,234 @@
+import json
+from json import JSONDecodeError
+from pathlib import Path
+from unittest import mock
+from typing import List
+from unittest.mock import patch
+
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.urlindex import UrlIndexWriter, SetEncoder, JSONUrlIndexWriter, TextUrlIndexWriter, WRITER, \
+ filter_404, \
+ UrlIndexAddon
+
+
+class TestBaseClass:
+
+ @patch.multiple(UrlIndexWriter, __abstractmethods__=set())
+ def test_base_class(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ index_writer = UrlIndexWriter(tmpfile)
+ index_writer.load()
+ index_writer.add_url(tflow.tflow())
+ index_writer.save()
+
+
+class TestSetEncoder:
+
+ def test_set_encoder_set(self):
+ test_set = {"foo", "bar", "42"}
+ result = SetEncoder.default(SetEncoder(), test_set)
+ assert isinstance(result, List)
+ assert 'foo' in result
+ assert 'bar' in result
+ assert '42' in result
+
+ def test_set_encoder_str(self):
+ test_str = "test"
+ try:
+ SetEncoder.default(SetEncoder(), test_str)
+ except TypeError:
+ assert True
+ else:
+ assert False
+
+
+class TestJSONUrlIndexWriter:
+
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(
+ "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/\": {\"GET\": [302]}}}")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert 'http://example.com:80' in writer.host_urls
+ assert '/' in writer.host_urls['http://example.com:80']
+ assert 'GET' in writer.host_urls['http://example.com:80']['/']
+ assert 301 in writer.host_urls['http://example.com:80']['/']['GET']
+
+ def test_load_empty(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert len(writer.host_urls) == 0
+
+ def test_load_nonexisting(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert len(writer.host_urls) == 0
+
+ def test_add(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ writer.add_url(f)
+ assert url in writer.host_urls
+ assert f.request.path in writer.host_urls[url]
+
+ def test_save(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ writer.add_url(f)
+ writer.save()
+
+ with open(tmpfile, "r") as results:
+ try:
+ content = json.load(results)
+ except JSONDecodeError:
+ assert False
+ assert url in content
+
+
+class TestTextUrlIndexWriter:
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(
+ "2020-04-22T05:41:08.679231 STATUS: 200 METHOD: GET URL:http://example.com")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_load_empty(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_load_nonexisting(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_add(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ method = f.request.method
+ code = f.response.status_code
+ writer.add_url(f)
+
+ with open(tmpfile, "r") as results:
+ content = results.read()
+ assert url in content
+ assert method in content
+ assert str(code) in content
+
+ def test_save(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ method = f.request.method
+ code = f.response.status_code
+ writer.add_url(f)
+ writer.save()
+
+ with open(tmpfile, "r") as results:
+ content = results.read()
+ assert url in content
+ assert method in content
+ assert str(code) in content
+
+
+class TestWriter:
+ def test_writer_dict(self):
+        assert "json" in WRITER
+        assert WRITER["json"] is JSONUrlIndexWriter
+        assert "text" in WRITER
+        assert WRITER["text"] is TextUrlIndexWriter
+
+
+class TestFilter:
+    def test_filter_true(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ assert filter_404(f)
+
+ def test_filter_false(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert not filter_404(f)
+
+
+class TestUrlIndexAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ UrlIndexAddon(tmpfile)
+
+ def test_init_format(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ try:
+ UrlIndexAddon(tmpfile, index_format="test")
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_init_filter(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ try:
+ UrlIndexAddon(tmpfile, index_filter="i~nvalid")
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_init_append(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("")
+ url_index = UrlIndexAddon(tmpfile, append=False)
+ f = tflow.tflow(resp=tutils.tresp())
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url'):
+ url_index.response(f)
+ assert not Path(tmpfile).exists()
+
+ def test_response(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url') as mock_add_url:
+ url_index.response(f)
+ mock_add_url.assert_called()
+
+ def test_response_None(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ url_index.index_filter = None
+ f = tflow.tflow(resp=tutils.tresp())
+ try:
+ url_index.response(f)
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_done(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.save') as mock_save:
+ url_index.done()
+ mock_save.assert_called()
diff --git a/test/examples/webscanner_helper/test_urlinjection.py b/test/examples/webscanner_helper/test_urlinjection.py
new file mode 100644
index 00000000..4b87296d
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urlinjection.py
@@ -0,0 +1,111 @@
+import json
+from unittest import mock
+
+from mitmproxy import flowfilter
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.urlinjection import InjectionGenerator, HTMLInjection, RobotsInjection, SitemapInjection, \
+ UrlInjectionAddon, logger
+
+index = json.loads(
+ "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/test\": {\"POST\": [302]}}}")
+
+
+class TestInjectionGenerator:
+
+ def test_inject(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ injection_generator = InjectionGenerator()
+ injection_generator.inject(index=index, flow=f)
+ assert True
+
+
+class TestHTMLInjection:
+
+ def test_inject_not404(self):
+ html_injection = HTMLInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+
+ with mock.patch.object(logger, 'warning') as mock_warning:
+ html_injection.inject(index, f)
+ assert mock_warning.called
+
+ def test_inject_insert(self):
+ html_injection = HTMLInjection(insert=True)
+ f = tflow.tflow(resp=tutils.tresp())
+ assert "example.com" not in str(f.response.content)
+ html_injection.inject(index, f)
+ assert "example.com" in str(f.response.content)
+
+ def test_inject_insert_body(self):
+ html_injection = HTMLInjection(insert=True)
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.text = "<body></body>"
+ assert "example.com" not in str(f.response.content)
+ html_injection.inject(index, f)
+ assert "example.com" in str(f.response.content)
+
+ def test_inject_404(self):
+ html_injection = HTMLInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert "example.com" not in str(f.response.content)
+ html_injection.inject(index, f)
+ assert "example.com" in str(f.response.content)
+
+
+class TestRobotsInjection:
+
+ def test_inject_not404(self):
+ robots_injection = RobotsInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+
+ with mock.patch.object(logger, 'warning') as mock_warning:
+ robots_injection.inject(index, f)
+ assert mock_warning.called
+
+ def test_inject_404(self):
+ robots_injection = RobotsInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert "Allow: /test" not in str(f.response.content)
+ robots_injection.inject(index, f)
+ assert "Allow: /test" in str(f.response.content)
+
+
+class TestSitemapInjection:
+
+ def test_inject_not404(self):
+ sitemap_injection = SitemapInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+
+ with mock.patch.object(logger, 'warning') as mock_warning:
+ sitemap_injection.inject(index, f)
+ assert mock_warning.called
+
+ def test_inject_404(self):
+ sitemap_injection = SitemapInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert "<url><loc>http://example.com:80/</loc></url>" not in str(f.response.content)
+ sitemap_injection.inject(index, f)
+ assert "<url><loc>http://example.com:80/</loc></url>" in str(f.response.content)
+
+
+class TestUrlInjectionAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ json.dump(index, tfile)
+        flt = "~u .*/site.html$"
+        url_injection = UrlInjectionAddon(flt, tmpfile, HTMLInjection(insert=True))
+ assert "http://example.com:80" in url_injection.url_store
+ fltr = flowfilter.parse(flt)
+ f = tflow.tflow(resp=tutils.tresp())
+ f.request.url = "http://example.com/site.html"
+ assert fltr(f)
+ assert "http://example.com:80" not in str(f.response.content)
+ url_injection.response(f)
+ assert "http://example.com:80" in str(f.response.content)
diff --git a/test/examples/webscanner_helper/test_watchdog.py b/test/examples/webscanner_helper/test_watchdog.py
new file mode 100644
index 00000000..43e59310
--- /dev/null
+++ b/test/examples/webscanner_helper/test_watchdog.py
@@ -0,0 +1,84 @@
+import time
+from pathlib import Path
+from unittest import mock
+
+from mitmproxy.connections import ServerConnection
+from mitmproxy.exceptions import HttpSyntaxException
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+import multiprocessing
+
+from examples.complex.webscanner_helper.watchdog import WatchdogAddon, logger
+
+
+class TestWatchdog:
+
+ def test_init_file(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("")
+ event = multiprocessing.Event()
+ try:
+ WatchdogAddon(event, Path(tmpfile))
+ except RuntimeError:
+ assert True
+ else:
+ assert False
+
+ def test_init_dir(self, tmpdir):
+ event = multiprocessing.Event()
+ mydir = tmpdir.join("mydir")
+ assert not Path(mydir).exists()
+ WatchdogAddon(event, Path(mydir))
+ assert Path(mydir).exists()
+
+ def test_serverconnect(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir), timeout=10)
+ with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout:
+ w.serverconnect(ServerConnection("127.0.0.1"))
+ mock_set_timeout.assert_called()
+
+ def test_serverconnect_None(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+ with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout:
+ w.serverconnect(ServerConnection("127.0.0.1"))
+ assert not mock_set_timeout.called
+
+ def test_trigger(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+ f = tflow.tflow(resp=tutils.tresp())
+ f.error = "Test Error"
+
+ with mock.patch.object(logger, 'error') as mock_error:
+ open_mock = mock.mock_open()
+ with mock.patch("pathlib.Path.open", open_mock, create=True):
+ w.error(f)
+ mock_error.assert_called()
+ open_mock.assert_called()
+
+    def test_trigger_http_syntax(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+ f = tflow.tflow(resp=tutils.tresp())
+ f.error = HttpSyntaxException()
+ assert isinstance(f.error, HttpSyntaxException)
+
+ with mock.patch.object(logger, 'error') as mock_error:
+ open_mock = mock.mock_open()
+ with mock.patch("pathlib.Path.open", open_mock, create=True):
+ w.error(f)
+ assert not mock_error.called
+ assert not open_mock.called
+
+ def test_timeout(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+
+ assert w.not_in_timeout(None, None)
+        assert w.not_in_timeout(time.time(), None)
+ with mock.patch('time.time', return_value=5):
+ assert not w.not_in_timeout(3, 20)
+ assert w.not_in_timeout(3, 1)