From 7fdcbb09e6034ab1f76724965cfdf45f3d775129 Mon Sep 17 00:00:00 2001 From: anneborcherding <55282902+anneborcherding@users.noreply.github.com> Date: Mon, 4 May 2020 10:37:13 +0200 Subject: added add-ons that enhance the performance of web application scanners. (#3961) * added add-ons that enhance the performance of web application scanners. Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com> --- examples/complex/webscanner_helper/__init__.py | 0 examples/complex/webscanner_helper/mapping.py | 144 +++++++++++++++++ examples/complex/webscanner_helper/urldict.py | 90 +++++++++++ examples/complex/webscanner_helper/urlindex.py | 168 ++++++++++++++++++++ examples/complex/webscanner_helper/urlinjection.py | 173 +++++++++++++++++++++ examples/complex/webscanner_helper/watchdog.py | 71 +++++++++ 6 files changed, 646 insertions(+) create mode 100644 examples/complex/webscanner_helper/__init__.py create mode 100644 examples/complex/webscanner_helper/mapping.py create mode 100644 examples/complex/webscanner_helper/urldict.py create mode 100644 examples/complex/webscanner_helper/urlindex.py create mode 100644 examples/complex/webscanner_helper/urlinjection.py create mode 100644 examples/complex/webscanner_helper/watchdog.py (limited to 'examples') diff --git a/examples/complex/webscanner_helper/__init__.py b/examples/complex/webscanner_helper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/complex/webscanner_helper/mapping.py b/examples/complex/webscanner_helper/mapping.py new file mode 100644 index 00000000..8c83bf77 --- /dev/null +++ b/examples/complex/webscanner_helper/mapping.py @@ -0,0 +1,144 @@ +import copy +import logging +import typing +from typing import Dict + +from bs4 import BeautifulSoup + +from mitmproxy.http import HTTPFlow +from examples.complex.webscanner_helper.urldict import URLDict + +NO_CONTENT = object() + + +class MappingAddonConfig: + HTML_PARSER = "html.parser" + + +class MappingAddon: + """ The 
mapping add-on can be used in combination with web application scanners to reduce their false positives. + + Many web application scanners produce false positives caused by dynamically changing content of web applications + such as the current time or current measurements. When testing for injection vulnerabilities, web application + scanners are tricked into thinking they changed the content with the injected payload. In reality, the content of + the web application changed independently of the scanner's input. When the mapping add-on is used to map the content + to a fixed value, these false positives can be avoided. + """ + + OPT_MAPPING_FILE = "mapping_file" + """File in which the mapping of URLs and CSS selectors to replacement content is stored. + + Elements will be replaced with the content given in this file. If the content is none it will be set to the first + seen value. + + Example: + + { + "http://10.10.10.10": { + "body": "My Text" + }, + "URL": { + "css selector": "Replace with this" + } + } + """ + + OPT_MAP_PERSISTENT = "map_persistent" + """Whether to store all new content in the configuration file.""" + + def __init__(self, filename: str, persistent: bool = False) -> None: + """ Initializes the mapping add-on + + Args: + filename: str that provides the name of the file in which the mapping of URLs and CSS selectors to content is + stored. + persistent: bool that indicates whether to store all new content in the configuration file. + + Example: + The file in which the mapping config is given should be in the following format: + { + "http://10.10.10.10": { + "body": "My Text" + }, + "<URL>": { + "<css selector>": "Replace with this" + } + } + """ + self.filename = filename + self.persistent = persistent + self.logger = logging.getLogger(self.__class__.__name__) + with open(filename, "r") as f: + self.mapping_templates = URLDict.load(f) + + def load(self, loader): + loader.add_option( + self.OPT_MAPPING_FILE, str, "", + "File where replacement configuration is stored." 
+ ) + loader.add_option( + self.OPT_MAP_PERSISTENT, bool, False, + "Whether to store all new content in the configuration file." + ) + + def configure(self, updated): + if self.OPT_MAPPING_FILE in updated: + self.filename = updated[self.OPT_MAPPING_FILE] + with open(self.filename, "r") as f: + self.mapping_templates = URLDict.load(f) + + if self.OPT_MAP_PERSISTENT in updated: + self.persistent = updated[self.OPT_MAP_PERSISTENT] + + def replace(self, soup: BeautifulSoup, css_sel: str, replace: BeautifulSoup) -> None: + """Replaces the content of soup that matches the css selector with the given replace content.""" + for content in soup.select(css_sel): + self.logger.debug(f"replace \"{content}\" with \"{replace}\"") + content.replace_with(copy.copy(replace)) + + def apply_template(self, soup: BeautifulSoup, template: Dict[str, typing.Union[BeautifulSoup]]) -> None: + """Applies the given mapping template to the given soup.""" + for css_sel, replace in template.items(): + mapped = soup.select(css_sel) + if not mapped: + self.logger.warning(f"Could not find \"{css_sel}\", can not freeze anything.") + else: + self.replace(soup, css_sel, BeautifulSoup(replace, features=MappingAddonConfig.HTML_PARSER)) + + def response(self, flow: HTTPFlow) -> None: + """If a response is received, check if we should replace some content. 
""" + try: + templates = self.mapping_templates[flow] + res = flow.response + if res is not None: + encoding = res.headers.get("content-encoding", "utf-8") + content_type = res.headers.get("content-type", "text/html") + + if "text/html" in content_type and encoding == "utf-8": + content = BeautifulSoup(res.content, MappingAddonConfig.HTML_PARSER) + for template in templates: + self.apply_template(content, template) + res.content = content.encode(encoding) + else: + self.logger.warning(f"Unsupported content type '{content_type}' or content encoding '{encoding}'") + except KeyError: + pass + + def done(self) -> None: + """Dumps all new content into the configuration file if self.persistent is set.""" + if self.persistent: + + # make sure that all items are strings and not soups. + def value_dumper(value): + store = {} + if value is None: + return "None" + try: + for css_sel, soup in value.items(): + store[css_sel] = str(soup) + except: + raise RuntimeError(value) + return store + + with open(self.filename, "w") as f: + self.mapping_templates.dump(f, value_dumper) \ No newline at end of file diff --git a/examples/complex/webscanner_helper/urldict.py b/examples/complex/webscanner_helper/urldict.py new file mode 100644 index 00000000..28e6b5e6 --- /dev/null +++ b/examples/complex/webscanner_helper/urldict.py @@ -0,0 +1,90 @@ +import itertools +import json +import typing +from collections.abc import MutableMapping +from typing import Any, Dict, Generator, List, TextIO, Callable + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + + +def f_id(x): + return x + + +class URLDict(MutableMapping): + """Data structure to store information using filters as keys.""" + def __init__(self): + self.store: Dict[flowfilter.TFilter, Any] = {} + + def __getitem__(self, key, *, count=0): + if count: + ret = itertools.islice(self.get_generator(key), 0, count) + else: + ret = list(self.get_generator(key)) + + if ret: + return ret + else: + raise KeyError + + def 
__setitem__(self, key: str, value): + fltr = flowfilter.parse(key) + if fltr: + self.store.__setitem__(fltr, value) + else: + raise ValueError("Not a valid filter") + + def __delitem__(self, key): + self.store.__delitem__(key) + + def __iter__(self): + return self.store.__iter__() + + def __len__(self): + return self.store.__len__() + + def get_generator(self, flow: HTTPFlow) -> Generator[Any, None, None]: + + for fltr, value in self.store.items(): + if flowfilter.match(fltr, flow): + yield value + + def get(self, flow: HTTPFlow, default=None, *, count=0) -> List[Any]: + try: + return self.__getitem__(flow, count=count) + except KeyError: + return default + + @classmethod + def _load(cls, json_obj, value_loader: Callable = f_id): + url_dict = cls() + for fltr, value in json_obj.items(): + url_dict[fltr] = value_loader(value) + return url_dict + + @classmethod + def load(cls, f: TextIO, value_loader: Callable = f_id): + json_obj = json.load(f) + return cls._load(json_obj, value_loader) + + @classmethod + def loads(cls, json_str: str, value_loader: Callable = f_id): + json_obj = json.loads(json_str) + return cls._load(json_obj, value_loader) + + def _dump(self, value_dumper: Callable = f_id) -> Dict: + dumped: Dict[typing.Union[flowfilter.TFilter, str], Any] = {} + for fltr, value in self.store.items(): + if hasattr(fltr, 'pattern'): + # cast necessary for mypy + dumped[typing.cast(Any, fltr).pattern] = value_dumper(value) + else: + dumped[str(fltr)] = value_dumper(value) + return dumped + + def dump(self, f: TextIO, value_dumper: Callable = f_id): + json.dump(self._dump(value_dumper), f) + + def dumps(self, value_dumper: Callable = f_id): + return json.dumps(self._dump(value_dumper)) diff --git a/examples/complex/webscanner_helper/urlindex.py b/examples/complex/webscanner_helper/urlindex.py new file mode 100644 index 00000000..db8b1c56 --- /dev/null +++ b/examples/complex/webscanner_helper/urlindex.py @@ -0,0 +1,168 @@ +import abc +import datetime +import json 
+import logging +from pathlib import Path +from typing import Type, Dict, Union, Optional + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + +logger = logging.getLogger(__name__) + + +class UrlIndexWriter(abc.ABC): + """Abstract Add-on to write seen URLs. + + For example, these URLs can be injected in a web application to improve the crawling of web application scanners. + The injection can be done using the URLInjection Add-on. + """ + + def __init__(self, filename: Path): + """Initializes the UrlIndexWriter. + + Args: + filename: Path to file to which the URL index will be written. + """ + self.filepath = filename + + @abc.abstractmethod + def load(self): + """Load existing URL index.""" + pass + + @abc.abstractmethod + def add_url(self, flow: HTTPFlow): + """Add new URL to URL index.""" + pass + + @abc.abstractmethod + def save(self): + pass + + +class SetEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, set): + return list(obj) + return json.JSONEncoder.default(self, obj) + + +class JSONUrlIndexWriter(UrlIndexWriter): + """Writes seen URLs as JSON.""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.host_urls = {} + + def load(self): + if self.filepath.exists(): + with self.filepath.open("r") as f: + self.host_urls = json.load(f) + for host in self.host_urls.keys(): + for path, methods in self.host_urls[host].items(): + for method, codes in methods.items(): + self.host_urls[host][path] = {method: set(codes)} + + def add_url(self, flow: HTTPFlow): + req = flow.request + res = flow.response + + if req is not None and res is not None: + urls = self.host_urls.setdefault(f"{req.scheme}://{req.host}:{req.port}", dict()) + methods = urls.setdefault(req.path, {}) + codes = methods.setdefault(req.method, set()) + codes.add(res.status_code) + + def save(self): + with self.filepath.open("w") as f: + json.dump(self.host_urls, f, cls=SetEncoder) + + +class TextUrlIndexWriter(UrlIndexWriter): 
+ """Writes seen URLs as text.""" + + def load(self): + pass + + def add_url(self, flow: HTTPFlow): + res = flow.response + req = flow.request + if res is not None and req is not None: + with self.filepath.open("a+") as f: + f.write(f"{datetime.datetime.utcnow().isoformat()} STATUS: {res.status_code} METHOD: " + f"{req.method} URL:{req.url}\n") + + def save(self): + pass + + +WRITER: Dict[str, Type[UrlIndexWriter]] = { + "json": JSONUrlIndexWriter, + "text": TextUrlIndexWriter, +} + + +def filter_404(flow) -> bool: + """Filters out responses with status code 404 (returns True for every other status code).""" + return flow.response.status_code != 404 + + +class UrlIndexAddon: + """Add-on to write seen URLs, either as JSON or as text. + + For example, these URLs can be injected in a web application to improve the crawling of web application scanners. + The injection can be done using the URLInjection Add-on. + """ + + index_filter: Optional[Union[str, flowfilter.TFilter]] + writer: UrlIndexWriter + + OPT_FILEPATH = "URLINDEX_FILEPATH" + OPT_APPEND = "URLINDEX_APPEND" + OPT_INDEX_FILTER = "URLINDEX_FILTER" + + def __init__(self, file_path: Union[str, Path], append: bool = True, + index_filter: Union[str, flowfilter.TFilter] = filter_404, index_format: str = "json"): + """ Initializes the urlindex add-on. + + Args: + file_path: Path to file to which the URL index will be written. Can either be given as str or Path. + append: Bool to decide whether to append new URLs to the given file (as opposed to overwriting the contents + of the file) + index_filter: A mitmproxy filter with which the seen URLs will be filtered before being written. Can either + be given as str or as flowfilter.TFilter + index_format: The format of the URL index, can either be "json" or "text". 
+ """ + + if isinstance(index_filter, str): + self.index_filter = flowfilter.parse(index_filter) + if self.index_filter is None: + raise ValueError("Invalid filter expression.") + else: + self.index_filter = index_filter + + file_path = Path(file_path) + try: + self.writer = WRITER[index_format.lower()](file_path) + except KeyError: + raise ValueError(f"Format '{index_format}' is not supported.") + + if not append and file_path.exists(): + file_path.unlink() + + self.writer.load() + + def response(self, flow: HTTPFlow): + """Checks if the response should be included in the URL index based on the index_filter and adds it to the URL index + if appropriate. + """ + if isinstance(self.index_filter, str) or self.index_filter is None: + raise ValueError("Invalid filter expression.") + else: + if self.index_filter(flow): + self.writer.add_url(flow) + + def done(self): + """Writes the URL index.""" + self.writer.save() diff --git a/examples/complex/webscanner_helper/urlinjection.py b/examples/complex/webscanner_helper/urlinjection.py new file mode 100644 index 00000000..b62eca2b --- /dev/null +++ b/examples/complex/webscanner_helper/urlinjection.py @@ -0,0 +1,173 @@ +import abc +import html +import json +import logging + +from mitmproxy import flowfilter +from mitmproxy.http import HTTPFlow + +logger = logging.getLogger(__name__) + + +class InjectionGenerator: + """Abstract class for a generator of the injection content in order to inject the URL index.""" + ENCODING = "UTF8" + + @abc.abstractmethod + def inject(self, index, flow: HTTPFlow): + """Injects the given URL index into the given flow.""" + pass + + +class HTMLInjection(InjectionGenerator): + """Injects the URL index either by creating a new HTML page or by appending it to an existing page.""" + + def __init__(self, insert: bool = False): + """Initializes the HTMLInjection. 
+ + Args: + insert: boolean to decide whether to insert the URL index to an existing page (True) or to create a new + page containing the URL index. + """ + self.insert = insert + + @classmethod + def _form_html(cls, url): + return f"
" + + @classmethod + def _link_html(cls, url): + return f"link to {url}" + + @classmethod + def index_html(cls, index): + link_htmls = [] + for scheme_netloc, paths in index.items(): + for path, methods in paths.items(): + url = scheme_netloc + path + if "POST" in methods: + link_htmls.append(cls._form_html(url)) + + if "GET" in methods: + link_htmls.append(cls._link_html(url)) + return "".join(link_htmls) + + @classmethod + def landing_page(cls, index): + return ( + "" + + cls.index_html(index) + + "" + ) + + def inject(self, index, flow: HTTPFlow): + if flow.response is not None: + if flow.response.status_code != 404 and not self.insert: + logger.warning( + f"URL '{flow.request.url}' didn't return 404 status, " + f"index page would overwrite valid page.") + elif self.insert: + content = (flow.response + .content + .decode(self.ENCODING, "backslashreplace")) + if "" in content: + content = content.replace("", self.index_html(index) + "") + else: + content += self.index_html(index) + flow.response.content = content.encode(self.ENCODING) + else: + flow.response.content = (self.landing_page(index) + .encode(self.ENCODING)) + + +class RobotsInjection(InjectionGenerator): + """Injects the URL index by creating a new robots.txt including the URLs.""" + + def __init__(self, directive="Allow"): + self.directive = directive + + @classmethod + def robots_txt(cls, index, directive="Allow"): + lines = ["User-agent: *"] + for scheme_netloc, paths in index.items(): + for path, methods in paths.items(): + lines.append(directive + ": " + path) + return "\n".join(lines) + + def inject(self, index, flow: HTTPFlow): + if flow.response is not None: + if flow.response.status_code != 404: + logger.warning( + f"URL '{flow.request.url}' didn't return 404 status, " + f"index page would overwrite valid page.") + else: + flow.response.content = self.robots_txt(index, + self.directive).encode( + self.ENCODING) + + +class SitemapInjection(InjectionGenerator): + """Injects the URL index by 
creating a new sitemap including the URLs.""" + + @classmethod + def sitemap(cls, index): + lines = [ + ""] + for scheme_netloc, paths in index.items(): + for path, methods in paths.items(): + url = scheme_netloc + path + lines.append(f"{html.escape(url)}") + lines.append("") + return "\n".join(lines) + + def inject(self, index, flow: HTTPFlow): + if flow.response is not None: + if flow.response.status_code != 404: + logger.warning( + f"URL '{flow.request.url}' didn't return 404 status, " + f"index page would overwrite valid page.") + else: + flow.response.content = self.sitemap(index).encode(self.ENCODING) + + +class UrlInjectionAddon: + """ The UrlInjection add-on can be used in combination with web application scanners to improve their crawling + performance. + + The given URLs will be injected into the web application. With this, web application scanners can find pages to + crawl much more easily. Depending on the Injection generator, the URLs will be injected at different places of the + web application. It is possible to create a landing page which includes the URLs (HTMLInjection()), to inject the + URLs to an existing page (HTMLInjection(insert=True)), to create a robots.txt containing the URLs + (RobotsInjection()) or to create a sitemap.xml which includes the URLs (SitemapInjection()). + It is necessary that the web application scanner can find the newly created page containing the URL index. For + example, the newly created page can be set as starting point for the web application scanner. + The URL index needed for the injection can be generated by the UrlIndex Add-on. + """ + + def __init__(self, flt: str, url_index_file: str, + injection_gen: InjectionGenerator): + """Initializes the UrlInjection add-on. + + Args: + flt: mitmproxy filter to decide on which pages the URLs will be injected (str). + url_index_file: Path to the file which includes the URL index in JSON format (e.g. generated by the UrlIndexAddon), given + as str. 
+ injection_gen: InjectionGenerator that should be used to inject the URLs into the web application. + """ + self.name = f"{self.__class__.__name__}-{injection_gen.__class__.__name__}-{self.__hash__()}" + self.flt = flowfilter.parse(flt) + self.injection_gen = injection_gen + with open(url_index_file, "r") as f: + self.url_store = json.load(f) + + def response(self, flow: HTTPFlow): + """Checks if the response matches the filter and thus should be injected. + Injects the URL index if appropriate. + """ + if flow.response is not None: + if self.flt is not None and self.flt(flow): + self.injection_gen.inject(self.url_store, flow) + flow.response.status_code = 200 + flow.response.headers["content-type"] = "text/html" + logger.debug(f"Set status code to 200 and set content to logged " + f"urls. Method: {self.injection_gen}") diff --git a/examples/complex/webscanner_helper/watchdog.py b/examples/complex/webscanner_helper/watchdog.py new file mode 100644 index 00000000..867d2196 --- /dev/null +++ b/examples/complex/webscanner_helper/watchdog.py @@ -0,0 +1,71 @@ +import pathlib +import time +import typing +import logging +from datetime import datetime + +import mitmproxy.connections +import mitmproxy.http +from mitmproxy.addons.export import curl_command, raw +from mitmproxy.exceptions import HttpSyntaxException + +logger = logging.getLogger(__name__) + + +class WatchdogAddon(): + """ The Watchdog Add-on can be used in combination with web application scanners in order to check if the device + under test responds correctly to the scanner's requests. + + The Watchdog Add-on checks if the device under test responds correctly to the scanner's requests. + If the Watchdog sees that the DUT is no longer responding correctly, a multiprocessing event is set. + This information can be used to restart the device under test if necessary. + """ + + def __init__(self, event, outdir: pathlib.Path, timeout=None): + """Initializes the Watchdog. 
+ + Args: + event: multiprocessing.Event that will be set if the watchdog is triggered. + outdir: path to a directory in which the triggering requests will be saved (curl and raw). + timeout: float that specifies the timeout for the server connection + """ + self.error_event = event + self.flow_dir = outdir + if self.flow_dir.exists() and not self.flow_dir.is_dir(): + raise RuntimeError("Watchtdog output path must be a directory.") + elif not self.flow_dir.exists(): + self.flow_dir.mkdir(parents=True) + self.last_trigger: typing.Union[None, float] = None + self.timeout: typing.Union[None, float] = timeout + + def serverconnect(self, conn: mitmproxy.connections.ServerConnection): + if self.timeout is not None: + conn.settimeout(self.timeout) + + @classmethod + def not_in_timeout(cls, last_triggered, timeout): + """Checks that the current error does not fall within the timeout after the last trigger (potential reset of connection).""" + return last_triggered is None or timeout is None or (time.time() - last_triggered > timeout) + + def error(self, flow): + """ Checks if the watchdog will be triggered. + + Only triggers watchdog for timeouts after last reset and if flow.error is set (shows that error is a server + error). Ignores HttpSyntaxException errors since these can be triggered on purpose by the web application scanner. + + Args: + flow: mitmproxy.http.flow + """ + if (self.not_in_timeout(self.last_trigger, self.timeout) + and flow.error is not None and not isinstance(flow.error, HttpSyntaxException)): + + self.last_trigger = time.time() + logger.error(f"Watchdog triggered! Cause: {flow}") + self.error_event.set() + + # save the request which might have caused the problem + if flow.request: + with (self.flow_dir / f"{datetime.utcnow().isoformat()}.curl").open("w") as f: + f.write(curl_command(flow)) + with (self.flow_dir / f"{datetime.utcnow().isoformat()}.raw").open("wb") as f: + f.write(raw(flow)) -- cgit v1.2.3