From 7fdcbb09e6034ab1f76724965cfdf45f3d775129 Mon Sep 17 00:00:00 2001 From: anneborcherding <55282902+anneborcherding@users.noreply.github.com> Date: Mon, 4 May 2020 10:37:13 +0200 Subject: added add-ons that enhance the performance of web application scanners. (#3961) * added add-ons that enhance the performance of web application scanners. Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com> --- examples/complex/webscanner_helper/__init__.py | 0 examples/complex/webscanner_helper/mapping.py | 144 +++++++++++++ examples/complex/webscanner_helper/urldict.py | 90 ++++++++ examples/complex/webscanner_helper/urlindex.py | 168 +++++++++++++++ examples/complex/webscanner_helper/urlinjection.py | 173 +++++++++++++++ examples/complex/webscanner_helper/watchdog.py | 71 +++++++ test/examples/webscanner_helper/__init__.py | 0 test/examples/webscanner_helper/test_mapping.py | 165 +++++++++++++++ test/examples/webscanner_helper/test_urldict.py | 89 ++++++++ test/examples/webscanner_helper/test_urlindex.py | 234 +++++++++++++++++++++ .../webscanner_helper/test_urlinjection.py | 111 ++++++++++ test/examples/webscanner_helper/test_watchdog.py | 84 ++++++++ 12 files changed, 1329 insertions(+) create mode 100644 examples/complex/webscanner_helper/__init__.py create mode 100644 examples/complex/webscanner_helper/mapping.py create mode 100644 examples/complex/webscanner_helper/urldict.py create mode 100644 examples/complex/webscanner_helper/urlindex.py create mode 100644 examples/complex/webscanner_helper/urlinjection.py create mode 100644 examples/complex/webscanner_helper/watchdog.py create mode 100644 test/examples/webscanner_helper/__init__.py create mode 100644 test/examples/webscanner_helper/test_mapping.py create mode 100644 test/examples/webscanner_helper/test_urldict.py create mode 100644 test/examples/webscanner_helper/test_urlindex.py create mode 100644 test/examples/webscanner_helper/test_urlinjection.py create mode 100644 test/examples/webscanner_helper/test_watchdog.py diff --git a/examples/complex/webscanner_helper/__init__.py b/examples/complex/webscanner_helper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/complex/webscanner_helper/mapping.py b/examples/complex/webscanner_helper/mapping.py new file mode 100644 index 00000000..8c83bf77 --- /dev/null +++ b/examples/complex/webscanner_helper/mapping.py @@ -0,0 +1,144 @@ +import copy +import logging +import typing +from typing import Dict + +from bs4 import BeautifulSoup + +from mitmproxy.http import HTTPFlow +from examples.complex.webscanner_helper.urldict import URLDict + +NO_CONTENT = object() + + +class MappingAddonConfig: + HTML_PARSER = "html.parser" + + +class MappingAddon: + """ The mapping add-on can be used in combination with web application scanners to reduce their false positives. + + Many web application scanners produce false positives caused by dynamically changing content of web applications + such as the current time or current measurements. When testing for injection vulnerabilities, web application + scanners are tricked into thinking they changed the content with the injected payload. In realty, the content of + the web application changed notwithstanding the scanner's input. When the mapping add-on is used to map the content + to a fixed value, these false positives can be avoided. + """ + + OPT_MAPPING_FILE = "mapping_file" + """File where urls and css selector to mapped content is stored. + + Elements will be replaced with the content given in this file. If the content is none it will be set to the first + seen value. + + Example: + + { + "http://10.10.10.10": { + "body": "My Text" + }, + "URL": { + "css selector": "Replace with this" + } + } + """ + + OPT_MAP_PERSISTENT = "map_persistent" + """Whether to store all new content in the configuration file.""" + + def __init__(self, filename: str, persistent: bool = False) -> None: + """ Initializes the mapping add-on + + Args: + filename: str that provides the name of the file in which the urls and css selectors to mapped content is + stored. + persistent: bool that indicates whether to store all new content in the configuration file. + + Example: + The file in which the mapping config is given should be in the following format: + { + "http://10.10.10.10": { + "body": "My Text" + }, + "": { + "": "Replace with this" + } + } + """ + self.filename = filename + self.persistent = persistent + self.logger = logging.getLogger(self.__class__.__name__) + with open(filename, "r") as f: + self.mapping_templates = URLDict.load(f) + + def load(self, loader): + loader.add_option( + self.OPT_MAPPING_FILE, str, "", + "File where replacement configuration is stored." + ) + loader.add_option( + self.OPT_MAP_PERSISTENT, bool, False, + "Whether to store all new content in the configuration file." + ) + + def configure(self, updated): + if self.OPT_MAPPING_FILE in updated: + self.filename = updated[self.OPT_MAPPING_FILE] + with open(self.filename, "r") as f: + self.mapping_templates = URLDict.load(f) + + if self.OPT_MAP_PERSISTENT in updated: + self.persistent = updated[self.OPT_MAP_PERSISTENT] + + def replace(self, soup: BeautifulSoup, css_sel: str, replace: BeautifulSoup) -> None: + """Replaces the content of soup that matches the css selector with the given replace content.""" + for content in soup.select(css_sel): + self.logger.debug(f"replace \"{content}\" with \"{replace}\"") + content.replace_with(copy.copy(replace)) + + def apply_template(self, soup: BeautifulSoup, template: Dict[str, typing.Union[BeautifulSoup]]) -> None: + """Applies the given mapping template to the given soup.""" + for css_sel, replace in template.items(): + mapped = soup.select(css_sel) + if not mapped: + self.logger.warning(f"Could not find \"{css_sel}\", can not freeze anything.") + else: + self.replace(soup, css_sel, BeautifulSoup(replace, features=MappingAddonConfig.HTML_PARSER)) + + def response(self, flow: HTTPFlow) -> None: + """If a response is received, check if we should replace some content. """ + try: + templates = self.mapping_templates[flow] + res = flow.response + if res is not None: + encoding = res.headers.get("content-encoding", "utf-8") + content_type = res.headers.get("content-type", "text/html") + + if "text/html" in content_type and encoding == "utf-8": + content = BeautifulSoup(res.content, MappingAddonConfig.HTML_PARSER) + for template in templates: + self.apply_template(content, template) + res.content = content.encode(encoding) + else: + self.logger.warning(f"Unsupported content type '{content_type}' or content encoding '{encoding}'") + except KeyError: + pass + + def done(self) -> None: + """Dumps all new content into the configuration file if self.persistent is set.""" + if self.persistent: + + # make sure that all items are strings and not soups. + def value_dumper(value): + store = {} + if value is None: + return "None" + try: + for css_sel, soup in value.items(): + store[css_sel] = str(soup) + except: + raise RuntimeError(value) + return store + + with open(self.filename, "w") as f: + self.mapping_templates.dump(f, value_dumper) \ No newline at end of file diff --git a/examples/complex/webscanner_helper/urldict.py b/examples/complex/webscanner_helper/urldict.py new file mode 100644 index 00000000..28e6b5e6 --- /dev/null +++ b/examples/complex/webscanner_helper/urldict.py @@ -0,0 +1,90 @@ +import itertools +import json +import typing +from collections.abc import MutableMapping +from typing import Any, Dict, Generator, List, TextIO, Callable + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + + +def f_id(x): + return x + + +class URLDict(MutableMapping): + """Data structure to store information using filters as keys.""" + def __init__(self): + self.store: Dict[flowfilter.TFilter, Any] = {} + + def __getitem__(self, key, *, count=0): + if count: + ret = itertools.islice(self.get_generator(key), 0, count) + else: + ret = list(self.get_generator(key)) + + if ret: + return ret + else: + raise KeyError + + def __setitem__(self, key: str, value): + fltr = flowfilter.parse(key) + if fltr: + self.store.__setitem__(fltr, value) + else: + raise ValueError("Not a valid filter") + + def __delitem__(self, key): + self.store.__delitem__(key) + + def __iter__(self): + return self.store.__iter__() + + def __len__(self): + return self.store.__len__() + + def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]: + + for fltr, value in self.store.items(): + if flowfilter.match(fltr, flow): + yield value + + def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]: + try: + return self.__getitem__(flow, count=count) + except KeyError: + return default + + @classmethod + def _load(cls, json_obj, value_loader: Callable = f_id): + url_dict = cls() + for fltr, value in json_obj.items(): + url_dict[fltr] = value_loader(value) + return url_dict + + @classmethod + def load(cls, f: TextIO, value_loader: Callable = f_id): + json_obj = json.load(f) + return cls._load(json_obj, value_loader) + + @classmethod + def loads(cls, json_str: str, value_loader: Callable = f_id): + json_obj = json.loads(json_str) + return cls._load(json_obj, value_loader) + + def _dump(self, value_dumper: Callable = f_id) -> Dict: + dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {} + for fltr, value in self.store.items(): + if hasattr(fltr, 'pattern'): + # cast necessary for mypy + dumped[typing.cast(Any, fltr).pattern] = value_dumper(value) + else: + dumped[str(fltr)] = value_dumper(value) + return dumped + + def dump(self, f: TextIO, value_dumper: Callable = f_id): + json.dump(self._dump(value_dumper), f) + + def dumps(self, value_dumper: Callable = f_id): + return json.dumps(self._dump(value_dumper)) diff --git a/examples/complex/webscanner_helper/urlindex.py b/examples/complex/webscanner_helper/urlindex.py new file mode 100644 index 00000000..db8b1c56 --- /dev/null +++ b/examples/complex/webscanner_helper/urlindex.py @@ -0,0 +1,168 @@ +import abc +import datetime +import json +import logging +from pathlib import Path +from typing import Type, Dict, Union, Optional + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + +logger = logging.getLogger(__name__) + + +class UrlIndexWriter(abc.ABC): + """Abstract Add-on to write seen URLs. + + For example, these URLs can be injected in a web application to improve the crawling of web application scanners. + The injection can be done using the URLInjection Add-on. + """ + + def __init__(self, filename: Path): + """Initializes the UrlIndexWriter. + + Args: + filename: Path to file to which the URL index will be written. + """ + self.filepath = filename + + @abc.abstractmethod + def load(self): + """Load existing URL index.""" + pass + + @abc.abstractmethod + def add_url(self, flow: HTTPFlow): + """Add new URL to URL index.""" + pass + + @abc.abstractmethod + def save(self): + pass + + +class SetEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + + +class JSONUrlIndexWriter(UrlIndexWriter): + """Writes seen URLs as JSON.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.host_urls = {} + + def load(self): + if self.filepath.exists(): + with self.filepath.open("r") as f: + self.host_urls = json.load(f) + for host in self.host_urls.keys(): + for path, methods in self.host_urls[host].items(): + for method, codes in methods.items(): + self.host_urls[host][path] = {method: set(codes)} + + def add_url(self, flow: HTTPFlow): + req = flow.request + res = flow.response + + if req is not None and res is not None: + urls = self.host_urls.setdefault(f"{req.scheme}://{req.host}:{req.port}", dict()) + methods = urls.setdefault(req.path, {}) + codes = methods.setdefault(req.method, set()) + codes.add(res.status_code) + + def save(self): + with self.filepath.open("w") as f: + json.dump(self.host_urls, f, cls=SetEncoder) + + +class TextUrlIndexWriter(UrlIndexWriter): + """Writes seen URLs as text.""" + + def load(self): + pass + + def add_url(self, flow: HTTPFlow): + res = flow.response + req = flow.request + if res is not None and req is not None: + with self.filepath.open("a+") as f: + f.write(f"{datetime.datetime.utcnow().isoformat()} STATUS: {res.status_code} METHOD: " + f"{req.method} URL:{req.url}\n") + + def save(self): + pass + + +WRITER: Dict[str, Type[UrlIndexWriter]] = { + "json": JSONUrlIndexWriter, + "text": TextUrlIndexWriter, +} + + +def filter_404(flow) -> bool: + """Filters responses with status code 404.""" + return flow.response.status_code != 404 + + +class UrlIndexAddon: + """Add-on to write seen URLs, either as JSON or as text. + + For example, these URLs can be injected in a web application to improve the crawling of web application scanners. + The injection can be done using the URLInjection Add-on. + """ + + index_filter: Optional[Union[str, flowfilter.TFilter]] + writer: UrlIndexWriter + + OPT_FILEPATH = "URLINDEX_FILEPATH" + OPT_APPEND = "URLINDEX_APPEND" + OPT_INDEX_FILTER = "URLINDEX_FILTER" + + def __init__(self, file_path: Union[str, Path], append: bool = True, + index_filter: Union[str, flowfilter.TFilter] = filter_404, index_format: str = "json"): + """ Initializes the urlindex add-on. + + Args: + file_path: Path to file to which the URL index will be written. Can either be given as str or Path. + append: Bool to decide whether to append new URLs to the given file (as opposed to overwrite the contents + of the file) + index_filer: A mitmproxy filter with which the seen URLs will be filtered before being written. Can either + be given as str or as flowfilter.TFilter + index_format: The format of the URL index, can either be "json" or "text". + """ + + if isinstance(index_filter, str): + self.index_filter = flowfilter.parse(index_filter) + if self.index_filter is None: + raise ValueError("Invalid filter expression.") + else: + self.index_filter = index_filter + + file_path = Path(file_path) + try: + self.writer = WRITER[index_format.lower()](file_path) + except KeyError: + raise ValueError(f"Format '{index_format}' is not supported.") + + if not append and file_path.exists(): + file_path.unlink() + + self.writer.load() + + def response(self, flow: HTTPFlow): + """Checks if the response should be included in the URL based on the index_filter and adds it to the URL index + if appropriate. + """ + if isinstance(self.index_filter, str) or self.index_filter is None: + raise ValueError("Invalid filter expression.") + else: + if self.index_filter(flow): + self.writer.add_url(flow) + + def done(self): + """Writes the URL index.""" + self.writer.save() diff --git a/examples/complex/webscanner_helper/urlinjection.py b/examples/complex/webscanner_helper/urlinjection.py new file mode 100644 index 00000000..b62eca2b --- /dev/null +++ b/examples/complex/webscanner_helper/urlinjection.py @@ -0,0 +1,173 @@ +import abc +import html +import json +import logging + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + +logger = logging.getLogger(__name__) + + +class InjectionGenerator: + """Abstract class for an generator of the injection content in order to inject the URL index.""" + ENCODING = "UTF8" + + @abc.abstractmethod + def inject(self, index, flow: HTTPFlow): + """Injects the given URL index into the given flow.""" + pass + + +class HTMLInjection(InjectionGenerator): + """Injects the URL index either by creating a new HTML page or by appending is to an existing page.""" + + def __init__(self, insert: bool = False): + """Initializes the HTMLInjection. + + Args: + insert: boolean to decide whether to insert the URL index to an existing page (True) or to create a new + page containing the URL index. + """ + self.insert = insert + + @classmethod + def _form_html(cls, url): + return f"
" + + @classmethod + def _link_html(cls, url): + return f"link to {url}" + + @classmethod + def index_html(cls, index): + link_htmls = [] + for scheme_netloc, paths in index.items(): + for path, methods in paths.items(): + url = scheme_netloc + path + if "POST" in methods: + link_htmls.append(cls._form_html(url)) + + if "GET" in methods: + link_htmls.append(cls._link_html(url)) + return "".join(link_htmls) + + @classmethod + def landing_page(cls, index): + return ( + "" + + cls.index_html(index) + + "" + ) + + def inject(self, index, flow: HTTPFlow): + if flow.response is not None: + if flow.response.status_code != 404 and not self.insert: + logger.warning( + f"URL '{flow.request.url}' didn't return 404 status, " + f"index page would overwrite valid page.") + elif self.insert: + content = (flow.response + .content + .decode(self.ENCODING, "backslashreplace")) + if "" in content: + content = content.replace("", self.index_html(index) + "") + else: + content += self.index_html(index) + flow.response.content = content.encode(self.ENCODING) + else: + flow.response.content = (self.landing_page(index) + .encode(self.ENCODING)) + + +class RobotsInjection(InjectionGenerator): + """Injects the URL index by creating a new robots.txt including the URLs.""" + + def __init__(self, directive="Allow"): + self.directive = directive + + @classmethod + def robots_txt(cls, index, directive="Allow"): + lines = ["User-agent: *"] + for scheme_netloc, paths in index.items(): + for path, methods in paths.items(): + lines.append(directive + ": " + path) + return "\n".join(lines) + + def inject(self, index, flow: HTTPFlow): + if flow.response is not None: + if flow.response.status_code != 404: + logger.warning( + f"URL '{flow.request.url}' didn't return 404 status, " + f"index page would overwrite valid page.") + else: + flow.response.content = self.robots_txt(index, + self.directive).encode( + self.ENCODING) + + +class SitemapInjection(InjectionGenerator): + """Injects the URL index by creating a new sitemap including the URLs.""" + + @classmethod + def sitemap(cls, index): + lines = [ + ""] + for scheme_netloc, paths in index.items(): + for path, methods in paths.items(): + url = scheme_netloc + path + lines.append(f"{html.escape(url)}") + lines.append("") + return "\n".join(lines) + + def inject(self, index, flow: HTTPFlow): + if flow.response is not None: + if flow.response.status_code != 404: + logger.warning( + f"URL '{flow.request.url}' didn't return 404 status, " + f"index page would overwrite valid page.") + else: + flow.response.content = self.sitemap(index).encode(self.ENCODING) + + +class UrlInjectionAddon: + """ The UrlInjection add-on can be used in combination with web application scanners to improve their crawling + performance. + + The given URls will be injected into the web application. With this, web application scanners can find pages to + crawl much easier. Depending on the Injection generator, the URLs will be injected at different places of the + web application. It is possible to create a landing page which includes the URL (HTMLInjection()), to inject the + URLs to an existing page (HTMLInjection(insert=True)), to create a robots.txt containing the URLs + (RobotsInjection()) or to create a sitemap.xml which includes the URLS (SitemapInjection()). + It is necessary that the web application scanner can find the newly created page containing the URL index. For + example, the newly created page can be set as starting point for the web application scanner. + The URL index needed for the injection can be generated by the UrlIndex Add-on. + """ + + def __init__(self, flt: str, url_index_file: str, + injection_gen: InjectionGenerator): + """Initializes the UrlIndex add-on. + + Args: + flt: mitmproxy filter to decide on which pages the URLs will be injected (str). + url_index_file: Path to the file which includes the URL index in JSON format (e.g. generated by the UrlIndexAddon), given + as str. + injection_gen: InjectionGenerator that should be used to inject the URLs into the web application. + """ + self.name = f"{self.__class__.__name__}-{injection_gen.__class__.__name__}-{self.__hash__()}" + self.flt = flowfilter.parse(flt) + self.injection_gen = injection_gen + with open(url_index_file, "r") as f: + self.url_store = json.load(f) + + def response(self, flow: HTTPFlow): + """Checks if the response matches the filter and such should be injected. + Injects the URL index if appropriate. + """ + if flow.response is not None: + if self.flt is not None and self.flt(flow): + self.injection_gen.inject(self.url_store, flow) + flow.response.status_code = 200 + flow.response.headers["content-type"] = "text/html" + logger.debug(f"Set status code to 200 and set content to logged " + f"urls. Method: {self.injection_gen}") diff --git a/examples/complex/webscanner_helper/watchdog.py b/examples/complex/webscanner_helper/watchdog.py new file mode 100644 index 00000000..867d2196 --- /dev/null +++ b/examples/complex/webscanner_helper/watchdog.py @@ -0,0 +1,71 @@ +import pathlib +import time +import typing +import logging +from datetime import datetime + +import mitmproxy.connections +import mitmproxy.http +from mitmproxy.addons.export import curl_command, raw +from mitmproxy.exceptions import HttpSyntaxException + +logger = logging.getLogger(__name__) + + +class WatchdogAddon(): + """ The Watchdog Add-on can be used in combination with web application scanners in oder to check if the device + under test responds correctls to the scanner's responses. + + The Watchdog Add-on checks if the device under test responds correctly to the scanner's responses. + If the Watchdog sees that the DUT is no longer responding correctly, an multiprocessing event is set. + This information can be used to restart the device under test if necessary. + """ + + def __init__(self, event, outdir: pathlib.Path, timeout=None): + """Initializes the Watchdog. + + Args: + event: multiprocessing.Event that will be set if the watchdog is triggered. + outdir: path to a directory in which the triggering requests will be saved (curl and raw). + timeout_conn: float that specifies the timeout for the server connection + """ + self.error_event = event + self.flow_dir = outdir + if self.flow_dir.exists() and not self.flow_dir.is_dir(): + raise RuntimeError("Watchtdog output path must be a directory.") + elif not self.flow_dir.exists(): + self.flow_dir.mkdir(parents=True) + self.last_trigger: typing.Union[None, float] = None + self.timeout: typing.Union[None, float] = timeout + + def serverconnect(self, conn: mitmproxy.connections.ServerConnection): + if self.timeout is not None: + conn.settimeout(self.timeout) + + @classmethod + def not_in_timeout(cls, last_triggered, timeout): + """Checks if current error lies not in timeout after last trigger (potential reset of connection).""" + return last_triggered is None or timeout is None or (time.time() - last_triggered > timeout) + + def error(self, flow): + """ Checks if the watchdog will be triggered. + + Only triggers watchdog for timeouts after last reset and if flow.error is set (shows that error is a server + error). Ignores HttpSyntaxException Errors since this can be triggered on purpose by web application scanner. + + Args: + flow: mitmproxy.http.flow + """ + if (self.not_in_timeout(self.last_trigger, self.timeout) + and flow.error is not None and not isinstance(flow.error, HttpSyntaxException)): + + self.last_trigger = time.time() + logger.error(f"Watchdog triggered! Cause: {flow}") + self.error_event.set() + + # save the request which might have caused the problem + if flow.request: + with (self.flow_dir / f"{datetime.utcnow().isoformat()}.curl").open("w") as f: + f.write(curl_command(flow)) + with (self.flow_dir / f"{datetime.utcnow().isoformat()}.raw").open("wb") as f: + f.write(raw(flow)) diff --git a/test/examples/webscanner_helper/__init__.py b/test/examples/webscanner_helper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/examples/webscanner_helper/test_mapping.py b/test/examples/webscanner_helper/test_mapping.py new file mode 100644 index 00000000..e4d519fc --- /dev/null +++ b/test/examples/webscanner_helper/test_mapping.py @@ -0,0 +1,165 @@ +from typing import TextIO, Callable +from unittest import mock +from unittest.mock import MagicMock + +from mitmproxy.test import tflow +from mitmproxy.test import tutils + +from examples.complex.webscanner_helper.mapping import MappingAddon, MappingAddonConfig + + +class TestConfig: + + def test_config(self): + assert MappingAddonConfig.HTML_PARSER == "html.parser" + + +url = "http://10.10.10.10" +new_content = "My Text" +mapping_content = f'{{"{url}": {{"body": "{new_content}"}}}}' + + +class TestMappingAddon: + + def test_init(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + assert "My Text" in str(mapping.mapping_templates._dump()) + + def test_load(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + loader = MagicMock() + + mapping.load(loader) + assert 'mapping_file' in str(loader.add_option.call_args_list) + assert 'map_persistent' in str(loader.add_option.call_args_list) + + def test_configure(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + new_filename = "My new filename" + updated = {str(mapping.OPT_MAPPING_FILE): new_filename, str(mapping.OPT_MAP_PERSISTENT): True} + + open_mock = mock.mock_open(read_data="{}") + with mock.patch("builtins.open", open_mock): + mapping.configure(updated) + assert new_filename in str(open_mock.mock_calls) + assert mapping.filename == new_filename + assert mapping.persistent + + def test_response_filtered(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b"Test" + f.response.content = test_content + + mapping.response(f) + assert f.response.content == test_content + + def test_response(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + + mapping.response(f) + assert f.response.content.decode("utf-8") == new_content + + def test_response_content_type(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + f.response.headers.add("content-type", "content-type") + + mapping.response(f) + assert f.response.content == test_content + + def test_response_not_existing(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + mapping.response(f) + assert f.response.content == test_content + + def test_persistance_false(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + + open_mock = mock.mock_open(read_data="{}") + with mock.patch("builtins.open", open_mock): + mapping.done() + assert len(open_mock.mock_calls) == 0 + + def test_persistance_true(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile, persistent=True) + + open_mock = mock.mock_open(read_data="{}") + with mock.patch("builtins.open", open_mock): + mapping.done() + with open(tmpfile, "r") as tfile: + results = tfile.read() + assert len(open_mock.mock_calls) != 0 + assert results == mapping_content + + def test_persistance_true_add_content(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile, persistent=True) + + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + + mapping.response(f) + mapping.done() + with open(tmpfile, "r") as tfile: + results = tfile.read() + assert mapping_content in results + + def mock_dump(self, f: TextIO, value_dumper: Callable): + assert value_dumper(None) == "None" + try: + value_dumper("Test") + except RuntimeError: + assert True + else: + assert False + + def test_dump(selfself, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("{}") + mapping = MappingAddon(tmpfile, persistent=True) + with mock.patch('examples.complex.webscanner_helper.urldict.URLDict.dump', selfself.mock_dump): + mapping.done() diff --git a/test/examples/webscanner_helper/test_urldict.py b/test/examples/webscanner_helper/test_urldict.py new file mode 100644 index 00000000..7bd4fb01 --- /dev/null +++ b/test/examples/webscanner_helper/test_urldict.py @@ -0,0 +1,89 @@ +from mitmproxy.test import tflow, tutils +from examples.complex.webscanner_helper.urldict import URLDict + +url = "http://10.10.10.10" +new_content_body = "New Body" +new_content_title = "New Title" +content = f'{{"body": "{new_content_body}", "title": "{new_content_title}"}}' +url_error = "i~nvalid" +input_file_content = f'{{"{url}": {content}}}' +input_file_content_error = f'{{"{url_error}": {content}}}' + + +class TestUrlDict: + + def test_urldict_empty(self): + urldict = URLDict() + dump = urldict.dumps() + assert dump == '{}' + + def test_urldict_loads(self): + urldict = URLDict.loads(input_file_content) + dump = urldict.dumps() + assert dump == input_file_content + + def test_urldict_set_error(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content_error) + with open(tmpfile, "r") as tfile: + try: + URLDict.load(tfile) + except ValueError: + assert True + else: + assert False + + def test_urldict_get(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content) + with open(tmpfile, "r") as tfile: + urldict = URLDict.load(tfile) + + f = tflow.tflow(resp=tutils.tresp()) + f.request.url = url + selection = urldict[f] + assert "body" in selection[0] + assert new_content_body in selection[0]["body"] + assert "title" in selection[0] + assert new_content_title in selection[0]["title"] + + selection_get = urldict.get(f) + assert "body" in selection_get[0] + assert new_content_body in selection_get[0]["body"] + assert "title" in selection_get[0] + assert new_content_title in selection_get[0]["title"] + + try: + urldict["body"] + except KeyError: + assert True + else: + assert False + + assert urldict.get("body", default="default") == "default" + + def test_urldict_dumps(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content) + with open(tmpfile, "r") as tfile: + urldict = URLDict.load(tfile) + + dump = urldict.dumps() + assert dump == input_file_content + + def test_urldict_dump(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + outfile = tmpdir.join("outfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content) + with open(tmpfile, "r") as tfile: + urldict = URLDict.load(tfile) + with open(outfile, "w") as ofile: + urldict.dump(ofile) + + with open(outfile, "r") as ofile: + output = ofile.read() + assert output == input_file_content diff --git a/test/examples/webscanner_helper/test_urlindex.py b/test/examples/webscanner_helper/test_urlindex.py new file mode 100644 index 00000000..0edd6cc0 --- /dev/null +++ b/test/examples/webscanner_helper/test_urlindex.py @@ -0,0 +1,234 @@ +import json +from json import JSONDecodeError +from pathlib import Path +from unittest import mock +from typing import List +from unittest.mock import patch + +from mitmproxy.test import tflow +from mitmproxy.test import tutils + +from examples.complex.webscanner_helper.urlindex import UrlIndexWriter, SetEncoder, JSONUrlIndexWriter, TextUrlIndexWriter, WRITER, \ + filter_404, \ + UrlIndexAddon + + +class TestBaseClass: + + @patch.multiple(UrlIndexWriter, __abstractmethods__=set()) + def test_base_class(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + index_writer = UrlIndexWriter(tmpfile) + index_writer.load() + index_writer.add_url(tflow.tflow()) + index_writer.save() + + +class TestSetEncoder: + + def test_set_encoder_set(self): + test_set = {"foo", "bar", "42"} + result = SetEncoder.default(SetEncoder(), test_set) + assert isinstance(result, List) + assert 'foo' in result + assert 'bar' in result + assert '42' in result + + def test_set_encoder_str(self): + test_str = "test" + try: + SetEncoder.default(SetEncoder(), test_str) + except TypeError: + assert True + else: + assert False + + +class TestJSONUrlIndexWriter: + + def test_load(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write( + "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/\": {\"GET\": [302]}}}") + writer = JSONUrlIndexWriter(filename=tmpfile) + writer.load() + assert 'http://example.com:80' in writer.host_urls + assert '/' in writer.host_urls['http://example.com:80'] + assert 'GET' in writer.host_urls['http://example.com:80']['/'] + assert 301 in writer.host_urls['http://example.com:80']['/']['GET'] + + def test_load_empty(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("{}") + writer = JSONUrlIndexWriter(filename=tmpfile) + writer.load() + assert len(writer.host_urls) == 0 + + def test_load_nonexisting(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = JSONUrlIndexWriter(filename=tmpfile) + writer.load() + assert len(writer.host_urls) == 0 + + def test_add(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = JSONUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + writer.add_url(f) + assert url in writer.host_urls + assert f.request.path in writer.host_urls[url] + + def test_save(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = JSONUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + writer.add_url(f) + writer.save() + + with open(tmpfile, "r") as results: + try: + content = json.load(results) + except JSONDecodeError: + assert False + assert url in content + + +class TestTestUrlIndexWriter: + def test_load(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write( + "2020-04-22T05:41:08.679231 STATUS: 200 METHOD: GET URL:http://example.com") + writer = TextUrlIndexWriter(filename=tmpfile) + writer.load() + assert True + + def test_load_empty(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("{}") + writer = TextUrlIndexWriter(filename=tmpfile) + writer.load() + assert True + + def test_load_nonexisting(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = TextUrlIndexWriter(filename=tmpfile) + writer.load() + assert True + + def test_add(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = TextUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + method = f.request.method + code = f.response.status_code + writer.add_url(f) + + with open(tmpfile, "r") as results: + content = results.read() + assert url in content + assert method in content + assert str(code) in content + + def test_save(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = TextUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + method = f.request.method + code = f.response.status_code + writer.add_url(f) + writer.save() + + with open(tmpfile, "r") as results: + content = results.read() + assert url in content + assert method in content + assert str(code) in content + + +class TestWriter: + def test_writer_dict(self): + assert "json" in WRITER + assert isinstance(WRITER["json"], JSONUrlIndexWriter.__class__) + assert "text" in WRITER + assert isinstance(WRITER["text"], TextUrlIndexWriter.__class__) + + +class TestFilter: + def test_filer_true(self): + f = tflow.tflow(resp=tutils.tresp()) + assert filter_404(f) + + def test_filter_false(self): + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert not filter_404(f) + + +class TestUrlIndexAddon: + + def test_init(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + UrlIndexAddon(tmpfile) + + def test_init_format(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + try: + UrlIndexAddon(tmpfile, index_format="test") + except ValueError: + assert True + else: + assert False + + def test_init_filter(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + try: + UrlIndexAddon(tmpfile, index_filter="i~nvalid") + except ValueError: + assert True + else: + assert False + + def test_init_append(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("") + url_index = UrlIndexAddon(tmpfile, append=False) + f = tflow.tflow(resp=tutils.tresp()) + with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url'): + url_index.response(f) + assert not Path(tmpfile).exists() + + def test_response(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + url_index = UrlIndexAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url') as mock_add_url: + url_index.response(f) + mock_add_url.assert_called() + + def test_response_None(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + url_index = UrlIndexAddon(tmpfile) + url_index.index_filter = None + f = tflow.tflow(resp=tutils.tresp()) + try: + url_index.response(f) + except ValueError: + assert True + else: + assert False + + def test_done(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + url_index = UrlIndexAddon(tmpfile) + with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.save') as mock_save: + url_index.done() + mock_save.assert_called() diff --git a/test/examples/webscanner_helper/test_urlinjection.py b/test/examples/webscanner_helper/test_urlinjection.py new file mode 100644 index 00000000..4b87296d --- /dev/null +++ b/test/examples/webscanner_helper/test_urlinjection.py @@ -0,0 +1,111 @@ +import json +from unittest import mock + +from mitmproxy import flowfilter +from mitmproxy.test import tflow +from mitmproxy.test import tutils + +from examples.complex.webscanner_helper.urlinjection import InjectionGenerator, HTMLInjection, RobotsInjection, SitemapInjection, \ + UrlInjectionAddon, logger + +index = json.loads( + "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/test\": {\"POST\": [302]}}}") + + +class TestInjectionGenerator: + + def test_inject(self): + f = tflow.tflow(resp=tutils.tresp()) + injection_generator = InjectionGenerator() + injection_generator.inject(index=index, flow=f) + assert True + + +class TestHTMLInjection: + + def test_inject_not404(self): + html_injection = HTMLInjection() + f = tflow.tflow(resp=tutils.tresp()) + + with mock.patch.object(logger, 'warning') as mock_warning: + html_injection.inject(index, f) + assert mock_warning.called + + def test_inject_insert(self): + html_injection = HTMLInjection(insert=True) + f = tflow.tflow(resp=tutils.tresp()) + assert "example.com" not in str(f.response.content) + html_injection.inject(index, f) + assert "example.com" in str(f.response.content) + + def test_inject_insert_body(self): + html_injection = HTMLInjection(insert=True) + f = tflow.tflow(resp=tutils.tresp()) + f.response.text = "" + assert "example.com" not in str(f.response.content) + html_injection.inject(index, f) + assert "example.com" in str(f.response.content) + + def test_inject_404(self): + html_injection = HTMLInjection() + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert "example.com" not in str(f.response.content) + html_injection.inject(index, f) + assert "example.com" in str(f.response.content) + + +class TestRobotsInjection: + + def test_inject_not404(self): + robots_injection = RobotsInjection() + f = tflow.tflow(resp=tutils.tresp()) + + with mock.patch.object(logger, 'warning') as mock_warning: + robots_injection.inject(index, f) + assert mock_warning.called + + def test_inject_404(self): + robots_injection = RobotsInjection() + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert "Allow: /test" not in str(f.response.content) + robots_injection.inject(index, f) + assert "Allow: /test" in str(f.response.content) + + +class TestSitemapInjection: + + def test_inject_not404(self): + sitemap_injection = SitemapInjection() + f = tflow.tflow(resp=tutils.tresp()) + + with mock.patch.object(logger, 'warning') as mock_warning: + sitemap_injection.inject(index, f) + assert mock_warning.called + + def test_inject_404(self): + sitemap_injection = SitemapInjection() + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert "http://example.com:80/" not in str(f.response.content) + sitemap_injection.inject(index, f) + assert "http://example.com:80/" in str(f.response.content) + + +class TestUrlInjectionAddon: + + def test_init(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + json.dump(index, tfile) + flt = f"~u .*/site.html$" + url_injection = UrlInjectionAddon(f"~u .*/site.html$", tmpfile, HTMLInjection(insert=True)) + assert "http://example.com:80" in url_injection.url_store + fltr = flowfilter.parse(flt) + f = tflow.tflow(resp=tutils.tresp()) + f.request.url = "http://example.com/site.html" + assert fltr(f) + assert "http://example.com:80" not in str(f.response.content) + url_injection.response(f) + assert "http://example.com:80" in str(f.response.content) diff --git a/test/examples/webscanner_helper/test_watchdog.py b/test/examples/webscanner_helper/test_watchdog.py new file mode 100644 index 00000000..43e59310 --- /dev/null +++ b/test/examples/webscanner_helper/test_watchdog.py @@ -0,0 +1,84 @@ +import time +from pathlib import Path +from unittest import mock + +from mitmproxy.connections import ServerConnection +from mitmproxy.exceptions import HttpSyntaxException +from mitmproxy.test import tflow +from mitmproxy.test import tutils +import multiprocessing + +from examples.complex.webscanner_helper.watchdog import WatchdogAddon, logger + + +class TestWatchdog: + + def test_init_file(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("") + event = multiprocessing.Event() + try: + WatchdogAddon(event, Path(tmpfile)) + except RuntimeError: + assert True + else: + assert False + + def test_init_dir(self, tmpdir): + event = multiprocessing.Event() + mydir = tmpdir.join("mydir") + assert not Path(mydir).exists() + WatchdogAddon(event, Path(mydir)) + assert Path(mydir).exists() + + def test_serverconnect(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir), timeout=10) + with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout: + w.serverconnect(ServerConnection("127.0.0.1")) + mock_set_timeout.assert_called() + + def test_serverconnect_None(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout: + w.serverconnect(ServerConnection("127.0.0.1")) + assert not mock_set_timeout.called + + def test_trigger(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + f = tflow.tflow(resp=tutils.tresp()) + f.error = "Test Error" + + with mock.patch.object(logger, 'error') as mock_error: + open_mock = mock.mock_open() + with mock.patch("pathlib.Path.open", open_mock, create=True): + w.error(f) + mock_error.assert_called() + open_mock.assert_called() + + def test_trigger_http_synatx(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + f = tflow.tflow(resp=tutils.tresp()) + f.error = HttpSyntaxException() + assert isinstance(f.error, HttpSyntaxException) + + with mock.patch.object(logger, 'error') as mock_error: + open_mock = mock.mock_open() + with mock.patch("pathlib.Path.open", open_mock, create=True): + w.error(f) + assert not mock_error.called + assert not open_mock.called + + def test_timeout(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + + assert w.not_in_timeout(None, None) + assert w.not_in_timeout(time.time, None) + with mock.patch('time.time', return_value=5): + assert not w.not_in_timeout(3, 20) + assert w.not_in_timeout(3, 1) -- cgit v1.2.3