diff options
Diffstat (limited to 'mitmproxy/test/test_dump.py')
-rw-r--r-- | mitmproxy/test/test_dump.py | 241 |
1 files changed, 241 insertions, 0 deletions
diff --git a/mitmproxy/test/test_dump.py b/mitmproxy/test/test_dump.py new file mode 100644 index 00000000..dbd0c653 --- /dev/null +++ b/mitmproxy/test/test_dump.py @@ -0,0 +1,241 @@ +import os +from cStringIO import StringIO +from libmproxy.exceptions import ContentViewException +from libmproxy.models import HTTPResponse + +import netlib.tutils +from netlib.http import CONTENT_MISSING + +from libmproxy import dump, flow +from libmproxy.proxy import Log +from . import tutils +import mock + + +def test_strfuncs(): + o = dump.Options() + m = dump.DumpMaster(None, o) + + m.outfile = StringIO() + m.o.flow_detail = 0 + m.echo_flow(tutils.tflow()) + assert not m.outfile.getvalue() + + m.o.flow_detail = 4 + m.echo_flow(tutils.tflow()) + assert m.outfile.getvalue() + + m.outfile = StringIO() + m.echo_flow(tutils.tflow(resp=True)) + assert "<<" in m.outfile.getvalue() + + m.outfile = StringIO() + m.echo_flow(tutils.tflow(err=True)) + assert "<<" in m.outfile.getvalue() + + flow = tutils.tflow() + flow.request = netlib.tutils.treq() + flow.request.stickycookie = True + flow.client_conn = mock.MagicMock() + flow.client_conn.address.host = "foo" + flow.response = netlib.tutils.tresp(content=CONTENT_MISSING) + flow.response.is_replay = True + flow.response.status_code = 300 + m.echo_flow(flow) + + flow = tutils.tflow(resp=netlib.tutils.tresp(content="{")) + flow.response.headers["content-type"] = "application/json" + flow.response.status_code = 400 + m.echo_flow(flow) + + +@mock.patch("libmproxy.contentviews.get_content_view") +def test_contentview(get_content_view): + get_content_view.side_effect = ContentViewException(""), ("x", iter([])) + + o = dump.Options(flow_detail=4, verbosity=3) + m = dump.DumpMaster(None, o, StringIO()) + m.echo_flow(tutils.tflow()) + assert "Content viewer failed" in m.outfile.getvalue() + + +class TestDumpMaster: + + def _cycle(self, m, content): + f = tutils.tflow(req=netlib.tutils.treq(content=content)) + l = Log("connect") + l.reply = mock.MagicMock() + m.handle_log(l) + m.handle_clientconnect(f.client_conn) + m.handle_serverconnect(f.server_conn) + m.handle_request(f) + f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=content)) + f = m.handle_response(f) + m.handle_clientdisconnect(f.client_conn) + return f + + def _dummy_cycle(self, n, filt, content, **options): + cs = StringIO() + o = dump.Options(filtstr=filt, **options) + m = dump.DumpMaster(None, o, outfile=cs) + for i in range(n): + self._cycle(m, content) + m.shutdown() + return cs.getvalue() + + def _flowfile(self, path): + f = open(path, "wb") + fw = flow.FlowWriter(f) + t = tutils.tflow(resp=True) + fw.add(t) + f.close() + + def test_error(self): + cs = StringIO() + o = dump.Options(flow_detail=1) + m = dump.DumpMaster(None, o, outfile=cs) + f = tutils.tflow(err=True) + m.handle_request(f) + assert m.handle_error(f) + assert "error" in cs.getvalue() + + def test_missing_content(self): + cs = StringIO() + o = dump.Options(flow_detail=3) + m = dump.DumpMaster(None, o, outfile=cs) + f = tutils.tflow() + f.request.content = CONTENT_MISSING + m.handle_request(f) + f.response = HTTPResponse.wrap(netlib.tutils.tresp()) + f.response.content = CONTENT_MISSING + m.handle_response(f) + assert "content missing" in cs.getvalue() + + def test_replay(self): + cs = StringIO() + + o = dump.Options(server_replay=["nonexistent"], kill=True) + tutils.raises(dump.DumpError, dump.DumpMaster, None, o, outfile=cs) + + with tutils.tmpdir() as t: + p = os.path.join(t, "rep") + self._flowfile(p) + + o = dump.Options(server_replay=[p], kill=True) + m = dump.DumpMaster(None, o, outfile=cs) + + self._cycle(m, "content") + self._cycle(m, "content") + + o = dump.Options(server_replay=[p], kill=False) + m = dump.DumpMaster(None, o, outfile=cs) + self._cycle(m, "nonexistent") + + o = dump.Options(client_replay=[p], kill=False) + m = dump.DumpMaster(None, o, outfile=cs) + + def test_read(self): + with tutils.tmpdir() as t: + p = os.path.join(t, "read") + self._flowfile(p) + assert "GET" in self._dummy_cycle( + 0, + None, + "", + flow_detail=1, + rfile=p + ) + + tutils.raises( + dump.DumpError, self._dummy_cycle, + 0, None, "", verbosity=1, rfile="/nonexistent" + ) + tutils.raises( + dump.DumpError, self._dummy_cycle, + 0, None, "", verbosity=1, rfile="test_dump.py" + ) + + def test_options(self): + o = dump.Options(verbosity = 2) + assert o.verbosity == 2 + + def test_filter(self): + assert not "GET" in self._dummy_cycle(1, "~u foo", "", verbosity=1) + + def test_app(self): + o = dump.Options(app=True) + s = mock.MagicMock() + m = dump.DumpMaster(s, o) + assert len(m.apps.apps) == 1 + + def test_replacements(self): + cs = StringIO() + o = dump.Options(replacements=[(".*", "content", "foo")]) + m = dump.DumpMaster(None, o, outfile=cs) + f = self._cycle(m, "content") + assert f.request.content == "foo" + + def test_setheader(self): + cs = StringIO() + o = dump.Options(setheaders=[(".*", "one", "two")]) + m = dump.DumpMaster(None, o, outfile=cs) + f = self._cycle(m, "content") + assert f.request.headers["one"] == "two" + + def test_basic(self): + for i in (1, 2, 3): + assert "GET" in self._dummy_cycle(1, "~s", "", flow_detail=i) + assert "GET" in self._dummy_cycle( + 1, + "~s", + "\x00\x00\x00", + flow_detail=i) + assert "GET" in self._dummy_cycle(1, "~s", "ascii", flow_detail=i) + + def test_write(self): + with tutils.tmpdir() as d: + p = os.path.join(d, "a") + self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0) + assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 1 + + def test_write_append(self): + with tutils.tmpdir() as d: + p = os.path.join(d, "a.append") + self._dummy_cycle(1, None, "", outfile=(p, "wb"), verbosity=0) + self._dummy_cycle(1, None, "", outfile=(p, "ab"), verbosity=0) + assert len(list(flow.FlowReader(open(p, "rb")).stream())) == 2 + + def test_write_err(self): + tutils.raises( + dump.DumpError, + self._dummy_cycle, + 1, + None, + "", + outfile = ("nonexistentdir/foo", "wb") + ) + + def test_script(self): + ret = self._dummy_cycle( + 1, None, "", + scripts=[tutils.test_data.path("scripts/all.py")], verbosity=1 + ) + assert "XCLIENTCONNECT" in ret + assert "XSERVERCONNECT" in ret + assert "XREQUEST" in ret + assert "XRESPONSE" in ret + assert "XCLIENTDISCONNECT" in ret + tutils.raises( + dump.DumpError, + self._dummy_cycle, 1, None, "", scripts=["nonexistent"] + ) + tutils.raises( + dump.DumpError, + self._dummy_cycle, 1, None, "", scripts=["starterr.py"] + ) + + def test_stickycookie(self): + self._dummy_cycle(1, None, "", stickycookie = ".*") + + def test_stickyauth(self): + self._dummy_cycle(1, None, "", stickyauth = ".*") |