diff options
Diffstat (limited to 'test/test_flow.py')
-rw-r--r-- | test/test_flow.py | 130 |
1 files changed, 87 insertions, 43 deletions
diff --git a/test/test_flow.py b/test/test_flow.py index 399c8827..6ed279c2 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -5,6 +5,7 @@ import mock from libmproxy import filt, protocol, controller, utils, tnetstring, flow from libmproxy.protocol.primitives import Error, Flow from libmproxy.protocol.http import decoded, CONTENT_MISSING +from libmproxy.proxy.config import HostMatcher from libmproxy.proxy import ProxyConfig from libmproxy.proxy.server import DummyServer from libmproxy.proxy.connection import ClientConnection @@ -104,7 +105,7 @@ class TestClientPlaybackState: q = Queue.Queue() fm.state.clear() - fm.tick(q) + fm.tick(q, timeout=0) fm.stop_client_playback() assert not fm.client_playback @@ -112,7 +113,7 @@ class TestClientPlaybackState: class TestServerPlaybackState: def test_hash(self): - s = flow.ServerPlaybackState(None, [], False, False) + s = flow.ServerPlaybackState(None, [], False, False, None, False) r = tutils.tflow() r2 = tutils.tflow() @@ -124,7 +125,7 @@ class TestServerPlaybackState: assert s._hash(r) != s._hash(r2) def test_headers(self): - s = flow.ServerPlaybackState(["foo"], [], False, False) + s = flow.ServerPlaybackState(["foo"], [], False, False, None, False) r = tutils.tflow(resp=True) r.request.headers["foo"] = ["bar"] r2 = tutils.tflow(resp=True) @@ -145,7 +146,7 @@ class TestServerPlaybackState: r2 = tutils.tflow(resp=True) r2.request.headers["key"] = ["two"] - s = flow.ServerPlaybackState(None, [r, r2], False, False) + s = flow.ServerPlaybackState(None, [r, r2], False, False, None, False) assert s.count() == 2 assert len(s.fmap.keys()) == 1 @@ -166,28 +167,68 @@ class TestServerPlaybackState: r2 = tutils.tflow(resp=True) r2.request.headers["key"] = ["two"] - s = flow.ServerPlaybackState(None, [r, r2], False, True) + s = flow.ServerPlaybackState(None, [r, r2], False, True, None, False) assert s.count() == 2 s.next_flow(r) assert s.count() == 2 + def test_ignore_params(self): + s = flow.ServerPlaybackState(None, [], False, False, ["param1", "param2"], False) + r = tutils.tflow(resp=True) + r.request.path="/test?param1=1" + r2 = tutils.tflow(resp=True) + r2.request.path="/test" + assert s._hash(r) == s._hash(r2) + r2.request.path="/test?param1=2" + assert s._hash(r) == s._hash(r2) + r2.request.path="/test?param2=1" + assert s._hash(r) == s._hash(r2) + r2.request.path="/test?param3=2" + assert not s._hash(r) == s._hash(r2) + + def test_ignore_content(self): + s = flow.ServerPlaybackState(None, [], False, False, None, False) + r = tutils.tflow(resp=True) + r2 = tutils.tflow(resp=True) + + r.request.content = "foo" + r2.request.content = "foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = "bar" + assert not s._hash(r) == s._hash(r2) + + #now ignoring content + s = flow.ServerPlaybackState(None, [], False, False, None, True) + r = tutils.tflow(resp=True) + r2 = tutils.tflow(resp=True) + r.request.content = "foo" + r2.request.content = "foo" + assert s._hash(r) == s._hash(r2) + r2.request.content = "bar" + assert s._hash(r) == s._hash(r2) + r2.request.content = "" + assert s._hash(r) == s._hash(r2) + r2.request.content = None + assert s._hash(r) == s._hash(r2) + + class TestFlow: def test_copy(self): f = tutils.tflow(resp=True) - a0 = f._get_state() + a0 = f.get_state() f2 = f.copy() - a = f._get_state() - b = f2._get_state() - assert f._get_state() == f2._get_state() + a = f.get_state() + b = f2.get_state() + assert f.get_state() == f2.get_state() assert not f == f2 assert not f is f2 - assert f.request == f2.request + assert f.request.get_state() == f2.request.get_state() assert not f.request is f2.request assert f.request.headers == f2.request.headers assert not f.request.headers is f2.request.headers - assert f.response == f2.response + assert f.response.get_state() == f2.response.get_state() assert not f.response is f2.response f = tutils.tflow(err=True) @@ -196,7 +237,7 @@ class TestFlow: assert not f.request is f2.request assert f.request.headers == f2.request.headers assert not f.request.headers is f2.request.headers - assert f.error == f2.error + assert f.error.get_state() == f2.error.get_state() assert not f.error is f2.error def test_match(self): @@ -230,21 +271,21 @@ class TestFlow: def test_getset_state(self): f = tutils.tflow(resp=True) - state = f._get_state() - assert f._get_state() == protocol.http.HTTPFlow._from_state(state)._get_state() + state = f.get_state() + assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state() f.response = None f.error = Error("error") - state = f._get_state() - assert f._get_state() == protocol.http.HTTPFlow._from_state(state)._get_state() + state = f.get_state() + assert f.get_state() == protocol.http.HTTPFlow.from_state(state).get_state() f2 = f.copy() - assert f._get_state() == f2._get_state() + assert f.get_state() == f2.get_state() assert not f == f2 f2.error = Error("e2") assert not f == f2 - f._load_state(f2._get_state()) - assert f._get_state() == f2._get_state() + f.load_state(f2.get_state()) + assert f.get_state() == f2.get_state() def test_kill(self): s = flow.State() @@ -482,7 +523,7 @@ class TestSerialize: assert len(l) == 1 f2 = l[0] - assert f2._get_state() == f._get_state() + assert f2.get_state() == f.get_state() assert f2.request.assemble() == f.request.assemble() def test_load_flows(self): @@ -530,7 +571,7 @@ class TestSerialize: def test_versioncheck(self): f = tutils.tflow() - d = f._get_state() + d = f.get_state() d["version"] = (0, 0) sio = StringIO() tnetstring.dump(d, sio) @@ -553,11 +594,11 @@ class TestFlowMaster: def test_getset_ignore(self): p = mock.Mock() - p.config.ignore = [] + p.config.check_ignore = HostMatcher() fm = flow.FlowMaster(p, flow.State()) - assert not fm.get_ignore() - fm.set_ignore(["^apple\.com:", ":443$"]) - assert fm.get_ignore() + assert not fm.get_ignore_filter() + fm.set_ignore_filter(["^apple\.com:", ":443$"]) + assert fm.get_ignore_filter() def test_replay(self): s = flow.State() @@ -569,6 +610,9 @@ class TestFlowMaster: f.intercepting = True assert "intercepting" in fm.replay_request(f) + f.live = True + assert "live" in fm.replay_request(f) + def test_script_reqerr(self): s = flow.State() fm = flow.FlowMaster(None, s) @@ -649,12 +693,12 @@ class TestFlowMaster: f = tutils.tflow(resp=True) pb = [tutils.tflow(resp=True), f] fm = flow.FlowMaster(None, s) - assert not fm.start_server_playback(pb, False, [], False, False) + assert not fm.start_server_playback(pb, False, [], False, False, None, False) assert not fm.start_client_playback(pb, False) q = Queue.Queue() assert not fm.state.flow_count() - fm.tick(q) + fm.tick(q, 0) assert fm.state.flow_count() f.error = Error("error") @@ -671,18 +715,18 @@ class TestFlowMaster: fm.refresh_server_playback = True assert not fm.do_server_playback(tutils.tflow()) - fm.start_server_playback(pb, False, [], False, False) + fm.start_server_playback(pb, False, [], False, False, None, False) assert fm.do_server_playback(tutils.tflow()) - fm.start_server_playback(pb, False, [], True, False) + fm.start_server_playback(pb, False, [], True, False, None, False) r = tutils.tflow() r.request.content = "gibble" assert not fm.do_server_playback(r) assert fm.do_server_playback(tutils.tflow()) - fm.start_server_playback(pb, False, [], True, False) + fm.start_server_playback(pb, False, [], True, False, None, False) q = Queue.Queue() - fm.tick(q) + fm.tick(q, 0) assert fm.should_exit.is_set() fm.stop_server_playback() @@ -695,7 +739,7 @@ class TestFlowMaster: pb = [f] fm = flow.FlowMaster(None, s) fm.refresh_server_playback = True - fm.start_server_playback(pb, True, [], False, False) + fm.start_server_playback(pb, True, [], False, False, None, False) f = tutils.tflow() f.request.host = "nonexistent" @@ -779,7 +823,7 @@ class TestRequest: assert r.size() == len(r.assemble()) r2 = r.copy() - assert r == r2 + assert r.get_state() == r2.get_state() r.content = None assert r.assemble() @@ -988,7 +1032,7 @@ class TestResponse: assert resp.size() == len(resp.assemble()) resp2 = resp.copy() - assert resp2 == resp + assert resp2.get_state() == resp.get_state() resp.content = None assert resp.assemble() @@ -1131,37 +1175,37 @@ class TestResponse: class TestError: def test_getset_state(self): e = Error("Error") - state = e._get_state() - assert Error._from_state(state) == e + state = e.get_state() + assert Error.from_state(state).get_state() == e.get_state() assert e.copy() e2 = Error("bar") assert not e == e2 - e._load_state(e2._get_state()) - assert e == e2 - + e.load_state(e2.get_state()) + assert e.get_state() == e2.get_state() e3 = e.copy() - assert e3 == e + assert e3.get_state() == e.get_state() class TestClientConnection: def test_state(self): c = tutils.tclient_conn() - assert ClientConnection._from_state(c._get_state()) == c + assert ClientConnection.from_state(c.get_state()).get_state() ==\ + c.get_state() c2 = tutils.tclient_conn() c2.address.address = (c2.address.host, 4242) assert not c == c2 c2.timestamp_start = 42 - c._load_state(c2._get_state()) + c.load_state(c2.get_state()) assert c.timestamp_start == 42 c3 = c.copy() - assert c3 == c + assert c3.get_state() == c.get_state() assert str(c) |