aboutsummaryrefslogtreecommitdiffstats
path: root/examples/complex/webscanner_helper/urlindex.py
blob: db8b1c562203b1b086836bf3f3d3330b0ceb3383 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import abc
import datetime
import json
import logging
from pathlib import Path
from typing import Type, Dict, Union, Optional

from mitmproxy import flowfilter
from mitmproxy.http import HTTPFlow

logger = logging.getLogger(__name__)


class UrlIndexWriter(abc.ABC):
    """Abstract Add-on to write seen URLs.

    For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
    The injection can be done using the URLInjection Add-on.
    """

    def __init__(self, filename: Path):
        """Initializes the UrlIndexWriter.

        Args:
            filename: Path to file to which the URL index will be written.
        """
        self.filepath = filename

    @abc.abstractmethod
    def load(self):
        """Load existing URL index."""
        pass

    @abc.abstractmethod
    def add_url(self, flow: HTTPFlow):
        """Add new URL to URL index."""
        pass

    @abc.abstractmethod
    def save(self):
        pass


class SetEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, set):
            return list(obj)
        return json.JSONEncoder.default(self, obj)


class JSONUrlIndexWriter(UrlIndexWriter):
    """Writes seen URLs as JSON."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.host_urls = {}

    def load(self):
        if self.filepath.exists():
            with self.filepath.open("r") as f:
                self.host_urls = json.load(f)
            for host in self.host_urls.keys():
                for path, methods in self.host_urls[host].items():
                    for method, codes in methods.items():
                        self.host_urls[host][path] = {method: set(codes)}

    def add_url(self, flow: HTTPFlow):
        req = flow.request
        res = flow.response

        if req is not None and res is not None:
            urls = self.host_urls.setdefault(f"{req.scheme}://{req.host}:{req.port}", dict())
            methods = urls.setdefault(req.path, {})
            codes = methods.setdefault(req.method, set())
            codes.add(res.status_code)

    def save(self):
        with self.filepath.open("w") as f:
            json.dump(self.host_urls, f, cls=SetEncoder)


class TextUrlIndexWriter(UrlIndexWriter):
    """Writes seen URLs as text."""

    def load(self):
        pass

    def add_url(self, flow: HTTPFlow):
        res = flow.response
        req = flow.request
        if res is not None and req is not None:
            with self.filepath.open("a+") as f:
                f.write(f"{datetime.datetime.utcnow().isoformat()} STATUS: {res.status_code} METHOD: "
                        f"{req.method} URL:{req.url}\n")

    def save(self):
        pass


WRITER: Dict[str, Type[UrlIndexWriter]] = {
    "json": JSONUrlIndexWriter,
    "text": TextUrlIndexWriter,
}


def filter_404(flow) -> bool:
    """Filters responses with status code 404."""
    return flow.response.status_code != 404


class UrlIndexAddon:
    """Add-on to write seen URLs, either as JSON or as text.

    For example, these URLs can be injected in a web application to improve the crawling of web application scanners.
    The injection can be done using the URLInjection Add-on.
    """

    index_filter: Optional[Union[str, flowfilter.TFilter]]
    writer: UrlIndexWriter

    OPT_FILEPATH = "URLINDEX_FILEPATH"
    OPT_APPEND = "URLINDEX_APPEND"
    OPT_INDEX_FILTER = "URLINDEX_FILTER"

    def __init__(self, file_path: Union[str, Path], append: bool = True,
                 index_filter: Union[str, flowfilter.TFilter] = filter_404, index_format: str = "json"):
        """ Initializes the urlindex add-on.

        Args:
            file_path: Path to file to which the URL index will be written. Can either be given as str or Path.
            append: Bool to decide whether to append new URLs to the given file (as opposed to overwrite the contents
                of the file)
            index_filer: A mitmproxy filter with which the seen URLs will be filtered before being written. Can either
                be given as str or as flowfilter.TFilter
            index_format: The format of the URL index, can either be "json" or "text".
        """

        if isinstance(index_filter, str):
            self.index_filter = flowfilter.parse(index_filter)
            if self.index_filter is None:
                raise ValueError("Invalid filter expression.")
        else:
            self.index_filter = index_filter

        file_path = Path(file_path)
        try:
            self.writer = WRITER[index_format.lower()](file_path)
        except KeyError:
            raise ValueError(f"Format '{index_format}' is not supported.")

        if not append and file_path.exists():
            file_path.unlink()

        self.writer.load()

    def response(self, flow: HTTPFlow):
        """Checks if the response should be included in the URL based on the index_filter and adds it to the URL index
            if appropriate.
        """
        if isinstance(self.index_filter, str) or self.index_filter is None:
            raise ValueError("Invalid filter expression.")
        else:
            if self.index_filter(flow):
                self.writer.add_url(flow)

    def done(self):
        """Writes the URL index."""
        self.writer.save()