diff options
Diffstat (limited to 'test/test_flow.py')
-rw-r--r-- | test/test_flow.py | 115 |
1 files changed, 44 insertions, 71 deletions
diff --git a/test/test_flow.py b/test/test_flow.py index 3e111fc1..5ae6c8d6 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -2,7 +2,8 @@ import Queue, time, os.path from cStringIO import StringIO import email.utils from libmproxy import filt, protocol, controller, utils, tnetstring, proxy, flow -from libmproxy.protocol.primitives import Error +from libmproxy.protocol.primitives import Error, Flow +from libmproxy.protocol.http import decoded import tutils @@ -176,7 +177,8 @@ class TestFlow: f2 = f.copy() a = f._get_state() b = f2._get_state() - assert f == f2 + assert f._get_state() == f2._get_state() + assert not f == f2 assert not f is f2 assert f.request == f2.request assert not f.request is f2.request @@ -234,7 +236,8 @@ class TestFlow: assert f._get_state() == protocol.http.HTTPFlow._from_state(state)._get_state() f2 = f.copy() - assert f == f2 + assert f._get_state() == f2._get_state() + assert not f == f2 f2.error = Error("e2") assert not f == f2 f._load_state(f2._get_state()) @@ -340,7 +343,7 @@ class TestState: connect -> request -> response """ - bc = flow.ClientConnect(("address", 22)) + bc = tutils.tclient_conn() c = flow.State() req = tutils.treq(bc) @@ -359,6 +362,7 @@ class TestState: assert c.active_flow_count() == 1 unseen_resp = tutils.tresp() + unseen_resp.flow = None assert not c.add_response(unseen_resp) assert c.active_flow_count() == 1 @@ -370,8 +374,8 @@ class TestState: c = flow.State() req = tutils.treq() f = c.add_request(req) - e = Error("message") - assert c.add_error(e) + f.error = Error("message") + assert c.add_error(f.error) e = Error("message") assert not c.add_error(e) @@ -379,10 +383,9 @@ class TestState: c = flow.State() req = tutils.treq() f = c.add_request(req) - e = Error("message") + e = tutils.terr() c.set_limit("~e") assert not c.view - assert not c.view assert c.add_error(e) assert c.view @@ -460,7 +463,7 @@ class TestState: c.clear() c.load_flows(flows) - assert isinstance(c._flow_list[0], flow.Flow) + assert isinstance(c._flow_list[0], Flow) def test_accept_all(self): c = flow.State() @@ -595,9 +598,7 @@ class TestFlowMaster: #load second script assert not fm.load_script(tutils.test_data.path("scripts/all.py")) assert len(fm.scripts) == 2 - dc = flow.ClientDisconnect(req.flow.client_conn) - dc.reply = controller.DummyReply() - fm.handle_clientdisconnect(dc) + fm.handle_clientdisconnect(sc) assert fm.scripts[0].ns["log"][-1] == "clientdisconnect" assert fm.scripts[1].ns["log"][-1] == "clientdisconnect" @@ -607,7 +608,7 @@ class TestFlowMaster: assert len(fm.scripts) == 0 assert not fm.load_script(tutils.test_data.path("scripts/all.py")) - err = Error("msg") + err = tutils.terr() err.reply = controller.DummyReply() fm.handle_error(err) assert fm.scripts[0].ns["log"][-1] == "error" @@ -621,7 +622,7 @@ class TestFlowMaster: f2 = fm.duplicate_flow(f) assert f2.response assert s.flow_count() == 2 - assert s.index(f2) + assert s.index(f2) == 1 def test_all(self): s = flow.State() @@ -639,15 +640,14 @@ class TestFlowMaster: assert s.flow_count() == 1 rx = tutils.tresp() + rx.flow = None assert not fm.handle_response(rx) - dc = flow.ClientDisconnect(req.flow.client_conn) - dc.reply = controller.DummyReply() - fm.handle_clientdisconnect(dc) + fm.handle_clientdisconnect(req.flow.client_conn) - err = Error("msg") - err.reply = controller.DummyReply() - fm.handle_error(err) + f.error = Error("msg") + f.error.reply = controller.DummyReply() + fm.handle_error(f.error) fm.load_script(tutils.test_data.path("scripts/a.py")) fm.shutdown() @@ -666,9 +666,9 @@ class TestFlowMaster: fm.tick(q) assert fm.state.flow_count() - err = Error("error") - err.reply = controller.DummyReply() - fm.handle_error(err) + f.error = Error("error") + f.error.reply = controller.DummyReply() + fm.handle_error(f.error) def test_server_playback(self): controller.should_exit = False @@ -771,20 +771,16 @@ class TestFlowMaster: assert r()[0].response - tf = tutils.tflow_full() + tf = tutils.tflow() fm.start_stream(file(p, "ab"), None) fm.handle_request(tf.request) fm.shutdown() assert not r()[1].response - class TestRequest: def test_simple(self): - h = flow.ODictCaseless() - h["test"] = ["test"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() u = r.get_url() assert r.set_url(u) assert not r.set_url("") @@ -799,19 +795,11 @@ class TestRequest: assert r._assemble() assert r.size() == len(r._assemble()) - r.close = True - assert "connection: close" in r._assemble() - - assert r._assemble(True) - r.content = flow.CONTENT_MISSING - assert not r._assemble() + tutils.raises("Cannot assemble flow with CONTENT_MISSING", r._assemble) def test_get_url(self): - h = flow.ODictCaseless() - h["test"] = ["test"] - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.tflow().request assert r.get_url() == "https://host:22/" assert r.get_url(hostheader=True) == "https://host:22/" r.headers["Host"] = ["foo.com"] @@ -819,11 +807,10 @@ class TestRequest: assert r.get_url(hostheader=True) == "https://foo.com:22/" def test_path_components(self): - h = flow.ODictCaseless() - c = flow.ClientConnect(("addr", 2222)) - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r = tutils.treq() + r.path = "/" assert r.get_path_components() == [] - r = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/foo/bar", h, "content") + r.path = "/foo/bar" assert r.get_path_components() == ["foo", "bar"] q = flow.ODict() q["test"] = ["123"] @@ -839,10 +826,9 @@ class TestRequest: assert "%2F" in r.path def test_getset_form_urlencoded(self): - h = flow.ODictCaseless() - h["content-type"] = [flow.HDR_FORM_URLENCODED] d = flow.ODict([("one", "two"), ("three", "four")]) - r = flow.Request(None, (1, 1), "host", 22, "https", "GET", "/", h, utils.urlencode(d.lst)) + r = tutils.treq(content=utils.urlencode(d.lst)) + r.headers["content-type"] = [protocol.http.HDR_FORM_URLENCODED] assert r.get_form_urlencoded() == d d = flow.ODict([("x", "y")]) @@ -855,19 +841,20 @@ class TestRequest: def test_getset_query(self): h = flow.ODictCaseless() - r = flow.Request(None, (1, 1), "host", 22, "https", "GET", "/foo?x=y&a=b", h, "content") + r = tutils.treq() + r.path = "/foo?x=y&a=b" q = r.get_query() assert q.lst == [("x", "y"), ("a", "b")] - r = flow.Request(None, (1, 1), "host", 22, "https", "GET", "/", h, "content") + r.path = "/" q = r.get_query() assert not q - r = flow.Request(None, (1, 1), "host", 22, "https", "GET", "/?adsfa", h, "content") + r.path = "/?adsfa" q = r.get_query() assert q.lst == [("adsfa", "")] - r = flow.Request(None, (1, 1), "host", 22, "https", "GET", "/foo?x=y&a=b", h, "content") + r.path = "/foo?x=y&a=b" assert r.get_query() r.set_query(flow.ODict([])) assert not r.get_query() @@ -985,34 +972,20 @@ class TestRequest: print r._assemble_headers() assert result == 62 - def test_get_transmitted_size(self): - h = flow.ODictCaseless() - h["headername"] = ["headervalue"] - r = tutils.treq() - r.headers = h - result = r.get_transmitted_size() - assert result==len("content") - r.content = None - assert r.get_transmitted_size() == 0 - def test_get_content_type(self): h = flow.ODictCaseless() h["Content-Type"] = ["text/plain"] resp = tutils.tresp() resp.headers = h - assert resp.get_content_type()=="text/plain" + assert resp.headers.get_first("content-type") == "text/plain" class TestResponse: def test_simple(self): - h = flow.ODictCaseless() - h["test"] = ["test"] - c = flow.ClientConnect(("addr", 2222)) - req = flow.Request(c, (1, 1), "host", 22, "https", "GET", "/", h, "content") - resp = flow.Response(req, (1, 1), 200, "msg", h.copy(), "content", None) + f = tutils.tflow_full() + resp = f.response assert resp._assemble() assert resp.size() == len(resp._assemble()) - resp2 = resp.copy() assert resp2 == resp @@ -1021,7 +994,7 @@ class TestResponse: assert resp.size() == len(resp._assemble()) resp.content = flow.CONTENT_MISSING - assert not resp._assemble() + tutils.raises("Cannot assemble flow with CONTENT_MISSING", resp._assemble) def test_refresh(self): r = tutils.tresp() @@ -1147,7 +1120,7 @@ class TestResponse: h["Content-Type"] = ["text/plain"] resp = tutils.tresp() resp.headers = h - assert resp.headers.get_first("content-type")=="text/plain" + assert resp.headers.get_first("content-type") == "text/plain" class TestError: @@ -1195,13 +1168,13 @@ def test_decoded(): r.encode("gzip") assert r.headers["content-encoding"] assert r.content != "content" - with flow.decoded(r): + with decoded(r): assert not r.headers["content-encoding"] assert r.content == "content" assert r.headers["content-encoding"] assert r.content != "content" - with flow.decoded(r): + with decoded(r): r.content = "foo" assert r.content != "foo" |