author     anneborcherding <55282902+anneborcherding@users.noreply.github.com>  2020-05-04 10:37:13 +0200
committer  GitHub <noreply@github.com>  2020-05-04 10:37:13 +0200
commit     7fdcbb09e6034ab1f76724965cfdf45f3d775129 (patch)
tree       9adaa530173c70d374680a510402b958ad669277 /test
parent     f4aa3ee11c01d5b8f260e57bfd7e084b7767c08e (diff)
Added add-ons that enhance the performance of web application scanners. (#3961)
Co-authored-by: weichweich <14820950+weichweich@users.noreply.github.com>
Diffstat (limited to 'test')
-rw-r--r--  test/examples/webscanner_helper/__init__.py             0
-rw-r--r--  test/examples/webscanner_helper/test_mapping.py       165
-rw-r--r--  test/examples/webscanner_helper/test_urldict.py        89
-rw-r--r--  test/examples/webscanner_helper/test_urlindex.py      234
-rw-r--r--  test/examples/webscanner_helper/test_urlinjection.py  111
-rw-r--r--  test/examples/webscanner_helper/test_watchdog.py       84
6 files changed, 683 insertions(+), 0 deletions(-)
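
The add-ons exercised by these tests live under examples/complex/webscanner_helper/. As a rough orientation before the file-by-file diffs, the constructor calls visible below suggest the helpers can be combined in an ordinary mitmproxy addon script. The sketch that follows is illustrative only: the file names are placeholders and the signatures are inferred from the tests in this commit, not from add-on documentation.

    # scanner_helpers.py -- illustrative sketch; file names are placeholders and the
    # constructor signatures are inferred from the tests below.
    import multiprocessing
    from pathlib import Path

    from examples.complex.webscanner_helper.mapping import MappingAddon
    from examples.complex.webscanner_helper.urlindex import UrlIndexAddon
    from examples.complex.webscanner_helper.urlinjection import HTMLInjection, UrlInjectionAddon
    from examples.complex.webscanner_helper.watchdog import WatchdogAddon

    error_event = multiprocessing.Event()

    addons = [
        MappingAddon("mapping.json", persistent=True),            # rewrite mapped responses
        UrlIndexAddon("url_index.json"),                           # record URLs seen by the scanner
        UrlInjectionAddon("~u .*/site.html$", "url_index.json",
                          HTMLInjection(insert=True)),             # surface URLs from a previous run's index
        WatchdogAddon(error_event, Path("watchdog"), timeout=10),  # signal proxy-side errors
    ]

Such a script would be loaded with, for example, mitmdump -s scanner_helpers.py.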
diff --git a/test/examples/webscanner_helper/__init__.py b/test/examples/webscanner_helper/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/test/examples/webscanner_helper/__init__.py
diff --git a/test/examples/webscanner_helper/test_mapping.py b/test/examples/webscanner_helper/test_mapping.py
new file mode 100644
index 00000000..e4d519fc
--- /dev/null
+++ b/test/examples/webscanner_helper/test_mapping.py
@@ -0,0 +1,165 @@
+from typing import TextIO, Callable
+from unittest import mock
+from unittest.mock import MagicMock
+
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.mapping import MappingAddon, MappingAddonConfig
+
+
+class TestConfig:
+
+ def test_config(self):
+ assert MappingAddonConfig.HTML_PARSER == "html.parser"
+
+
+url = "http://10.10.10.10"
+new_content = "My Text"
+mapping_content = f'{{"{url}": {{"body": "{new_content}"}}}}'
+
+
+class TestMappingAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ assert "My Text" in str(mapping.mapping_templates._dump())
+
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ loader = MagicMock()
+
+ mapping.load(loader)
+ assert 'mapping_file' in str(loader.add_option.call_args_list)
+ assert 'map_persistent' in str(loader.add_option.call_args_list)
+
+ def test_configure(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ new_filename = "My new filename"
+ updated = {str(mapping.OPT_MAPPING_FILE): new_filename, str(mapping.OPT_MAP_PERSISTENT): True}
+
+ open_mock = mock.mock_open(read_data="{}")
+ with mock.patch("builtins.open", open_mock):
+ mapping.configure(updated)
+ assert new_filename in str(open_mock.mock_calls)
+ assert mapping.filename == new_filename
+ assert mapping.persistent
+
+ def test_response_filtered(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"Test"
+ f.response.content = test_content
+
+ mapping.response(f)
+ assert f.response.content == test_content
+
+ def test_response(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<body> Test </body>"
+ f.response.content = test_content
+ f.request.url = url
+
+ mapping.response(f)
+ assert f.response.content.decode("utf-8") == new_content
+
+ def test_response_content_type(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<body> Test </body>"
+ f.response.content = test_content
+ f.request.url = url
+ f.response.headers.add("content-type", "content-type")
+
+ mapping.response(f)
+ assert f.response.content == test_content
+
+ def test_response_not_existing(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<title> Test </title>"
+ f.response.content = test_content
+ f.request.url = url
+ mapping.response(f)
+ assert f.response.content == test_content
+
+    def test_persistence_false(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile)
+
+ open_mock = mock.mock_open(read_data="{}")
+ with mock.patch("builtins.open", open_mock):
+ mapping.done()
+ assert len(open_mock.mock_calls) == 0
+
+    def test_persistence_true(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile, persistent=True)
+
+ open_mock = mock.mock_open(read_data="{}")
+ with mock.patch("builtins.open", open_mock):
+ mapping.done()
+ with open(tmpfile, "r") as tfile:
+ results = tfile.read()
+ assert len(open_mock.mock_calls) != 0
+ assert results == mapping_content
+
+    def test_persistence_true_add_content(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(mapping_content)
+ mapping = MappingAddon(tmpfile, persistent=True)
+
+ f = tflow.tflow(resp=tutils.tresp())
+ test_content = b"<title> Test </title>"
+ f.response.content = test_content
+ f.request.url = url
+
+ mapping.response(f)
+ mapping.done()
+ with open(tmpfile, "r") as tfile:
+ results = tfile.read()
+ assert mapping_content in results
+
+ def mock_dump(self, f: TextIO, value_dumper: Callable):
+ assert value_dumper(None) == "None"
+ try:
+ value_dumper("Test")
+ except RuntimeError:
+ assert True
+ else:
+ assert False
+
+    def test_dump(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ mapping = MappingAddon(tmpfile, persistent=True)
+        with mock.patch('examples.complex.webscanner_helper.urldict.URLDict.dump', self.mock_dump):
+ mapping.done()
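
For readers skimming the fixture strings above: the file handed to MappingAddon is plain JSON keyed by URL, mapping each URL to the HTML element whose text should be replaced, and test_response shows the matched response being rewritten to the mapped value. A compressed standalone version of that test, under the same assumptions (tmp_path stands in for any writable directory):

    from mitmproxy.test import tflow, tutils
    from examples.complex.webscanner_helper.mapping import MappingAddon

    def demo_mapping(tmp_path):
        # Mapping file: URL -> {HTML element: replacement text}, as in mapping_content above.
        mapping_file = tmp_path / "mapping.json"
        mapping_file.write_text('{"http://10.10.10.10": {"body": "My Text"}}')

        addon = MappingAddon(str(mapping_file))
        f = tflow.tflow(resp=tutils.tresp())
        f.request.url = "http://10.10.10.10"
        f.response.content = b"<body> original </body>"
        addon.response(f)
        assert f.response.content.decode("utf-8") == "My Text"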
diff --git a/test/examples/webscanner_helper/test_urldict.py b/test/examples/webscanner_helper/test_urldict.py
new file mode 100644
index 00000000..7bd4fb01
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urldict.py
@@ -0,0 +1,89 @@
+from mitmproxy.test import tflow, tutils
+from examples.complex.webscanner_helper.urldict import URLDict
+
+url = "http://10.10.10.10"
+new_content_body = "New Body"
+new_content_title = "New Title"
+content = f'{{"body": "{new_content_body}", "title": "{new_content_title}"}}'
+url_error = "i~nvalid"
+input_file_content = f'{{"{url}": {content}}}'
+input_file_content_error = f'{{"{url_error}": {content}}}'
+
+
+class TestUrlDict:
+
+ def test_urldict_empty(self):
+ urldict = URLDict()
+ dump = urldict.dumps()
+ assert dump == '{}'
+
+ def test_urldict_loads(self):
+ urldict = URLDict.loads(input_file_content)
+ dump = urldict.dumps()
+ assert dump == input_file_content
+
+ def test_urldict_set_error(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content_error)
+ with open(tmpfile, "r") as tfile:
+ try:
+ URLDict.load(tfile)
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_urldict_get(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content)
+ with open(tmpfile, "r") as tfile:
+ urldict = URLDict.load(tfile)
+
+ f = tflow.tflow(resp=tutils.tresp())
+ f.request.url = url
+ selection = urldict[f]
+ assert "body" in selection[0]
+ assert new_content_body in selection[0]["body"]
+ assert "title" in selection[0]
+ assert new_content_title in selection[0]["title"]
+
+ selection_get = urldict.get(f)
+ assert "body" in selection_get[0]
+ assert new_content_body in selection_get[0]["body"]
+ assert "title" in selection_get[0]
+ assert new_content_title in selection_get[0]["title"]
+
+ try:
+ urldict["body"]
+ except KeyError:
+ assert True
+ else:
+ assert False
+
+ assert urldict.get("body", default="default") == "default"
+
+ def test_urldict_dumps(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content)
+ with open(tmpfile, "r") as tfile:
+ urldict = URLDict.load(tfile)
+
+ dump = urldict.dumps()
+ assert dump == input_file_content
+
+ def test_urldict_dump(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ outfile = tmpdir.join("outfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(input_file_content)
+ with open(tmpfile, "r") as tfile:
+ urldict = URLDict.load(tfile)
+ with open(outfile, "w") as ofile:
+ urldict.dump(ofile)
+
+ with open(outfile, "r") as ofile:
+ output = ofile.read()
+ assert output == input_file_content
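
URLDict, as exercised above, is a JSON-backed dictionary whose keys are apparently parsed as mitmproxy flow-filter expressions (the invalid-key test suggests as much) and whose lookup takes a flow and returns the values of every matching entry. A minimal round trip under those assumptions:

    from mitmproxy.test import tflow, tutils
    from examples.complex.webscanner_helper.urldict import URLDict

    urldict = URLDict.loads('{"http://10.10.10.10": {"body": "New Body"}}')

    f = tflow.tflow(resp=tutils.tresp())
    f.request.url = "http://10.10.10.10"

    selection = urldict[f]                      # values of all entries whose key-filter matches the flow
    assert "New Body" in selection[0]["body"]
    assert urldict.get("unmatched", default="default") == "default"
    assert urldict.dumps() == '{"http://10.10.10.10": {"body": "New Body"}}'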
diff --git a/test/examples/webscanner_helper/test_urlindex.py b/test/examples/webscanner_helper/test_urlindex.py
new file mode 100644
index 00000000..0edd6cc0
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urlindex.py
@@ -0,0 +1,234 @@
+import json
+from json import JSONDecodeError
+from pathlib import Path
+from unittest import mock
+from typing import List
+from unittest.mock import patch
+
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.urlindex import UrlIndexWriter, SetEncoder, JSONUrlIndexWriter, TextUrlIndexWriter, WRITER, \
+ filter_404, \
+ UrlIndexAddon
+
+
+class TestBaseClass:
+
+ @patch.multiple(UrlIndexWriter, __abstractmethods__=set())
+ def test_base_class(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ index_writer = UrlIndexWriter(tmpfile)
+ index_writer.load()
+ index_writer.add_url(tflow.tflow())
+ index_writer.save()
+
+
+class TestSetEncoder:
+
+ def test_set_encoder_set(self):
+ test_set = {"foo", "bar", "42"}
+ result = SetEncoder.default(SetEncoder(), test_set)
+ assert isinstance(result, List)
+ assert 'foo' in result
+ assert 'bar' in result
+ assert '42' in result
+
+ def test_set_encoder_str(self):
+ test_str = "test"
+ try:
+ SetEncoder.default(SetEncoder(), test_str)
+ except TypeError:
+ assert True
+ else:
+ assert False
+
+
+class TestJSONUrlIndexWriter:
+
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(
+ "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/\": {\"GET\": [302]}}}")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert 'http://example.com:80' in writer.host_urls
+ assert '/' in writer.host_urls['http://example.com:80']
+ assert 'GET' in writer.host_urls['http://example.com:80']['/']
+ assert 301 in writer.host_urls['http://example.com:80']['/']['GET']
+
+ def test_load_empty(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert len(writer.host_urls) == 0
+
+ def test_load_nonexisting(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert len(writer.host_urls) == 0
+
+ def test_add(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ writer.add_url(f)
+ assert url in writer.host_urls
+ assert f.request.path in writer.host_urls[url]
+
+ def test_save(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ writer.add_url(f)
+ writer.save()
+
+ with open(tmpfile, "r") as results:
+ try:
+ content = json.load(results)
+ except JSONDecodeError:
+ assert False
+ assert url in content
+
+
+class TestTextUrlIndexWriter:
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(
+ "2020-04-22T05:41:08.679231 STATUS: 200 METHOD: GET URL:http://example.com")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_load_empty(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_load_nonexisting(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_add(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ method = f.request.method
+ code = f.response.status_code
+ writer.add_url(f)
+
+ with open(tmpfile, "r") as results:
+ content = results.read()
+ assert url in content
+ assert method in content
+ assert str(code) in content
+
+ def test_save(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ method = f.request.method
+ code = f.response.status_code
+ writer.add_url(f)
+ writer.save()
+
+ with open(tmpfile, "r") as results:
+ content = results.read()
+ assert url in content
+ assert method in content
+ assert str(code) in content
+
+
+class TestWriter:
+ def test_writer_dict(self):
+ assert "json" in WRITER
+ assert isinstance(WRITER["json"], JSONUrlIndexWriter.__class__)
+ assert "text" in WRITER
+ assert isinstance(WRITER["text"], TextUrlIndexWriter.__class__)
+
+
+class TestFilter:
+    def test_filter_true(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ assert filter_404(f)
+
+ def test_filter_false(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert not filter_404(f)
+
+
+class TestUrlIndexAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ UrlIndexAddon(tmpfile)
+
+ def test_init_format(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ try:
+ UrlIndexAddon(tmpfile, index_format="test")
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_init_filter(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ try:
+ UrlIndexAddon(tmpfile, index_filter="i~nvalid")
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_init_append(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("")
+ url_index = UrlIndexAddon(tmpfile, append=False)
+ f = tflow.tflow(resp=tutils.tresp())
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url'):
+ url_index.response(f)
+ assert not Path(tmpfile).exists()
+
+ def test_response(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url') as mock_add_url:
+ url_index.response(f)
+ mock_add_url.assert_called()
+
+ def test_response_None(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ url_index.index_filter = None
+ f = tflow.tflow(resp=tutils.tresp())
+ try:
+ url_index.response(f)
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_done(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.save') as mock_save:
+ url_index.done()
+ mock_save.assert_called()
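
The options exercised above outline UrlIndexAddon's configuration surface: index_format selects one of the writers registered in WRITER ("json" or "text"), index_filter is a mitmproxy flow-filter expression (invalid expressions raise ValueError), and append=False apparently discards any existing index file. A hedged configuration sketch, with the concrete filter expression chosen here only as an illustration (it mirrors filter_404, which drops 404 responses):

    from examples.complex.webscanner_helper.urlindex import UrlIndexAddon

    addon = UrlIndexAddon(
        "url_index.json",
        index_format="json",      # or "text", per the WRITER registry
        index_filter="!~c 404",   # flow-filter string; an invalid expression raises ValueError
        append=True,              # False starts a fresh index file instead of extending it
    )

    addons = [addon]              # register in a mitmproxy addon script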
diff --git a/test/examples/webscanner_helper/test_urlinjection.py b/test/examples/webscanner_helper/test_urlinjection.py
new file mode 100644
index 00000000..4b87296d
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urlinjection.py
@@ -0,0 +1,111 @@
+import json
+from unittest import mock
+
+from mitmproxy import flowfilter
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.urlinjection import InjectionGenerator, HTMLInjection, RobotsInjection, SitemapInjection, \
+ UrlInjectionAddon, logger
+
+index = json.loads(
+ "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/test\": {\"POST\": [302]}}}")
+
+
+class TestInjectionGenerator:
+
+ def test_inject(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ injection_generator = InjectionGenerator()
+ injection_generator.inject(index=index, flow=f)
+ assert True
+
+
+class TestHTMLInjection:
+
+ def test_inject_not404(self):
+ html_injection = HTMLInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+
+ with mock.patch.object(logger, 'warning') as mock_warning:
+ html_injection.inject(index, f)
+ assert mock_warning.called
+
+ def test_inject_insert(self):
+ html_injection = HTMLInjection(insert=True)
+ f = tflow.tflow(resp=tutils.tresp())
+ assert "example.com" not in str(f.response.content)
+ html_injection.inject(index, f)
+ assert "example.com" in str(f.response.content)
+
+ def test_inject_insert_body(self):
+ html_injection = HTMLInjection(insert=True)
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.text = "<body></body>"
+ assert "example.com" not in str(f.response.content)
+ html_injection.inject(index, f)
+ assert "example.com" in str(f.response.content)
+
+ def test_inject_404(self):
+ html_injection = HTMLInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert "example.com" not in str(f.response.content)
+ html_injection.inject(index, f)
+ assert "example.com" in str(f.response.content)
+
+
+class TestRobotsInjection:
+
+ def test_inject_not404(self):
+ robots_injection = RobotsInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+
+ with mock.patch.object(logger, 'warning') as mock_warning:
+ robots_injection.inject(index, f)
+ assert mock_warning.called
+
+ def test_inject_404(self):
+ robots_injection = RobotsInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert "Allow: /test" not in str(f.response.content)
+ robots_injection.inject(index, f)
+ assert "Allow: /test" in str(f.response.content)
+
+
+class TestSitemapInjection:
+
+ def test_inject_not404(self):
+ sitemap_injection = SitemapInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+
+ with mock.patch.object(logger, 'warning') as mock_warning:
+ sitemap_injection.inject(index, f)
+ assert mock_warning.called
+
+ def test_inject_404(self):
+ sitemap_injection = SitemapInjection()
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert "<url><loc>http://example.com:80/</loc></url>" not in str(f.response.content)
+ sitemap_injection.inject(index, f)
+ assert "<url><loc>http://example.com:80/</loc></url>" in str(f.response.content)
+
+
+class TestUrlInjectionAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ json.dump(index, tfile)
+        flt = "~u .*/site.html$"
+        url_injection = UrlInjectionAddon(flt, tmpfile, HTMLInjection(insert=True))
+ assert "http://example.com:80" in url_injection.url_store
+ fltr = flowfilter.parse(flt)
+ f = tflow.tflow(resp=tutils.tresp())
+ f.request.url = "http://example.com/site.html"
+ assert fltr(f)
+ assert "http://example.com:80" not in str(f.response.content)
+ url_injection.response(f)
+ assert "http://example.com:80" in str(f.response.content)
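
UrlInjectionAddon, per test_init, combines a flow-filter expression, a previously recorded URL index, and an injection strategy; HTMLInjection, RobotsInjection and SitemapInjection are the strategies covered above. A sketch of the corresponding configuration (the index file name is a placeholder, assumed to be the output of the urlindex add-on):

    from examples.complex.webscanner_helper.urlinjection import HTMLInjection, UrlInjectionAddon

    addon = UrlInjectionAddon(
        "~u .*/site.html$",          # only rewrite responses whose URL matches this filter
        "url_index.json",            # URL index recorded earlier (assumption)
        HTMLInjection(insert=True),  # insert known URLs into any matched HTML response;
                                     # HTMLInjection() without insert only touches 404 pages,
                                     # and RobotsInjection()/SitemapInjection() build
                                     # robots.txt- / sitemap-style listings instead
    )

    addons = [addon]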
diff --git a/test/examples/webscanner_helper/test_watchdog.py b/test/examples/webscanner_helper/test_watchdog.py
new file mode 100644
index 00000000..43e59310
--- /dev/null
+++ b/test/examples/webscanner_helper/test_watchdog.py
@@ -0,0 +1,84 @@
+import time
+from pathlib import Path
+from unittest import mock
+
+from mitmproxy.connections import ServerConnection
+from mitmproxy.exceptions import HttpSyntaxException
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+import multiprocessing
+
+from examples.complex.webscanner_helper.watchdog import WatchdogAddon, logger
+
+
+class TestWatchdog:
+
+ def test_init_file(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("")
+ event = multiprocessing.Event()
+ try:
+ WatchdogAddon(event, Path(tmpfile))
+ except RuntimeError:
+ assert True
+ else:
+ assert False
+
+ def test_init_dir(self, tmpdir):
+ event = multiprocessing.Event()
+ mydir = tmpdir.join("mydir")
+ assert not Path(mydir).exists()
+ WatchdogAddon(event, Path(mydir))
+ assert Path(mydir).exists()
+
+ def test_serverconnect(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir), timeout=10)
+ with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout:
+ w.serverconnect(ServerConnection("127.0.0.1"))
+ mock_set_timeout.assert_called()
+
+ def test_serverconnect_None(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+ with mock.patch('mitmproxy.connections.ServerConnection.settimeout') as mock_set_timeout:
+ w.serverconnect(ServerConnection("127.0.0.1"))
+ assert not mock_set_timeout.called
+
+ def test_trigger(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+ f = tflow.tflow(resp=tutils.tresp())
+ f.error = "Test Error"
+
+ with mock.patch.object(logger, 'error') as mock_error:
+ open_mock = mock.mock_open()
+ with mock.patch("pathlib.Path.open", open_mock, create=True):
+ w.error(f)
+ mock_error.assert_called()
+ open_mock.assert_called()
+
+    def test_trigger_http_syntax(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+ f = tflow.tflow(resp=tutils.tresp())
+ f.error = HttpSyntaxException()
+ assert isinstance(f.error, HttpSyntaxException)
+
+ with mock.patch.object(logger, 'error') as mock_error:
+ open_mock = mock.mock_open()
+ with mock.patch("pathlib.Path.open", open_mock, create=True):
+ w.error(f)
+ assert not mock_error.called
+ assert not open_mock.called
+
+ def test_timeout(self, tmpdir):
+ event = multiprocessing.Event()
+ w = WatchdogAddon(event, Path(tmpdir))
+
+ assert w.not_in_timeout(None, None)
+        assert w.not_in_timeout(time.time(), None)
+ with mock.patch('time.time', return_value=5):
+ assert not w.not_in_timeout(3, 20)
+ assert w.not_in_timeout(3, 1)
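
Finally, the watchdog tests show the add-on taking a multiprocessing.Event, a directory into which erroring flows are dumped, and an optional server timeout. How the scanner side consumes the event is not part of this diff; the sketch below assumes the obvious polling workflow:

    import multiprocessing
    from pathlib import Path

    from examples.complex.webscanner_helper.watchdog import WatchdogAddon

    error_event = multiprocessing.Event()

    # timeout (seconds) is forwarded to ServerConnection.settimeout when given (test_serverconnect);
    # flows that trigger errors are written into the given directory (test_trigger).
    addons = [WatchdogAddon(error_event, Path("watchdog_flows"), timeout=10)]

    # The scanner process could then check error_event between requests and pause or
    # restart the scan when the proxy reports trouble (assumed workflow, not shown here).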