From 5125c669ccd2db5de5f90c66db61e64f63f3ba4c Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Sat, 5 Sep 2015 20:45:58 +0200 Subject: adjust to new netlib Headers class --- test/test_flow.py | 158 ++++++++++++++++++++++++++---------------------------- 1 file changed, 75 insertions(+), 83 deletions(-) (limited to 'test/test_flow.py') diff --git a/test/test_flow.py b/test/test_flow.py index 9cce26b3..c93beca4 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -8,7 +8,7 @@ import mock import netlib.utils from netlib import odict -from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED +from netlib.http.semantics import CONTENT_MISSING, HDR_FORM_URLENCODED, Headers from libmproxy import filt, protocol, controller, tnetstring, flow from libmproxy.models import Error, Flow, HTTPRequest, HTTPResponse, HTTPFlow, decoded from libmproxy.proxy.config import HostMatcher @@ -34,7 +34,7 @@ def test_app_registry(): r.host = "domain2" r.port = 80 assert not ar.get(r) - r.headers["host"] = ["domain"] + r.headers["host"] = "domain" assert ar.get(r) @@ -42,7 +42,7 @@ class TestStickyCookieState: def _response(self, cookie, host): s = flow.StickyCookieState(filt.parse(".*")) f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True) - f.response.headers["Set-Cookie"] = [cookie] + f.response.headers["Set-Cookie"] = cookie s.handle_response(f) return s, f @@ -75,13 +75,13 @@ class TestStickyAuthState: def test_handle_response(self): s = flow.StickyAuthState(filt.parse(".*")) f = tutils.tflow(resp=True) - f.request.headers["authorization"] = ["foo"] + f.request.headers["authorization"] = "foo" s.handle_request(f) assert "address" in s.hosts f = tutils.tflow(resp=True) s.handle_request(f) - assert f.request.headers["authorization"] == ["foo"] + assert f.request.headers["authorization"] == "foo" class TestClientPlaybackState: @@ -133,7 +133,7 @@ class TestServerPlaybackState: assert s._hash(r) assert s._hash(r) == s._hash(r2) - r.request.headers["foo"] = ["bar"] + r.request.headers["foo"] = "bar" assert s._hash(r) == s._hash(r2) r.request.path = "voing" assert s._hash(r) != s._hash(r2) @@ -153,12 +153,12 @@ class TestServerPlaybackState: None, False) r = tutils.tflow(resp=True) - r.request.headers["foo"] = ["bar"] + r.request.headers["foo"] = "bar" r2 = tutils.tflow(resp=True) assert not s._hash(r) == s._hash(r2) - r2.request.headers["foo"] = ["bar"] + r2.request.headers["foo"] = "bar" assert s._hash(r) == s._hash(r2) - r2.request.headers["oink"] = ["bar"] + r2.request.headers["oink"] = "bar" assert s._hash(r) == s._hash(r2) r = tutils.tflow(resp=True) @@ -167,10 +167,10 @@ class TestServerPlaybackState: def test_load(self): r = tutils.tflow(resp=True) - r.request.headers["key"] = ["one"] + r.request.headers["key"] = "one" r2 = tutils.tflow(resp=True) - r2.request.headers["key"] = ["two"] + r2.request.headers["key"] = "two" s = flow.ServerPlaybackState( None, [ @@ -179,21 +179,21 @@ class TestServerPlaybackState: assert len(s.fmap.keys()) == 1 n = s.next_flow(r) - assert n.request.headers["key"] == ["one"] + assert n.request.headers["key"] == "one" assert s.count() == 1 n = s.next_flow(r) - assert n.request.headers["key"] == ["two"] + assert n.request.headers["key"] == "two" assert s.count() == 0 assert not s.next_flow(r) def test_load_with_nopop(self): r = tutils.tflow(resp=True) - r.request.headers["key"] = ["one"] + r.request.headers["key"] = "one" r2 = tutils.tflow(resp=True) - r2.request.headers["key"] = ["two"] + r2.request.headers["key"] = "two" s = flow.ServerPlaybackState( None, [ @@ -224,12 +224,10 @@ class TestServerPlaybackState: None, [], False, False, None, False, [ "param1", "param2"], False) r = tutils.tflow(resp=True) - r.request.headers[ - "Content-Type"] = ["application/x-www-form-urlencoded"] + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" r.request.content = "paramx=x¶m1=1" r2 = tutils.tflow(resp=True) - r2.request.headers[ - "Content-Type"] = ["application/x-www-form-urlencoded"] + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" r2.request.content = "paramx=x¶m1=1" # same parameters assert s._hash(r) == s._hash(r2) @@ -254,10 +252,10 @@ class TestServerPlaybackState: None, [], False, False, None, False, [ "param1", "param2"], False) r = tutils.tflow(resp=True) - r.request.headers["Content-Type"] = ["application/json"] + r.request.headers["Content-Type"] = "application/json" r.request.content = '{"param1":"1"}' r2 = tutils.tflow(resp=True) - r2.request.headers["Content-Type"] = ["application/json"] + r2.request.headers["Content-Type"] = "application/json" r2.request.content = '{"param1":"1"}' # same content assert s._hash(r) == s._hash(r2) @@ -271,12 +269,10 @@ class TestServerPlaybackState: None, [], False, False, None, True, [ "param1", "param2"], False) r = tutils.tflow(resp=True) - r.request.headers[ - "Content-Type"] = ["application/x-www-form-urlencoded"] + r.request.headers["Content-Type"] = "application/x-www-form-urlencoded" r.request.content = "paramx=y" r2 = tutils.tflow(resp=True) - r2.request.headers[ - "Content-Type"] = ["application/x-www-form-urlencoded"] + r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded" r2.request.content = "paramx=x" # same parameters assert s._hash(r) == s._hash(r2) @@ -460,17 +456,17 @@ class TestFlow: def test_replace(self): f = tutils.tflow(resp=True) - f.request.headers["foo"] = ["foo"] + f.request.headers["foo"] = "foo" f.request.content = "afoob" - f.response.headers["foo"] = ["foo"] + f.response.headers["foo"] = "foo" f.response.content = "afoob" assert f.replace("foo", "bar") == 6 - assert f.request.headers["bar"] == ["bar"] + assert f.request.headers["bar"] == "bar" assert f.request.content == "abarb" - assert f.response.headers["bar"] == ["bar"] + assert f.response.headers["bar"] == "bar" assert f.response.content == "abarb" def test_replace_encoded(self): @@ -938,14 +934,14 @@ class TestFlowMaster: fm.set_stickycookie(".*") f = tutils.tflow(resp=True) - f.response.headers["set-cookie"] = ["foo=bar"] + f.response.headers["set-cookie"] = "foo=bar" fm.handle_request(f) fm.handle_response(f) assert fm.stickycookie_state.jar assert not "cookie" in f.request.headers f = f.copy() fm.handle_request(f) - assert f.request.headers["cookie"] == ["foo=bar"] + assert f.request.headers["cookie"] == "foo=bar" def test_stickyauth(self): s = flow.State() @@ -958,14 +954,14 @@ class TestFlowMaster: fm.set_stickyauth(".*") f = tutils.tflow(resp=True) - f.request.headers["authorization"] = ["foo"] + f.request.headers["authorization"] = "foo" fm.handle_request(f) f = tutils.tflow(resp=True) assert fm.stickyauth_state.hosts assert not "authorization" in f.request.headers fm.handle_request(f) - assert f.request.headers["authorization"] == ["foo"] + assert f.request.headers["authorization"] == "foo" def test_stream(self): with tutils.tmpdir() as tdir: @@ -1022,7 +1018,7 @@ class TestRequest: assert r.url == "https://address:22/path" assert r.pretty_url(True) == "https://address:22/path" - r.headers["Host"] = ["foo.com"] + r.headers["Host"] = "foo.com" assert r.pretty_url(False) == "https://address:22/path" assert r.pretty_url(True) == "https://foo.com:22/path" @@ -1048,19 +1044,17 @@ class TestRequest: def test_getset_form_urlencoded(self): d = odict.ODict([("one", "two"), ("three", "four")]) r = HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst))) - r.headers["content-type"] = [HDR_FORM_URLENCODED] + r.headers["content-type"] = HDR_FORM_URLENCODED assert r.get_form_urlencoded() == d d = odict.ODict([("x", "y")]) r.set_form_urlencoded(d) assert r.get_form_urlencoded() == d - r.headers["content-type"] = ["foo"] + r.headers["content-type"] = "foo" assert not r.get_form_urlencoded() def test_getset_query(self): - h = odict.ODictCaseless() - r = HTTPRequest.wrap(netlib.tutils.treq()) r.path = "/foo?x=y&a=b" q = r.get_query() @@ -1083,11 +1077,10 @@ class TestRequest: assert r.get_query() == qv def test_anticache(self): - h = odict.ODictCaseless() r = HTTPRequest.wrap(netlib.tutils.treq()) - r.headers = h - h["if-modified-since"] = ["test"] - h["if-none-match"] = ["test"] + r.headers = Headers() + r.headers["if-modified-since"] = "test" + r.headers["if-none-match"] = "test" r.anticache() assert not "if-modified-since" in r.headers assert not "if-none-match" in r.headers @@ -1095,25 +1088,29 @@ class TestRequest: def test_replace(self): r = HTTPRequest.wrap(netlib.tutils.treq()) r.path = "path/foo" - r.headers["Foo"] = ["fOo"] + r.headers["Foo"] = "fOo" r.content = "afoob" assert r.replace("foo(?i)", "boo") == 4 assert r.path == "path/boo" assert not "foo" in r.content - assert r.headers["boo"] == ["boo"] + assert r.headers["boo"] == "boo" def test_constrain_encoding(self): r = HTTPRequest.wrap(netlib.tutils.treq()) - r.headers["accept-encoding"] = ["gzip", "oink"] + r.headers["accept-encoding"] = "gzip, oink" + r.constrain_encoding() + assert "oink" not in r.headers["accept-encoding"] + + r.headers.set_all("accept-encoding", ["gzip", "oink"]) r.constrain_encoding() assert "oink" not in r.headers["accept-encoding"] def test_decodeencode(self): r = HTTPRequest.wrap(netlib.tutils.treq()) - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" r.content = "falafel" r.decode() - assert not r.headers["content-encoding"] + assert "content-encoding" not in r.headers assert r.content == "falafel" r = HTTPRequest.wrap(netlib.tutils.treq()) @@ -1121,26 +1118,26 @@ class TestRequest: assert not r.decode() r = HTTPRequest.wrap(netlib.tutils.treq()) - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" r.content = "falafel" r.encode("identity") - assert r.headers["content-encoding"] == ["identity"] + assert r.headers["content-encoding"] == "identity" assert r.content == "falafel" r = HTTPRequest.wrap(netlib.tutils.treq()) - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" r.content = "falafel" r.encode("gzip") - assert r.headers["content-encoding"] == ["gzip"] + assert r.headers["content-encoding"] == "gzip" assert r.content != "falafel" r.decode() - assert not r.headers["content-encoding"] + assert "content-encoding" not in r.headers assert r.content == "falafel" def test_get_decoded_content(self): r = HTTPRequest.wrap(netlib.tutils.treq()) r.content = None - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" assert r.get_decoded_content() == None r.content = "falafel" @@ -1148,11 +1145,9 @@ class TestRequest: assert r.get_decoded_content() == "falafel" def test_get_content_type(self): - h = odict.ODictCaseless() - h["Content-Type"] = ["text/plain"] resp = HTTPResponse.wrap(netlib.tutils.tresp()) - resp.headers = h - assert resp.headers.get_first("content-type") == "text/plain" + resp.headers = Headers(content_type="text/plain") + assert resp.headers["content-type"] == "text/plain" class TestResponse: @@ -1165,19 +1160,18 @@ class TestResponse: def test_refresh(self): r = HTTPResponse.wrap(netlib.tutils.tresp()) n = time.time() - r.headers["date"] = [email.utils.formatdate(n)] + r.headers["date"] = email.utils.formatdate(n) pre = r.headers["date"] r.refresh(n) assert pre == r.headers["date"] r.refresh(n + 60) - d = email.utils.parsedate_tz(r.headers["date"][0]) + d = email.utils.parsedate_tz(r.headers["date"]) d = email.utils.mktime_tz(d) # Weird that this is not exact... assert abs(60 - (d - n)) <= 1 - r.headers[ - "set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"] + r.headers["set-cookie"] = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" r.refresh() def test_refresh_cookie(self): @@ -1192,47 +1186,45 @@ class TestResponse: def test_replace(self): r = HTTPResponse.wrap(netlib.tutils.tresp()) - r.headers["Foo"] = ["fOo"] + r.headers["Foo"] = "fOo" r.content = "afoob" assert r.replace("foo(?i)", "boo") == 3 assert not "foo" in r.content - assert r.headers["boo"] == ["boo"] + assert r.headers["boo"] == "boo" def test_decodeencode(self): r = HTTPResponse.wrap(netlib.tutils.tresp()) - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" r.content = "falafel" assert r.decode() - assert not r.headers["content-encoding"] + assert "content-encoding" not in r.headers assert r.content == "falafel" r = HTTPResponse.wrap(netlib.tutils.tresp()) - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" r.content = "falafel" r.encode("identity") - assert r.headers["content-encoding"] == ["identity"] + assert r.headers["content-encoding"] == "identity" assert r.content == "falafel" r = HTTPResponse.wrap(netlib.tutils.tresp()) - r.headers["content-encoding"] = ["identity"] + r.headers["content-encoding"] = "identity" r.content = "falafel" r.encode("gzip") - assert r.headers["content-encoding"] == ["gzip"] + assert r.headers["content-encoding"] == "gzip" assert r.content != "falafel" assert r.decode() - assert not r.headers["content-encoding"] + assert "content-encoding" not in r.headers assert r.content == "falafel" - r.headers["content-encoding"] = ["gzip"] + r.headers["content-encoding"] = "gzip" assert not r.decode() assert r.content == "falafel" def test_get_content_type(self): - h = odict.ODictCaseless() - h["Content-Type"] = ["text/plain"] resp = HTTPResponse.wrap(netlib.tutils.tresp()) - resp.headers = h - assert resp.headers.get_first("content-type") == "text/plain" + resp.headers = Headers(content_type="text/plain") + assert resp.headers["content-type"] == "text/plain" class TestError: @@ -1276,12 +1268,12 @@ class TestClientConnection: def test_decoded(): r = HTTPRequest.wrap(netlib.tutils.treq()) assert r.content == "content" - assert not r.headers["content-encoding"] + assert "content-encoding" not in r.headers r.encode("gzip") assert r.headers["content-encoding"] assert r.content != "content" with decoded(r): - assert not r.headers["content-encoding"] + assert "content-encoding" not in r.headers assert r.content == "content" assert r.headers["content-encoding"] assert r.content != "content" @@ -1378,18 +1370,18 @@ def test_setheaders(): h.add("~s", "one", "two") h.add("~s", "one", "three") f = tutils.tflow(resp=True) - f.request.headers["one"] = ["xxx"] - f.response.headers["one"] = ["xxx"] + f.request.headers["one"] = "xxx" + f.response.headers["one"] = "xxx" h.run(f) - assert f.request.headers["one"] == ["xxx"] - assert f.response.headers["one"] == ["two", "three"] + assert f.request.headers["one"] == "xxx" + assert f.response.headers.get_all("one") == ["two", "three"] h.clear() h.add("~q", "one", "two") h.add("~q", "one", "three") f = tutils.tflow() - f.request.headers["one"] = ["xxx"] + f.request.headers["one"] = "xxx" h.run(f) - assert f.request.headers["one"] == ["two", "three"] + assert f.request.headers.get_all("one") == ["two", "three"] assert not h.add("~", "foo", "bar") -- cgit v1.2.3