aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/mitmproxy/addons/test_clientplayback.py72
-rw-r--r--test/mitmproxy/proxy/protocol/test_http_replay.py1
-rw-r--r--test/mitmproxy/proxy/test_server.py42
-rw-r--r--test/mitmproxy/test_flow.py35
-rw-r--r--test/mitmproxy/tools/web/test_app.py17
5 files changed, 82 insertions, 85 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py
index f172af83..0bb24e87 100644
--- a/test/mitmproxy/addons/test_clientplayback.py
+++ b/test/mitmproxy/addons/test_clientplayback.py
@@ -22,6 +22,76 @@ class MockThread():
class TestClientPlayback:
+ # @staticmethod
+ # def wait_until_not_live(flow):
+ # """
+ # Race condition: We don't want to replay the flow while it is still live.
+ # """
+ # s = time.time()
+ # while flow.live:
+ # time.sleep(0.001)
+ # if time.time() - s > 5:
+ # raise RuntimeError("Flow is live for too long.")
+
+ # def test_replay(self):
+ # assert self.pathod("304").status_code == 304
+ # assert len(self.master.state.flows) == 1
+ # l = self.master.state.flows[-1]
+ # assert l.response.status_code == 304
+ # l.request.path = "/p/305"
+ # self.wait_until_not_live(l)
+ # rt = self.master.replay_request(l, block=True)
+ # assert l.response.status_code == 305
+
+ # # Disconnect error
+ # l.request.path = "/p/305:d0"
+ # rt = self.master.replay_request(l, block=True)
+ # assert rt
+ # if isinstance(self, tservers.HTTPUpstreamProxyTest):
+ # assert l.response.status_code == 502
+ # else:
+ # assert l.error
+
+ # # Port error
+ # l.request.port = 1
+ # # In upstream mode, we get a 502 response from the upstream proxy server.
+ # # In upstream mode with ssl, the replay will fail as we cannot establish
+ # # SSL with the upstream proxy.
+ # rt = self.master.replay_request(l, block=True)
+ # assert rt
+ # if isinstance(self, tservers.HTTPUpstreamProxyTest):
+ # assert l.response.status_code == 502
+ # else:
+ # assert l.error
+
+ # def test_replay(self):
+ # opts = options.Options()
+ # fm = master.Master(opts)
+ # f = tflow.tflow(resp=True)
+ # f.request.content = None
+ # with pytest.raises(ReplayException, match="missing"):
+ # fm.replay_request(f)
+
+ # f.request = None
+ # with pytest.raises(ReplayException, match="request"):
+ # fm.replay_request(f)
+
+ # f.intercepted = True
+ # with pytest.raises(ReplayException, match="intercepted"):
+ # fm.replay_request(f)
+
+ # f.live = True
+ # with pytest.raises(ReplayException, match="live"):
+ # fm.replay_request(f)
+
+ # req = tutils.treq(headers=net_http.Headers(((b":authority", b"foo"), (b"header", b"qvalue"), (b"content-length", b"7"))))
+ # f = tflow.tflow(req=req)
+ # f.request.http_version = "HTTP/2.0"
+ # with mock.patch('mitmproxy.proxy.protocol.http_replay.RequestReplayThread.run'):
+ # rt = fm.replay_request(f)
+ # assert rt.f.request.http_version == "HTTP/1.1"
+ # assert ":authority" not in rt.f.request.headers
+
def test_playback(self):
cp = clientplayback.ClientPlayback()
with taddons.context(cp) as tctx:
@@ -29,7 +99,7 @@ class TestClientPlayback:
f = tflow.tflow(resp=True)
cp.start_replay([f])
assert cp.count() == 1
- RP = "mitmproxy.proxy.protocol.http_replay.RequestReplayThread"
+ RP = "mitmproxy.addons.clientplayback.RequestReplayThread"
with mock.patch(RP) as rp:
assert not cp.current_thread
cp.tick()
diff --git a/test/mitmproxy/proxy/protocol/test_http_replay.py b/test/mitmproxy/proxy/protocol/test_http_replay.py
deleted file mode 100644
index 777ab4dd..00000000
--- a/test/mitmproxy/proxy/protocol/test_http_replay.py
+++ /dev/null
@@ -1 +0,0 @@
-# TODO: write tests
diff --git a/test/mitmproxy/proxy/test_server.py b/test/mitmproxy/proxy/test_server.py
index 936414ab..914f9184 100644
--- a/test/mitmproxy/proxy/test_server.py
+++ b/test/mitmproxy/proxy/test_server.py
@@ -31,48 +31,6 @@ class CommonMixin:
def test_large(self):
assert len(self.pathod("200:b@50k").content) == 1024 * 50
- @staticmethod
- def wait_until_not_live(flow):
- """
- Race condition: We don't want to replay the flow while it is still live.
- """
- s = time.time()
- while flow.live:
- time.sleep(0.001)
- if time.time() - s > 5:
- raise RuntimeError("Flow is live for too long.")
-
- def test_replay(self):
- assert self.pathod("304").status_code == 304
- assert len(self.master.state.flows) == 1
- l = self.master.state.flows[-1]
- assert l.response.status_code == 304
- l.request.path = "/p/305"
- self.wait_until_not_live(l)
- rt = self.master.replay_request(l, block=True)
- assert l.response.status_code == 305
-
- # Disconnect error
- l.request.path = "/p/305:d0"
- rt = self.master.replay_request(l, block=True)
- assert rt
- if isinstance(self, tservers.HTTPUpstreamProxyTest):
- assert l.response.status_code == 502
- else:
- assert l.error
-
- # Port error
- l.request.port = 1
- # In upstream mode, we get a 502 response from the upstream proxy server.
- # In upstream mode with ssl, the replay will fail as we cannot establish
- # SSL with the upstream proxy.
- rt = self.master.replay_request(l, block=True)
- assert rt
- if isinstance(self, tservers.HTTPUpstreamProxyTest):
- assert l.response.status_code == 502
- else:
- assert l.error
-
def test_http(self):
f = self.pathod("304")
assert f.status_code == 304
diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py
index a6f194a7..4956a1d2 100644
--- a/test/mitmproxy/test_flow.py
+++ b/test/mitmproxy/test_flow.py
@@ -1,17 +1,14 @@
import io
-from unittest import mock
import pytest
-from mitmproxy.test import tflow, tutils, taddons
+from mitmproxy.test import tflow, taddons
import mitmproxy.io
from mitmproxy import flowfilter
from mitmproxy import options
from mitmproxy.io import tnetstring
-from mitmproxy.exceptions import FlowReadException, ReplayException
+from mitmproxy.exceptions import FlowReadException
from mitmproxy import flow
from mitmproxy import http
-from mitmproxy.net import http as net_http
-from mitmproxy import master
from . import tservers
@@ -122,34 +119,6 @@ class TestFlowMaster:
assert s.flows[1].handshake_flow == f.handshake_flow
assert len(s.flows[1].messages) == len(f.messages)
- def test_replay(self):
- opts = options.Options()
- fm = master.Master(opts)
- f = tflow.tflow(resp=True)
- f.request.content = None
- with pytest.raises(ReplayException, match="missing"):
- fm.replay_request(f)
-
- f.request = None
- with pytest.raises(ReplayException, match="request"):
- fm.replay_request(f)
-
- f.intercepted = True
- with pytest.raises(ReplayException, match="intercepted"):
- fm.replay_request(f)
-
- f.live = True
- with pytest.raises(ReplayException, match="live"):
- fm.replay_request(f)
-
- req = tutils.treq(headers=net_http.Headers(((b":authority", b"foo"), (b"header", b"qvalue"), (b"content-length", b"7"))))
- f = tflow.tflow(req=req)
- f.request.http_version = "HTTP/2.0"
- with mock.patch('mitmproxy.proxy.protocol.http_replay.RequestReplayThread.run'):
- rt = fm.replay_request(f)
- assert rt.f.request.http_version == "HTTP/1.1"
- assert ":authority" not in rt.f.request.headers
-
@pytest.mark.asyncio
async def test_all(self):
opts = options.Options(
diff --git a/test/mitmproxy/tools/web/test_app.py b/test/mitmproxy/tools/web/test_app.py
index 00188895..b272861c 100644
--- a/test/mitmproxy/tools/web/test_app.py
+++ b/test/mitmproxy/tools/web/test_app.py
@@ -185,14 +185,15 @@ class TestApp(tornado.testing.AsyncHTTPTestCase):
self.fetch("/flows/42/revert", method="POST")
assert not f._backup
- def test_flow_replay(self):
- with mock.patch("mitmproxy.master.Master.replay_request") as replay_request:
- assert self.fetch("/flows/42/replay", method="POST").code == 200
- assert replay_request.called
- replay_request.side_effect = exceptions.ReplayException(
- "out of replays"
- )
- assert self.fetch("/flows/42/replay", method="POST").code == 400
+ # FIXME
+ # def test_flow_replay(self):
+ # with mock.patch("mitmproxy.master.Master.replay_request") as replay_request:
+ # assert self.fetch("/flows/42/replay", method="POST").code == 200
+ # assert replay_request.called
+ # replay_request.side_effect = exceptions.ReplayException(
+ # "out of replays"
+ # )
+ # assert self.fetch("/flows/42/replay", method="POST").code == 400
def test_flow_content(self):
f = self.view.get_by_id("42")