diff options
| author | Aldo Cortesi <aldo@nullcube.com> | 2011-08-03 22:38:23 +1200 | 
|---|---|---|
| committer | Aldo Cortesi <aldo@nullcube.com> | 2011-08-03 22:41:38 +1200 | 
| commit | 57c653be5f8a6fe0d1785421faa6513ebd3d48c0 (patch) | |
| tree | c2334815d6b20ec7719eba351126d307f11bf29f /test/test_flow.py | |
| parent | cbd8d09849fbbd8ccd8f5cbe29f09949fc344767 (diff) | |
| download | mitmproxy-57c653be5f8a6fe0d1785421faa6513ebd3d48c0.tar.gz mitmproxy-57c653be5f8a6fe0d1785421faa6513ebd3d48c0.tar.bz2 mitmproxy-57c653be5f8a6fe0d1785421faa6513ebd3d48c0.zip | |
Move all HTTP objects to flow.py
That's Request, Response, ClientConnect, ClientDisconnect, Error, and Headers.
Diffstat (limited to 'test/test_flow.py')
| -rw-r--r-- | test/test_flow.py | 350 | 
1 files changed, 334 insertions, 16 deletions
| diff --git a/test/test_flow.py b/test/test_flow.py index b21f84cd..f61e8de5 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -1,6 +1,7 @@ -import Queue +import Queue, time, textwrap  from cStringIO import StringIO -from libmproxy import console, proxy, filt, flow, controller +import email.utils +from libmproxy import console, proxy, filt, flow, controller, utils  import tutils  import libpry @@ -156,12 +157,12 @@ class uFlow(libpry.AutoTree):          assert f.get_state() == flow.Flow.from_state(state).get_state()          f.response = None -        f.error = proxy.Error(f.request, "error") +        f.error = flow.Error(f.request, "error")          state = f.get_state()           assert f.get_state() == flow.Flow.from_state(state).get_state()          f2 = tutils.tflow() -        f2.error = proxy.Error(f.request, "e2") +        f2.error = flow.Error(f.request, "e2")          assert not f == f2          f.load_state(f2.get_state())          assert f.get_state() == f2.get_state() @@ -240,7 +241,7 @@ class uFlow(libpry.AutoTree):  class uState(libpry.AutoTree):      def test_backup(self): -        bc = proxy.ClientConnect(("address", 22)) +        bc = flow.ClientConnect(("address", 22))          c = flow.State()          req = tutils.treq()          f = c.add_request(req) @@ -254,7 +255,7 @@ class uState(libpry.AutoTree):                  connect -> request -> response          """ -        bc = proxy.ClientConnect(("address", 22)) +        bc = flow.ClientConnect(("address", 22))          c = flow.State()          req = tutils.treq(bc) @@ -284,17 +285,17 @@ class uState(libpry.AutoTree):          assert c.add_response(resp)          assert c.active_flow_count() == 0 -        dc = proxy.ClientDisconnect(bc) +        dc = flow.ClientDisconnect(bc)      def test_err(self): -        bc = proxy.ClientConnect(("address", 22)) +        bc = flow.ClientConnect(("address", 22))          c = flow.State()          req = tutils.treq()          f = c.add_request(req) -        e = proxy.Error(f.request, "message") +        e = flow.Error(f.request, "message")          assert c.add_error(e) -        e = proxy.Error(tutils.tflow().request, "message") +        e = flow.Error(tutils.tflow().request, "message")          assert not c.add_error(e) @@ -348,7 +349,7 @@ class uState(libpry.AutoTree):      def _add_error(self, state):          req = tutils.treq()          f = state.add_request(req) -        f.error = proxy.Error(f.request, "msg") +        f.error = flow.Error(f.request, "msg")      def test_clear(self):          c = flow.State() @@ -451,10 +452,10 @@ class uFlowMaster(libpry.AutoTree):          resp = tutils.tresp(req)          fm.handle_response(resp)          assert fm.script.ns["log"][-1] == "response" -        dc = proxy.ClientDisconnect(req.client_conn) +        dc = flow.ClientDisconnect(req.client_conn)          fm.handle_clientdisconnect(dc)          assert fm.script.ns["log"][-1] == "clientdisconnect" -        err = proxy.Error(f.request, "msg") +        err = flow.Error(f.request, "msg")          fm.handle_error(err)          assert fm.script.ns["log"][-1] == "error" @@ -476,10 +477,10 @@ class uFlowMaster(libpry.AutoTree):          rx = tutils.tresp()          assert not fm.handle_response(rx) -        dc = proxy.ClientDisconnect(req.client_conn) +        dc = flow.ClientDisconnect(req.client_conn)          fm.handle_clientdisconnect(dc) -        err = proxy.Error(f.request, "msg") +        err = flow.Error(f.request, "msg")          fm.handle_error(err)      def test_client_playback(self): @@ -496,7 +497,7 @@ class uFlowMaster(libpry.AutoTree):          fm.tick(q)          assert fm.state.flow_count() -        fm.handle_error(proxy.Error(f.request, "error")) +        fm.handle_error(flow.Error(f.request, "error"))      def test_server_playback(self):          s = flow.State() @@ -564,6 +565,318 @@ class uFlowMaster(libpry.AutoTree):          fm.handle_request(f.request)          assert f.request.headers["authorization"] == ["foo"] +class uRequest(libpry.AutoTree): +    def test_simple(self): +        h = flow.Headers() +        h["test"] = ["test"] +        c = flow.ClientConnect(("addr", 2222)) +        r = flow.Request(c, "host", 22, "https", "GET", "/", h, "content") +        u = r.url() +        assert r.set_url(u) +        assert not r.set_url("") +        assert r.url() == u +        assert r.assemble() + +        r2 = r.copy() +        assert r == r2 + +    def test_anticache(self): +        h = flow.Headers() +        r = flow.Request(None, "host", 22, "https", "GET", "/", h, "content") +        h["if-modified-since"] = ["test"] +        h["if-none-match"] = ["test"] +        r.anticache() +        assert not "if-modified-since" in r.headers +        assert not "if-none-match" in r.headers + +    def test_getset_state(self): +        h = flow.Headers() +        h["test"] = ["test"] +        c = flow.ClientConnect(("addr", 2222)) +        r = flow.Request(c, "host", 22, "https", "GET", "/", h, "content") +        state = r.get_state() +        assert flow.Request.from_state(state) == r + +        r.client_conn = None +        state = r.get_state() +        assert flow.Request.from_state(state) == r + +        r2 = flow.Request(c, "testing", 20, "http", "PUT", "/foo", h, "test") +        assert not r == r2 +        r.load_state(r2.get_state()) +        assert r == r2 + +        r2.client_conn = None +        r.load_state(r2.get_state()) +        assert not r.client_conn + +    def test_replace(self): +        r = tutils.treq() +        r.path = "path/foo" +        r.headers["Foo"] = ["fOo"] +        r.content = "afoob" +        assert r.replace("foo(?i)", "boo") == 4 +        assert r.path == "path/boo" +        assert not "foo" in r.content +        assert r.headers["boo"] == ["boo"] + +    def test_decodeencode(self): +        r = tutils.treq() +        r.headers["content-encoding"] = ["identity"] +        r.content = "falafel" +        r.decode() +        assert not r.headers["content-encoding"] +        assert r.content == "falafel" + +        r = tutils.treq() +        r.headers["content-encoding"] = ["identity"] +        r.content = "falafel" +        r.encode("identity") +        assert r.headers["content-encoding"] == ["identity"] +        assert r.content == "falafel" + +        r = tutils.treq() +        r.headers["content-encoding"] = ["identity"] +        r.content = "falafel" +        r.encode("gzip") +        assert r.headers["content-encoding"] == ["gzip"] +        assert r.content != "falafel" +        r.decode() +        assert not r.headers["content-encoding"] +        assert r.content == "falafel" + + +class uResponse(libpry.AutoTree): +    def test_simple(self): +        h = flow.Headers() +        h["test"] = ["test"] +        c = flow.ClientConnect(("addr", 2222)) +        req = flow.Request(c, "host", 22, "https", "GET", "/", h, "content") +        resp = flow.Response(req, 200, "msg", h.copy(), "content") +        assert resp.assemble() + +        resp2 = resp.copy() +        assert resp2 == resp + +    def test_refresh(self): +        r = tutils.tresp() +        n = time.time() +        r.headers["date"] = [email.utils.formatdate(n)] +        pre = r.headers["date"] +        r.refresh(n) +        assert pre == r.headers["date"] +        r.refresh(n+60) + +        d = email.utils.parsedate_tz(r.headers["date"][0]) +        d = email.utils.mktime_tz(d) +        # Weird that this is not exact... +        assert abs(60-(d-n)) <= 1 + +        r.headers["set-cookie"] = ["MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"] +        r.refresh() + +    def test_refresh_cookie(self): +        r = tutils.tresp() + +        # Invalid expires format, sent to us by Reddit. +        c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/" +        assert r._refresh_cookie(c, 60) + +        c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure" +        assert "00:21:38" in r._refresh_cookie(c, 60) + + +    def test_getset_state(self): +        h = flow.Headers() +        h["test"] = ["test"] +        c = flow.ClientConnect(("addr", 2222)) +        r = flow.Request(c, "host", 22, "https", "GET", "/", h, "content") +        req = flow.Request(c, "host", 22, "https", "GET", "/", h, "content") +        resp = flow.Response(req, 200, "msg", h.copy(), "content") + +        state = resp.get_state() +        assert flow.Response.from_state(req, state) == resp + +        resp2 = flow.Response(req, 220, "foo", h.copy(), "test") +        assert not resp == resp2 +        resp.load_state(resp2.get_state()) +        assert resp == resp2 + +    def test_replace(self): +        r = tutils.tresp() +        r.headers["Foo"] = ["fOo"] +        r.content = "afoob" +        assert r.replace("foo(?i)", "boo") == 3 +        assert not "foo" in r.content +        assert r.headers["boo"] == ["boo"] + +    def test_decodeencode(self): +        r = tutils.tresp() +        r.headers["content-encoding"] = ["identity"] +        r.content = "falafel" +        r.decode() +        assert not r.headers["content-encoding"] +        assert r.content == "falafel" + +        r = tutils.tresp() +        r.headers["content-encoding"] = ["identity"] +        r.content = "falafel" +        r.encode("identity") +        assert r.headers["content-encoding"] == ["identity"] +        assert r.content == "falafel" + +        r = tutils.tresp() +        r.headers["content-encoding"] = ["identity"] +        r.content = "falafel" +        r.encode("gzip") +        assert r.headers["content-encoding"] == ["gzip"] +        assert r.content != "falafel" +        r.decode() +        assert not r.headers["content-encoding"] +        assert r.content == "falafel" + + +class uError(libpry.AutoTree): +    def test_getset_state(self): +        e = flow.Error(None, "Error") +        state = e.get_state() +        assert flow.Error.from_state(state) == e + +        assert e.copy() + +        e2 = flow.Error(None, "bar") +        assert not e == e2 +        e.load_state(e2.get_state()) +        assert e == e2 + + +        e3 = e.copy() +        assert e3 == e + +    def test_replace(self): +        e = flow.Error(None, "amoop") +        e.replace("moo", "bar") +        assert e.msg == "abarp" + + +class uClientConnect(libpry.AutoTree): +    def test_state(self): +        c = flow.ClientConnect(("a", 22)) +        assert flow.ClientConnect.from_state(c.get_state()) == c + +        c2 = flow.ClientConnect(("a", 25)) +        assert not c == c2 + +        c.load_state(c2.get_state()) +        assert c == c2 + +        c3 = c.copy() +        assert c3 == c + + +class uHeaders(libpry.AutoTree): +    def setUp(self): +        self.hd = flow.Headers() + +    def test_read_simple(self): +        data = """ +            Header: one +            Header2: two +            \r\n +        """ +        data = textwrap.dedent(data) +        data = data.strip() +        s = StringIO(data) +        self.hd.read(s) +        assert self.hd["header"] == ["one"] +        assert self.hd["header2"] == ["two"] + +    def test_read_multi(self): +        data = """ +            Header: one +            Header: two +            \r\n +        """ +        data = textwrap.dedent(data) +        data = data.strip() +        s = StringIO(data) +        self.hd.read(s) +        assert self.hd["header"] == ["one", "two"] + +    def test_read_continued(self): +        data = """ +            Header: one +            \ttwo +            Header2: three +            \r\n +        """ +        data = textwrap.dedent(data) +        data = data.strip() +        s = StringIO(data) +        self.hd.read(s) +        assert self.hd["header"] == ['one\r\n two'] + +    def test_dictToHeader1(self): +        self.hd.add("one", "uno") +        self.hd.add("two", "due") +        self.hd.add("two", "tre") +        expected = [ +            "one: uno\r\n", +            "two: due\r\n", +            "two: tre\r\n", +            "\r\n" +        ] +        out = repr(self.hd) +        for i in expected: +            assert out.find(i) >= 0 + +    def test_dictToHeader2(self): +        self.hd["one"] = ["uno"] +        expected1 = "one: uno\r\n" +        expected2 = "\r\n" +        out = repr(self.hd) +        assert out.find(expected1) >= 0 +        assert out.find(expected2) >= 0 + +    def test_match_re(self): +        h = flow.Headers() +        h.add("one", "uno") +        h.add("two", "due") +        h.add("two", "tre") +        assert h.match_re("uno") +        assert h.match_re("two: due") +        assert not h.match_re("nonono") + +    def test_getset_state(self): +        self.hd.add("foo", 1) +        self.hd.add("foo", 2) +        self.hd.add("bar", 3) +        state = self.hd.get_state() +        nd = flow.Headers.from_state(state) +        assert nd == self.hd + +    def test_copy(self): +        self.hd.add("foo", 1) +        self.hd.add("foo", 2) +        self.hd.add("bar", 3) +        assert self.hd == self.hd.copy() + +    def test_del(self): +        self.hd.add("foo", 1) +        self.hd.add("Foo", 2) +        self.hd.add("bar", 3) +        del self.hd["foo"] +        assert len(self.hd.lst) == 1 + +    def test_replace(self): +        self.hd.add("one", "two") +        self.hd.add("two", "one") +        assert self.hd.replace("one", "vun") == 2 +        assert self.hd.lst == [ +            ["vun", "two"], +            ["two", "vun"], +        ] +  tests = [      uStickyCookieState(), @@ -574,4 +887,9 @@ tests = [      uState(),      uSerialize(),      uFlowMaster(), +    uRequest(), +    uResponse(), +    uError(), +    uClientConnect(), +    uHeaders(),  ] | 
