From 7fdcbb09e6034ab1f76724965cfdf45f3d775129 Mon Sep 17 00:00:00 2001 From: anneborcherding <55282902+anneborcherding@users.noreply.github.com> Date: Mon, 4 May 2020 10:37:13 +0200 Subject: added add-ons that enhance the performance of web application scanners. (#3961) * added add-ons that enhance the performance of web application scanners. Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com> --- test/examples/webscanner_helper/__init__.py | 0 test/examples/webscanner_helper/test_mapping.py | 165 +++++++++++++++ test/examples/webscanner_helper/test_urldict.py | 89 ++++++++ test/examples/webscanner_helper/test_urlindex.py | 234 +++++++++++++++++++++ .../webscanner_helper/test_urlinjection.py | 111 ++++++++++ test/examples/webscanner_helper/test_watchdog.py | 84 ++++++++ 6 files changed, 683 insertions(+) create mode 100644 test/examples/webscanner_helper/__init__.py create mode 100644 test/examples/webscanner_helper/test_mapping.py create mode 100644 test/examples/webscanner_helper/test_urldict.py create mode 100644 test/examples/webscanner_helper/test_urlindex.py create mode 100644 test/examples/webscanner_helper/test_urlinjection.py create mode 100644 test/examples/webscanner_helper/test_watchdog.py (limited to 'test') diff --git a/test/examples/webscanner_helper/__init__.py b/test/examples/webscanner_helper/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/test/examples/webscanner_helper/test_mapping.py b/test/examples/webscanner_helper/test_mapping.py new file mode 100644 index 00000000..e4d519fc --- /dev/null +++ b/test/examples/webscanner_helper/test_mapping.py @@ -0,0 +1,165 @@ +from typing import TextIO, Callable +from unittest import mock +from unittest.mock import MagicMock + +from mitmproxy.test import tflow +from mitmproxy.test import tutils + +from examples.complex.webscanner_helper.mapping import MappingAddon, MappingAddonConfig + + +class TestConfig: + + def test_config(self): + assert MappingAddonConfig.HTML_PARSER == "html.parser" + + +url = "http://10.10.10.10" +new_content = "My Text" +mapping_content = f'{{"{url}": {{"body": "{new_content}"}}}}' + + +class TestMappingAddon: + + def test_init(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + assert "My Text" in str(mapping.mapping_templates._dump()) + + def test_load(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + loader = MagicMock() + + mapping.load(loader) + assert 'mapping_file' in str(loader.add_option.call_args_list) + assert 'map_persistent' in str(loader.add_option.call_args_list) + + def test_configure(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + new_filename = "My new filename" + updated = {str(mapping.OPT_MAPPING_FILE): new_filename, str(mapping.OPT_MAP_PERSISTENT): True} + + open_mock = mock.mock_open(read_data="{}") + with mock.patch("builtins.open", open_mock): + mapping.configure(updated) + assert new_filename in str(open_mock.mock_calls) + assert mapping.filename == new_filename + assert mapping.persistent + + def test_response_filtered(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b"Test" + f.response.content = test_content + + mapping.response(f) + assert f.response.content == test_content + + def test_response(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + + mapping.response(f) + assert f.response.content.decode("utf-8") == new_content + + def test_response_content_type(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + f.response.headers.add("content-type", "content-type") + + mapping.response(f) + assert f.response.content == test_content + + def test_response_not_existing(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + mapping.response(f) + assert f.response.content == test_content + + def test_persistance_false(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile) + + open_mock = mock.mock_open(read_data="{}") + with mock.patch("builtins.open", open_mock): + mapping.done() + assert len(open_mock.mock_calls) == 0 + + def test_persistance_true(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile, persistent=True) + + open_mock = mock.mock_open(read_data="{}") + with mock.patch("builtins.open", open_mock): + mapping.done() + with open(tmpfile, "r") as tfile: + results = tfile.read() + assert len(open_mock.mock_calls) != 0 + assert results == mapping_content + + def test_persistance_true_add_content(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(mapping_content) + mapping = MappingAddon(tmpfile, persistent=True) + + f = tflow.tflow(resp=tutils.tresp()) + test_content = b" Test " + f.response.content = test_content + f.request.url = url + + mapping.response(f) + mapping.done() + with open(tmpfile, "r") as tfile: + results = tfile.read() + assert mapping_content in results + + def mock_dump(self, f: TextIO, value_dumper: Callable): + assert value_dumper(None) == "None" + try: + value_dumper("Test") + except RuntimeError: + assert True + else: + assert False + + def test_dump(selfself, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("{}") + mapping = MappingAddon(tmpfile, persistent=True) + with mock.patch('examples.complex.webscanner_helper.urldict.URLDict.dump', selfself.mock_dump): + mapping.done() diff --git a/test/examples/webscanner_helper/test_urldict.py b/test/examples/webscanner_helper/test_urldict.py new file mode 100644 index 00000000..7bd4fb01 --- /dev/null +++ b/test/examples/webscanner_helper/test_urldict.py @@ -0,0 +1,89 @@ +from mitmproxy.test import tflow, tutils +from examples.complex.webscanner_helper.urldict import URLDict + +url = "http://10.10.10.10" +new_content_body = "New Body" +new_content_title = "New Title" +content = f'{{"body": "{new_content_body}", "title": "{new_content_title}"}}' +url_error = "i~nvalid" +input_file_content = f'{{"{url}": {content}}}' +input_file_content_error = f'{{"{url_error}": {content}}}' + + +class TestUrlDict: + + def test_urldict_empty(self): + urldict = URLDict() + dump = urldict.dumps() + assert dump == '{}' + + def test_urldict_loads(self): + urldict = URLDict.loads(input_file_content) + dump = urldict.dumps() + assert dump == input_file_content + + def test_urldict_set_error(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content_error) + with open(tmpfile, "r") as tfile: + try: + URLDict.load(tfile) + except ValueError: + assert True + else: + assert False + + def test_urldict_get(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content) + with open(tmpfile, "r") as tfile: + urldict = URLDict.load(tfile) + + f = tflow.tflow(resp=tutils.tresp()) + f.request.url = url + selection = urldict[f] + assert "body" in selection[0] + assert new_content_body in selection[0]["body"] + assert "title" in selection[0] + assert new_content_title in selection[0]["title"] + + selection_get = urldict.get(f) + assert "body" in selection_get[0] + assert new_content_body in selection_get[0]["body"] + assert "title" in selection_get[0] + assert new_content_title in selection_get[0]["title"] + + try: + urldict["body"] + except KeyError: + assert True + else: + assert False + + assert urldict.get("body", default="default") == "default" + + def test_urldict_dumps(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content) + with open(tmpfile, "r") as tfile: + urldict = URLDict.load(tfile) + + dump = urldict.dumps() + assert dump == input_file_content + + def test_urldict_dump(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + outfile = tmpdir.join("outfile") + with open(tmpfile, "w") as tfile: + tfile.write(input_file_content) + with open(tmpfile, "r") as tfile: + urldict = URLDict.load(tfile) + with open(outfile, "w") as ofile: + urldict.dump(ofile) + + with open(outfile, "r") as ofile: + output = ofile.read() + assert output == input_file_content diff --git a/test/examples/webscanner_helper/test_urlindex.py b/test/examples/webscanner_helper/test_urlindex.py new file mode 100644 index 00000000..0edd6cc0 --- /dev/null +++ b/test/examples/webscanner_helper/test_urlindex.py @@ -0,0 +1,234 @@ +import json +from json import JSONDecodeError +from pathlib import Path +from unittest import mock +from typing import List +from unittest.mock import patch + +from mitmproxy.test import tflow +from mitmproxy.test import tutils + +from examples.complex.webscanner_helper.urlindex import UrlIndexWriter, SetEncoder, JSONUrlIndexWriter, TextUrlIndexWriter, WRITER, \ + filter_404, \ + UrlIndexAddon + + +class TestBaseClass: + + @patch.multiple(UrlIndexWriter, __abstractmethods__=set()) + def test_base_class(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + index_writer = UrlIndexWriter(tmpfile) + index_writer.load() + index_writer.add_url(tflow.tflow()) + index_writer.save() + + +class TestSetEncoder: + + def test_set_encoder_set(self): + test_set = {"foo", "bar", "42"} + result = SetEncoder.default(SetEncoder(), test_set) + assert isinstance(result, List) + assert 'foo' in result + assert 'bar' in result + assert '42' in result + + def test_set_encoder_str(self): + test_str = "test" + try: + SetEncoder.default(SetEncoder(), test_str) + except TypeError: + assert True + else: + assert False + + +class TestJSONUrlIndexWriter: + + def test_load(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write( + "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/\": {\"GET\": [302]}}}") + writer = JSONUrlIndexWriter(filename=tmpfile) + writer.load() + assert 'http://example.com:80' in writer.host_urls + assert '/' in writer.host_urls['http://example.com:80'] + assert 'GET' in writer.host_urls['http://example.com:80']['/'] + assert 301 in writer.host_urls['http://example.com:80']['/']['GET'] + + def test_load_empty(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("{}") + writer = JSONUrlIndexWriter(filename=tmpfile) + writer.load() + assert len(writer.host_urls) == 0 + + def test_load_nonexisting(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = JSONUrlIndexWriter(filename=tmpfile) + writer.load() + assert len(writer.host_urls) == 0 + + def test_add(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = JSONUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + writer.add_url(f) + assert url in writer.host_urls + assert f.request.path in writer.host_urls[url] + + def test_save(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = JSONUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + writer.add_url(f) + writer.save() + + with open(tmpfile, "r") as results: + try: + content = json.load(results) + except JSONDecodeError: + assert False + assert url in content + + +class TestTestUrlIndexWriter: + def test_load(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write( + "2020-04-22T05:41:08.679231 STATUS: 200 METHOD: GET URL:http://example.com") + writer = TextUrlIndexWriter(filename=tmpfile) + writer.load() + assert True + + def test_load_empty(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("{}") + writer = TextUrlIndexWriter(filename=tmpfile) + writer.load() + assert True + + def test_load_nonexisting(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = TextUrlIndexWriter(filename=tmpfile) + writer.load() + assert True + + def test_add(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = TextUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + method = f.request.method + code = f.response.status_code + writer.add_url(f) + + with open(tmpfile, "r") as results: + content = results.read() + assert url in content + assert method in content + assert str(code) in content + + def test_save(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + writer = TextUrlIndexWriter(filename=tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + url = f"{f.request.scheme}://{f.request.host}:{f.request.port}" + method = f.request.method + code = f.response.status_code + writer.add_url(f) + writer.save() + + with open(tmpfile, "r") as results: + content = results.read() + assert url in content + assert method in content + assert str(code) in content + + +class TestWriter: + def test_writer_dict(self): + assert "json" in WRITER + assert isinstance(WRITER["json"], JSONUrlIndexWriter.__class__) + assert "text" in WRITER + assert isinstance(WRITER["text"], TextUrlIndexWriter.__class__) + + +class TestFilter: + def test_filer_true(self): + f = tflow.tflow(resp=tutils.tresp()) + assert filter_404(f) + + def test_filter_false(self): + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert not filter_404(f) + + +class TestUrlIndexAddon: + + def test_init(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + UrlIndexAddon(tmpfile) + + def test_init_format(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + try: + UrlIndexAddon(tmpfile, index_format="test") + except ValueError: + assert True + else: + assert False + + def test_init_filter(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + try: + UrlIndexAddon(tmpfile, index_filter="i~nvalid") + except ValueError: + assert True + else: + assert False + + def test_init_append(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("") + url_index = UrlIndexAddon(tmpfile, append=False) + f = tflow.tflow(resp=tutils.tresp()) + with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url'): + url_index.response(f) + assert not Path(tmpfile).exists() + + def test_response(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + url_index = UrlIndexAddon(tmpfile) + f = tflow.tflow(resp=tutils.tresp()) + with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url') as mock_add_url: + url_index.response(f) + mock_add_url.assert_called() + + def test_response_None(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + url_index = UrlIndexAddon(tmpfile) + url_index.index_filter = None + f = tflow.tflow(resp=tutils.tresp()) + try: + url_index.response(f) + except ValueError: + assert True + else: + assert False + + def test_done(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + url_index = UrlIndexAddon(tmpfile) + with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.save') as mock_save: + url_index.done() + mock_save.assert_called() diff --git a/test/examples/webscanner_helper/test_urlinjection.py b/test/examples/webscanner_helper/test_urlinjection.py new file mode 100644 index 00000000..4b87296d --- /dev/null +++ b/test/examples/webscanner_helper/test_urlinjection.py @@ -0,0 +1,111 @@ +import json +from unittest import mock + +from mitmproxy import flowfilter +from mitmproxy.test import tflow +from mitmproxy.test import tutils + +from examples.complex.webscanner_helper.urlinjection import InjectionGenerator, HTMLInjection, RobotsInjection, SitemapInjection, \ + UrlInjectionAddon, logger + +index = json.loads( + "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/test\": {\"POST\": [302]}}}") + + +class TestInjectionGenerator: + + def test_inject(self): + f = tflow.tflow(resp=tutils.tresp()) + injection_generator = InjectionGenerator() + injection_generator.inject(index=index, flow=f) + assert True + + +class TestHTMLInjection: + + def test_inject_not404(self): + html_injection = HTMLInjection() + f = tflow.tflow(resp=tutils.tresp()) + + with mock.patch.object(logger, 'warning') as mock_warning: + html_injection.inject(index, f) + assert mock_warning.called + + def test_inject_insert(self): + html_injection = HTMLInjection(insert=True) + f = tflow.tflow(resp=tutils.tresp()) + assert "example.com" not in str(f.response.content) + html_injection.inject(index, f) + assert "example.com" in str(f.response.content) + + def test_inject_insert_body(self): + html_injection = HTMLInjection(insert=True) + f = tflow.tflow(resp=tutils.tresp()) + f.response.text = "" + assert "example.com" not in str(f.response.content) + html_injection.inject(index, f) + assert "example.com" in str(f.response.content) + + def test_inject_404(self): + html_injection = HTMLInjection() + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert "example.com" not in str(f.response.content) + html_injection.inject(index, f) + assert "example.com" in str(f.response.content) + + +class TestRobotsInjection: + + def test_inject_not404(self): + robots_injection = RobotsInjection() + f = tflow.tflow(resp=tutils.tresp()) + + with mock.patch.object(logger, 'warning') as mock_warning: + robots_injection.inject(index, f) + assert mock_warning.called + + def test_inject_404(self): + robots_injection = RobotsInjection() + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert "Allow: /test" not in str(f.response.content) + robots_injection.inject(index, f) + assert "Allow: /test" in str(f.response.content) + + +class TestSitemapInjection: + + def test_inject_not404(self): + sitemap_injection = SitemapInjection() + f = tflow.tflow(resp=tutils.tresp()) + + with mock.patch.object(logger, 'warning') as mock_warning: + sitemap_injection.inject(index, f) + assert mock_warning.called + + def test_inject_404(self): + sitemap_injection = SitemapInjection() + f = tflow.tflow(resp=tutils.tresp()) + f.response.status_code = 404 + assert "http://example.com:80/" not in str(f.response.content) + sitemap_injection.inject(index, f) + assert "http://example.com:80/" in str(f.response.content) + + +class TestUrlInjectionAddon: + + def test_init(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + json.dump(index, tfile) + flt = f"~u .*/site.html$" + url_injection = UrlInjectionAddon(f"~u .*/site.html$", tmpfile, HTMLInjection(insert=True)) + assert "http://example.com:80" in url_injection.url_store + fltr = flowfilter.parse(flt) + f = tflow.tflow(resp=tutils.tresp()) + f.request.url = "http://example.com/site.html" + assert fltr(f) + assert "http://example.com:80" not in str(f.response.content) + url_injection.response(f) + assert "http://example.com:80" in str(f.response.content) diff --git a/test/examples/webscanner_helper/test_watchdog.py b/test/examples/webscanner_helper/test_watchdog.py new file mode 100644 index 00000000..43e59310 --- /dev/null +++ b/test/examples/webscanner_helper/test_watchdog.py @@ -0,0 +1,84 @@ +import time +from pathlib import Path +from unittest import mock + +from mitmproxy.connections import ServerConnection +from mitmproxy.exceptions import HttpSyntaxException +from mitmproxy.test import tflow +from mitmproxy.test import tutils +import multiprocessing + +from examples.complex.webscanner_helper.watchdog import WatchdogAddon, logger + + +class TestWatchdog: + + def test_init_file(self, tmpdir): + tmpfile = tmpdir.join("tmpfile") + with open(tmpfile, "w") as tfile: + tfile.write("") + event = multiprocessing.Event() + try: + WatchdogAddon(event, Path(tmpfile)) + except RuntimeError: + assert True + else: + assert False + + def test_init_dir(self, tmpdir): + event = multiprocessing.Event() + mydir = tmpdir.join("mydir") + assert not Path(mydir).exists() + WatchdogAddon(event, Path(mydir)) + assert Path(mydir).exists() + + def test_serverconnect(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir), timeout=10) + with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout: + w.serverconnect(ServerConnection("127.0.0.1")) + mock_set_timeout.assert_called() + + def test_serverconnect_None(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout: + w.serverconnect(ServerConnection("127.0.0.1")) + assert not mock_set_timeout.called + + def test_trigger(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + f = tflow.tflow(resp=tutils.tresp()) + f.error = "Test Error" + + with mock.patch.object(logger, 'error') as mock_error: + open_mock = mock.mock_open() + with mock.patch("pathlib.Path.open", open_mock, create=True): + w.error(f) + mock_error.assert_called() + open_mock.assert_called() + + def test_trigger_http_synatx(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + f = tflow.tflow(resp=tutils.tresp()) + f.error = HttpSyntaxException() + assert isinstance(f.error, HttpSyntaxException) + + with mock.patch.object(logger, 'error') as mock_error: + open_mock = mock.mock_open() + with mock.patch("pathlib.Path.open", open_mock, create=True): + w.error(f) + assert not mock_error.called + assert not open_mock.called + + def test_timeout(self, tmpdir): + event = multiprocessing.Event() + w = WatchdogAddon(event, Path(tmpdir)) + + assert w.not_in_timeout(None, None) + assert w.not_in_timeout(time.time, None) + with mock.patch('time.time', return_value=5): + assert not w.not_in_timeout(3, 20) + assert w.not_in_timeout(3, 1) -- cgit v1.2.3