aboutsummaryrefslogtreecommitdiffstats
path: root/examples/complex/webscanner_helper/urlindex.py
diff options
context:
space:
mode:
Diffstat (limited to 'examples/complex/webscanner_helper/urlindex.py')
-rw-r--r--examples/complex/webscanner_helper/urlindex.py168
1 files changed, 168 insertions, 0 deletions
diff --git a/examples/complex/webscanner_helper/urlindex.py b/examples/complex/webscanner_helper/urlindex.py
new file mode 100644
index 00000000..db8b1c56
--- /dev/null
+++ b/examples/complex/webscanner_helper/urlindex.py
@@ -0,0 +1,168 @@
+import abc
+import datetime
+import json
+import logging
+from pathlib import Path
+from typing import Type, Dict, Union, Optional
+
+from mitmproxy import flowfilter
+from mitmproxy.http import HTTPFlow
+
+logger = logging.getLogger(__name__)
+
+
class UrlIndexWriter(abc.ABC):
    """Abstract base class for add-on writers that record seen URLs.

    The recorded URLs can, for example, be injected into a web application to improve the crawling of web
    application scanners. The injection can be done using the URLInjection Add-on.
    """

    def __init__(self, filename: Path):
        """Store the location of the URL index.

        Args:
            filename: Path to the file to which the URL index will be written.
        """
        self.filepath = filename

    @abc.abstractmethod
    def load(self):
        """Load an existing URL index."""

    @abc.abstractmethod
    def add_url(self, flow: HTTPFlow):
        """Add the URL of the given flow to the URL index."""

    @abc.abstractmethod
    def save(self):
        """Write the URL index out."""
+
+
class SetEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``set`` and ``frozenset`` objects as JSON arrays.

    The order of the array elements follows the iteration order of the set and is therefore not guaranteed
    to be stable between runs.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for ``obj``.

        Args:
            obj: An object the standard encoder does not know how to serialize.

        Returns:
            A list with the elements of ``obj`` if it is a set or frozenset.

        Raises:
            TypeError: If ``obj`` is of any other unsupported type (raised by the base class).
        """
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        return super().default(obj)
+
+
class JSONUrlIndexWriter(UrlIndexWriter):
    """Writes seen URLs as JSON.

    In memory the index is a nested mapping of the form
    ``{"scheme://host:port": {path: {method: {status_code, ...}}}}``. On disk the status code sets are stored
    as JSON arrays.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the writer with an empty index; all arguments are forwarded to ``UrlIndexWriter``."""
        super().__init__(*args, **kwargs)
        self.host_urls = {}

    def load(self):
        """Load an existing URL index from ``self.filepath``, if that file exists.

        JSON has no set type, so the status codes of every method are converted back from lists to sets.
        """
        if not self.filepath.exists():
            return

        with self.filepath.open("r") as f:
            loaded = json.load(f)

        # Rebuild the whole mapping so that *every* method of a path is kept. Assigning a fresh
        # single-method dict to the path for each method would keep only the last method seen.
        self.host_urls = {
            host: {
                path: {method: set(codes) for method, codes in methods.items()}
                for path, methods in paths.items()
            }
            for host, paths in loaded.items()
        }

    def add_url(self, flow: HTTPFlow):
        """Record the status code of the flow's response under its host, path and method.

        Flows lacking a request or a response are skipped.
        """
        req = flow.request
        res = flow.response
        if req is None or res is None:
            return

        paths = self.host_urls.setdefault(f"{req.scheme}://{req.host}:{req.port}", {})
        codes = paths.setdefault(req.path, {}).setdefault(req.method, set())
        codes.add(res.status_code)

    def save(self):
        """Write the URL index to ``self.filepath`` as JSON, replacing the previous file content."""
        with self.filepath.open("w") as f:
            # SetEncoder turns the status code sets into JSON arrays.
            json.dump(self.host_urls, f, cls=SetEncoder)
+
+
class TextUrlIndexWriter(UrlIndexWriter):
    """Writes seen URLs as text, one line per response.

    Every entry is appended to the index file as soon as it is added, so ``load`` and ``save`` have nothing
    to do.
    """

    def load(self):
        """Do nothing: entries are only ever appended, so no existing state has to be read."""

    def add_url(self, flow: HTTPFlow):
        """Append a line with a UTC timestamp, the status code, the method and the URL of the flow.

        Flows lacking a request or a response are skipped.
        """
        req = flow.request
        res = flow.response
        if req is None or res is None:
            return

        # datetime.utcnow() is deprecated since Python 3.12. Dropping the tzinfo from an aware UTC timestamp
        # gives the same naive value, so the line format (no "+00:00" suffix) stays unchanged.
        timestamp = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
        # Plain append mode is sufficient; the file is never read here.
        with self.filepath.open("a") as f:
            f.write(f"{timestamp} STATUS: {res.status_code} METHOD: {req.method} URL:{req.url}\n")

    def save(self):
        """Do nothing: every entry has already been written by ``add_url``."""
+
+
# Registry of the available index formats: maps the lower-case format name to the writer class handling it.
WRITER: Dict[str, Type[UrlIndexWriter]] = dict(
    json=JSONUrlIndexWriter,
    text=TextUrlIndexWriter,
)
+
+
def filter_404(flow) -> bool:
    """Accept every flow whose response does not have status code 404.

    Args:
        flow: The flow to check; only its ``response`` attribute is inspected.

    Returns:
        True if the flow has a response whose status code is not 404, False otherwise. A flow without a
        response is rejected instead of raising ``AttributeError``.
    """
    response = flow.response
    return response is not None and response.status_code != 404
+
+
class UrlIndexAddon:
    """Add-on to write seen URLs, either as JSON or as text.

    For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
    The injection can be done using the URLInjection Add-on.
    """

    index_filter: Optional[Union[str, flowfilter.TFilter]]
    writer: UrlIndexWriter

    OPT_FILEPATH = "URLINDEX_FILEPATH"
    OPT_APPEND = "URLINDEX_APPEND"
    OPT_INDEX_FILTER = "URLINDEX_FILTER"

    def __init__(self, file_path: Union[str, Path], append: bool = True,
                 index_filter: Union[str, flowfilter.TFilter] = filter_404, index_format: str = "json"):
        """Initializes the urlindex add-on.

        Args:
            file_path: Path to file to which the URL index will be written. Can either be given as str or Path.
            append: Bool to decide whether to append new URLs to the given file (as opposed to overwrite the contents
                of the file)
            index_filter: A mitmproxy filter with which the seen URLs will be filtered before being written. Can
                either be given as str or as flowfilter.TFilter
            index_format: The format of the URL index, can either be "json" or "text" (case-insensitive).

        Raises:
            ValueError: If the filter expression cannot be parsed or the index format is not supported.
        """
        if isinstance(index_filter, str):
            self.index_filter = flowfilter.parse(index_filter)
            if self.index_filter is None:
                raise ValueError("Invalid filter expression.")
        else:
            self.index_filter = index_filter

        file_path = Path(file_path)
        # Only the registry lookup is guarded, so that a KeyError raised inside a writer's constructor is not
        # misreported as an unsupported format.
        try:
            writer_cls = WRITER[index_format.lower()]
        except KeyError:
            raise ValueError(f"Format '{index_format}' is not supported.") from None
        self.writer = writer_cls(file_path)

        # Not appending means starting from scratch: drop the old index before the writer looks at it.
        if not append and file_path.exists():
            file_path.unlink()

        self.writer.load()

    def response(self, flow: HTTPFlow):
        """Checks if the response should be included in the URL index based on the index_filter and adds it to the
        URL index if appropriate.

        Raises:
            ValueError: If no usable (i.e. callable) index filter is set.
        """
        index_filter = self.index_filter
        if index_filter is None or isinstance(index_filter, str):
            raise ValueError("Invalid filter expression.")
        if index_filter(flow):
            self.writer.add_url(flow)

    def done(self):
        """Writes the URL index."""
        self.writer.save()