diff options
Diffstat (limited to 'test/mitmproxy/addons/test_clientplayback.py')
-rw-r--r-- | test/mitmproxy/addons/test_clientplayback.py | 72 |
1 files changed, 33 insertions, 39 deletions
diff --git a/test/mitmproxy/addons/test_clientplayback.py b/test/mitmproxy/addons/test_clientplayback.py index 0bb24e87..a63bec53 100644 --- a/test/mitmproxy/addons/test_clientplayback.py +++ b/test/mitmproxy/addons/test_clientplayback.py @@ -92,47 +92,13 @@ class TestClientPlayback: # assert rt.f.request.http_version == "HTTP/1.1" # assert ":authority" not in rt.f.request.headers - def test_playback(self): - cp = clientplayback.ClientPlayback() - with taddons.context(cp) as tctx: - assert cp.count() == 0 - f = tflow.tflow(resp=True) - cp.start_replay([f]) - assert cp.count() == 1 - RP = "mitmproxy.addons.clientplayback.RequestReplayThread" - with mock.patch(RP) as rp: - assert not cp.current_thread - cp.tick() - assert rp.called - assert cp.current_thread - - cp.flows = [] - cp.current_thread.is_alive.return_value = False - assert cp.count() == 1 - cp.tick() - assert cp.count() == 0 - assert tctx.master.has_event("update") - assert tctx.master.has_event("processing_complete") - - cp.current_thread = MockThread() - cp.tick() - assert cp.current_thread is None - - cp.start_replay([f]) - cp.stop_replay() - assert not cp.flows - - df = tflow.DummyFlow(tflow.tclient_conn(), tflow.tserver_conn(), True) - with pytest.raises(exceptions.CommandError, match="Can't replay live flow."): - cp.start_replay([df]) - def test_load_file(self, tmpdir): cp = clientplayback.ClientPlayback() with taddons.context(cp): fpath = str(tmpdir.join("flows")) tdump(fpath, [tflow.tflow(resp=True)]) cp.load_file(fpath) - assert cp.flows + assert cp.count() == 1 with pytest.raises(exceptions.CommandError): cp.load_file("/nonexistent") @@ -141,11 +107,39 @@ class TestClientPlayback: with taddons.context(cp) as tctx: path = str(tmpdir.join("flows")) tdump(path, [tflow.tflow()]) + assert cp.count() == 0 tctx.configure(cp, client_replay=[path]) - cp.configured = False + assert cp.count() == 1 tctx.configure(cp, client_replay=[]) - cp.configured = False - tctx.configure(cp) - cp.configured = False with pytest.raises(exceptions.OptionsError): tctx.configure(cp, client_replay=["nonexistent"]) + + def test_check(self): + cp = clientplayback.ClientPlayback() + with taddons.context(cp): + f = tflow.tflow(resp=True) + f.live = True + assert "live flow" in cp.check(f) + + f = tflow.tflow(resp=True) + f.intercepted = True + assert "intercepted flow" in cp.check(f) + + f = tflow.tflow(resp=True) + f.request = None + assert "missing request" in cp.check(f) + + f = tflow.tflow(resp=True) + f.request.raw_content = None + assert "missing content" in cp.check(f) + + def test_playback(self): + cp = clientplayback.ClientPlayback() + with taddons.context(cp): + assert cp.count() == 0 + f = tflow.tflow(resp=True) + cp.start_replay([f]) + assert cp.count() == 1 + + cp.stop_replay() + assert cp.count() == 0
\ No newline at end of file |