aboutsummaryrefslogtreecommitdiffstats
path: root/test/examples/webscanner_helper/test_urlindex.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/examples/webscanner_helper/test_urlindex.py')
-rw-r--r--test/examples/webscanner_helper/test_urlindex.py234
1 files changed, 234 insertions, 0 deletions
diff --git a/test/examples/webscanner_helper/test_urlindex.py b/test/examples/webscanner_helper/test_urlindex.py
new file mode 100644
index 00000000..0edd6cc0
--- /dev/null
+++ b/test/examples/webscanner_helper/test_urlindex.py
@@ -0,0 +1,234 @@
+import json
+from json import JSONDecodeError
+from pathlib import Path
+from unittest import mock
+from typing import List
+from unittest.mock import patch
+
+from mitmproxy.test import tflow
+from mitmproxy.test import tutils
+
+from examples.complex.webscanner_helper.urlindex import UrlIndexWriter, SetEncoder, JSONUrlIndexWriter, TextUrlIndexWriter, WRITER, \
+ filter_404, \
+ UrlIndexAddon
+
+
+class TestBaseClass:
+
+ @patch.multiple(UrlIndexWriter, __abstractmethods__=set())
+ def test_base_class(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ index_writer = UrlIndexWriter(tmpfile)
+ index_writer.load()
+ index_writer.add_url(tflow.tflow())
+ index_writer.save()
+
+
+class TestSetEncoder:
+
+ def test_set_encoder_set(self):
+ test_set = {"foo", "bar", "42"}
+ result = SetEncoder.default(SetEncoder(), test_set)
+ assert isinstance(result, List)
+ assert 'foo' in result
+ assert 'bar' in result
+ assert '42' in result
+
+ def test_set_encoder_str(self):
+ test_str = "test"
+ try:
+ SetEncoder.default(SetEncoder(), test_str)
+ except TypeError:
+ assert True
+ else:
+ assert False
+
+
+class TestJSONUrlIndexWriter:
+
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(
+ "{\"http://example.com:80\": {\"/\": {\"GET\": [301]}}, \"http://www.example.com:80\": {\"/\": {\"GET\": [302]}}}")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert 'http://example.com:80' in writer.host_urls
+ assert '/' in writer.host_urls['http://example.com:80']
+ assert 'GET' in writer.host_urls['http://example.com:80']['/']
+ assert 301 in writer.host_urls['http://example.com:80']['/']['GET']
+
+ def test_load_empty(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert len(writer.host_urls) == 0
+
+ def test_load_nonexisting(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert len(writer.host_urls) == 0
+
+ def test_add(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ writer.add_url(f)
+ assert url in writer.host_urls
+ assert f.request.path in writer.host_urls[url]
+
+ def test_save(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = JSONUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ writer.add_url(f)
+ writer.save()
+
+ with open(tmpfile, "r") as results:
+ try:
+ content = json.load(results)
+ except JSONDecodeError:
+ assert False
+ assert url in content
+
+
+class TestTestUrlIndexWriter:
+ def test_load(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write(
+ "2020-04-22T05:41:08.679231 STATUS: 200 METHOD: GET URL:http://example.com")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_load_empty(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("{}")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_load_nonexisting(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ writer.load()
+ assert True
+
+ def test_add(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ method = f.request.method
+ code = f.response.status_code
+ writer.add_url(f)
+
+ with open(tmpfile, "r") as results:
+ content = results.read()
+ assert url in content
+ assert method in content
+ assert str(code) in content
+
+ def test_save(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ writer = TextUrlIndexWriter(filename=tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ url = f"{f.request.scheme}://{f.request.host}:{f.request.port}"
+ method = f.request.method
+ code = f.response.status_code
+ writer.add_url(f)
+ writer.save()
+
+ with open(tmpfile, "r") as results:
+ content = results.read()
+ assert url in content
+ assert method in content
+ assert str(code) in content
+
+
+class TestWriter:
+ def test_writer_dict(self):
+ assert "json" in WRITER
+ assert isinstance(WRITER["json"], JSONUrlIndexWriter.__class__)
+ assert "text" in WRITER
+ assert isinstance(WRITER["text"], TextUrlIndexWriter.__class__)
+
+
+class TestFilter:
+ def test_filer_true(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ assert filter_404(f)
+
+ def test_filter_false(self):
+ f = tflow.tflow(resp=tutils.tresp())
+ f.response.status_code = 404
+ assert not filter_404(f)
+
+
+class TestUrlIndexAddon:
+
+ def test_init(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ UrlIndexAddon(tmpfile)
+
+ def test_init_format(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ try:
+ UrlIndexAddon(tmpfile, index_format="test")
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_init_filter(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ try:
+ UrlIndexAddon(tmpfile, index_filter="i~nvalid")
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_init_append(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ with open(tmpfile, "w") as tfile:
+ tfile.write("")
+ url_index = UrlIndexAddon(tmpfile, append=False)
+ f = tflow.tflow(resp=tutils.tresp())
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url'):
+ url_index.response(f)
+ assert not Path(tmpfile).exists()
+
+ def test_response(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ f = tflow.tflow(resp=tutils.tresp())
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.add_url') as mock_add_url:
+ url_index.response(f)
+ mock_add_url.assert_called()
+
+ def test_response_None(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ url_index.index_filter = None
+ f = tflow.tflow(resp=tutils.tresp())
+ try:
+ url_index.response(f)
+ except ValueError:
+ assert True
+ else:
+ assert False
+
+ def test_done(self, tmpdir):
+ tmpfile = tmpdir.join("tmpfile")
+ url_index = UrlIndexAddon(tmpfile)
+ with mock.patch('examples.complex.webscanner_helper.urlindex.JSONUrlIndexWriter.save') as mock_save:
+ url_index.done()
+ mock_save.assert_called()