diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/mitmproxy/addons/test_evenstore.py | 32 | ||||
| -rw-r--r-- | test/mitmproxy/data/addonscripts/duplicate_flow.py | 6 | ||||
| -rw-r--r-- | test/mitmproxy/test_web_app.py | 251 | 
3 files changed, 278 insertions, 11 deletions
diff --git a/test/mitmproxy/addons/test_evenstore.py b/test/mitmproxy/addons/test_evenstore.py new file mode 100644 index 00000000..78eb3287 --- /dev/null +++ b/test/mitmproxy/addons/test_evenstore.py @@ -0,0 +1,32 @@ +import mock +from mitmproxy import log +from mitmproxy.addons import eventstore + + +def test_simple(): +    store = eventstore.EventStore() +    assert not store.data + +    sig_add = mock.Mock(spec=lambda: 42) +    sig_refresh = mock.Mock(spec=lambda: 42) +    store.sig_add.connect(sig_add) +    store.sig_refresh.connect(sig_refresh) + +    assert not sig_add.called +    assert not sig_refresh.called + +    # test .log() +    store.log(log.LogEntry("test", "info")) +    assert store.data + +    assert sig_add.called +    assert not sig_refresh.called + +    # test .clear() +    sig_add.reset_mock() + +    store.clear() +    assert not store.data + +    assert not sig_add.called +    assert sig_refresh.called diff --git a/test/mitmproxy/data/addonscripts/duplicate_flow.py b/test/mitmproxy/data/addonscripts/duplicate_flow.py deleted file mode 100644 index 02fb8dce..00000000 --- a/test/mitmproxy/data/addonscripts/duplicate_flow.py +++ /dev/null @@ -1,6 +0,0 @@ -from mitmproxy import ctx - - -def request(flow): -    f = ctx.master.state.duplicate_flow(flow) -    ctx.master.replay_request(f, block=True) diff --git a/test/mitmproxy/test_web_app.py b/test/mitmproxy/test_web_app.py index 8fc3378a..be195528 100644 --- a/test/mitmproxy/test_web_app.py +++ b/test/mitmproxy/test_web_app.py @@ -1,15 +1,47 @@ -import tornado.testing +import json as _json +import mock +import tornado.testing +from mitmproxy import exceptions  from mitmproxy import proxy +from mitmproxy.test import tflow  from mitmproxy.tools.web import app  from mitmproxy.tools.web import master as webmaster +from tornado import httpclient +from tornado import websocket + + +def json(resp: httpclient.HTTPResponse): +    return _json.loads(resp.body.decode())  class TestApp(tornado.testing.AsyncHTTPTestCase):      def get_app(self):          o = webmaster.Options()          m = webmaster.WebMaster(o, proxy.DummyServer()) -        return app.Application(m, None, None) +        f = tflow.tflow(resp=True) +        f.id = "42" +        m.view.add(f) +        m.view.add(tflow.tflow(err=True)) +        m.add_log("test log", "info") +        self.master = m +        self.view = m.view +        self.events = m.events +        webapp = app.Application(m, None) +        webapp.settings["xsrf_cookies"] = False +        return webapp + +    def fetch(self, *args, **kwargs) -> httpclient.HTTPResponse: +        # tornado disallows POST without content by default. +        return super().fetch(*args, **kwargs, allow_nonstandard_methods=True) + +    def put_json(self, url, data: dict) -> httpclient.HTTPResponse: +        return self.fetch( +            url, +            method="PUT", +            body=_json.dumps(data), +            headers={"Content-Type": "application/json"}, +        )      def test_index(self):          assert self.fetch("/").code == 200 @@ -17,8 +49,217 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):      def test_filter_help(self):          assert self.fetch("/filter-help").code == 200 +    def test_flows(self): +        resp = self.fetch("/flows") +        assert resp.code == 200 +        assert json(resp)[0]["request"]["contentHash"] +        assert json(resp)[1]["error"] + +    def test_flows_dump(self): +        resp = self.fetch("/flows/dump") +        assert b"address" in resp.body + +        self.view.clear() +        assert not len(self.view) + +        assert self.fetch("/flows/dump", method="POST", body=resp.body).code == 200 +        assert len(self.view) + +    def test_clear(self): +        events = self.events.data.copy() +        flows = list(self.view) + +        assert self.fetch("/clear", method="POST").code == 200 + +        assert not len(self.view) +        assert not len(self.events.data) + +        # restore +        for f in flows: +            self.view.add(f) +        self.events.data = events + +    def test_accept(self): +        for f in self.view: +            f.reply.handle() +            f.intercept(self.master) + +        assert self.fetch( +            "/flows/42/accept", method="POST").code == 200 +        assert sum(f.intercepted for f in self.view) == 1 +        assert self.fetch("/flows/accept", method="POST").code == 200 +        assert all(not f.intercepted for f in self.view) + +    def test_flow_delete(self): +        f = self.view.get_by_id("42") +        assert f + +        f.reply.handle() +        assert self.fetch("/flows/42", method="DELETE").code == 200 + +        assert not self.view.get_by_id("42") +        self.view.add(f) + +        assert self.fetch("/flows/1234", method="DELETE").code == 404 + +    def test_flow_update(self): +        f = self.view.get_by_id("42") +        assert f.request.method == "GET" +        f.backup() + +        upd = { +            "request": { +                "method": "PATCH", +                "port": 123, +                "headers": [("foo", "bar")], +                "content": "req", +            }, +            "response": { +                "msg": "Not Found", +                "code": 404, +                "headers": [("bar", "baz")], +                "content": "resp", +            } +        } +        assert self.put_json("/flows/42", upd).code == 200 +        assert f.request.method == "PATCH" +        assert f.request.port == 123 +        assert f.request.headers["foo"] == "bar" +        assert f.request.text == "req" +        assert f.response.msg == "Not Found" +        assert f.response.status_code == 404 +        assert f.response.headers["bar"] == "baz" +        assert f.response.text == "resp" + +        f.revert() + +        assert self.put_json("/flows/42", {"foo": 42}).code == 400 +        assert self.put_json("/flows/42", {"request": {"foo": 42}}).code == 400 +        assert self.put_json("/flows/42", {"response": {"foo": 42}}).code == 400 +        assert self.fetch("/flows/42", method="PUT", body="{}").code == 400 +        assert self.fetch( +            "/flows/42", +            method="PUT", +            headers={"Content-Type": "application/json"}, +            body="!!" +        ).code == 400 + +    def test_flow_duplicate(self): +        resp = self.fetch("/flows/42/duplicate", method="POST") +        assert resp.code == 200 +        f = self.view.get_by_id(resp.body.decode()) +        assert f +        assert f.id != "42" +        self.view.remove(f) + +    def test_flow_revert(self): +        f = self.view.get_by_id("42") +        f.backup() +        f.request.method = "PATCH" +        self.fetch("/flows/42/revert", method="POST") +        assert not f._backup + +    def test_flow_replay(self): +        with mock.patch("mitmproxy.master.Master.replay_request") as replay_request: +            assert self.fetch("/flows/42/replay", method="POST").code == 200 +            assert replay_request.called +            replay_request.side_effect = exceptions.ReplayException( +                "out of replays" +            ) +            assert self.fetch("/flows/42/replay", method="POST").code == 400 + +    def test_flow_content(self): +        f = self.view.get_by_id("42") +        f.backup() +        f.response.headers["Content-Encoding"] = "ran\x00dom" +        f.response.headers["Content-Disposition"] = 'inline; filename="filename.jpg"' + +        r = self.fetch("/flows/42/response/content") +        assert r.body == b"message" +        assert r.headers["Content-Encoding"] == "random" +        assert r.headers["Content-Disposition"] == 'attachment; filename="filename.jpg"' + +        del f.response.headers["Content-Disposition"] +        f.request.path = "/foo/bar.jpg" +        assert self.fetch( +            "/flows/42/response/content" +        ).headers["Content-Disposition"] == 'attachment; filename=bar.jpg' + +        f.response.content = b"" +        assert self.fetch("/flows/42/response/content").code == 400 + +        f.revert() + +    def test_update_flow_content(self): +        assert self.fetch( +            "/flows/42/request/content", +            method="POST", +            body="new" +        ).code == 200 +        f = self.view.get_by_id("42") +        assert f.request.content == b"new" +        assert f.modified() +        f.revert() + +    def test_update_flow_content_multipart(self): +        body = ( +            b'--somefancyboundary\r\n' +            b'Content-Disposition: form-data; name="a"; filename="a.txt"\r\n' +            b'\r\n' +            b'such multipart. very wow.\r\n' +            b'--somefancyboundary--\r\n' +        ) +        assert self.fetch( +            "/flows/42/request/content", +            method="POST", +            headers={"Content-Type": 'multipart/form-data; boundary="somefancyboundary"'}, +            body=body +        ).code == 200 +        f = self.view.get_by_id("42") +        assert f.request.content == b"such multipart. very wow." +        assert f.modified() +        f.revert() + +    def test_flow_content_view(self): +        assert json(self.fetch("/flows/42/request/content/raw")) == { +            "lines": [ +                [["text", "content"]] +            ], +            "description": "Raw" +        } +      def test_events(self): -        assert self.fetch("/events").code == 200 +        resp = self.fetch("/events") +        assert resp.code == 200 +        assert json(resp)[0]["level"] == "info" -    def test_flows(self): -        assert self.fetch("/flows").code == 200 +    def test_settings(self): +        assert json(self.fetch("/settings"))["mode"] == "regular" + +    def test_settings_update(self): +        assert self.put_json("/settings", {"anticache": True}).code == 200 +        assert self.put_json("/settings", {"wtf": True}).code == 400 + +    def test_err(self): +        with mock.patch("mitmproxy.tools.web.app.IndexHandler.get") as f: +            f.side_effect = RuntimeError +            assert self.fetch("/").code == 500 + +    @tornado.testing.gen_test +    def test_websocket(self): +        ws_url = "ws://localhost:{}/updates".format(self.get_http_port()) + +        ws_client = yield websocket.websocket_connect(ws_url) +        self.master.options.anticomp = True + +        response = yield ws_client.read_message() +        assert _json.loads(response) == { +            "resource": "settings", +            "cmd": "update", +            "data": {"anticomp": True}, +        } +        ws_client.close() + +        # trigger on_close by opening a second connection. +        ws_client2 = yield websocket.websocket_connect(ws_url) +        ws_client2.close()  | 
