diff options
Diffstat (limited to 'test/test_console.py')
-rw-r--r-- | test/test_console.py | 269 |
1 files changed, 269 insertions, 0 deletions
diff --git a/test/test_console.py b/test/test_console.py new file mode 100644 index 00000000..50780aa5 --- /dev/null +++ b/test/test_console.py @@ -0,0 +1,269 @@ +from libmproxy import console, proxy, utils, filt +import libpry + +def treq(conn=None): + if not conn: + conn = proxy.BrowserConnection("address", 22) + headers = utils.Headers() + headers["header"] = ["qvalue"] + return proxy.Request(conn, "host", 80, "http", "GET", "/path", headers, "content") + + +def tresp(req=None): + if not req: + req = treq() + headers = utils.Headers() + headers["header_response"] = ["svalue"] + return proxy.Response(req, 200, "HTTP/1.1", "message", headers, "content_response") + + +def tflow(): + bc = proxy.BrowserConnection("address", 22) + return console.Flow(bc) + + +class uState(libpry.AutoTree): + def test_backup(self): + bc = proxy.BrowserConnection("address", 22) + c = console.State() + f = console.Flow(bc) + c.add_browserconnect(f) + + f.backup() + c.revert(f) + + def test_flow(self): + """ + normal flow: + + connect -> request -> response + """ + bc = proxy.BrowserConnection("address", 22) + c = console.State() + f = console.Flow(bc) + c.add_browserconnect(f) + assert c.lookup(bc) + assert c.get_focus() == (f, 0) + + req = treq(bc) + assert c.add_request(req) + assert len(c.flow_list) == 1 + assert c.lookup(req) + + newreq = treq() + assert not c.add_request(newreq) + assert not c.lookup(newreq) + + resp = tresp(req) + assert c.add_response(resp) + assert len(c.flow_list) == 1 + assert f.waiting == False + assert c.lookup(resp) + + newresp = tresp() + assert not c.add_response(newresp) + assert not c.lookup(newresp) + + def test_err(self): + bc = proxy.BrowserConnection("address", 22) + c = console.State() + f = console.Flow(bc) + c.add_browserconnect(f) + e = proxy.Error(bc, "message") + assert c.add_error(e) + + e = proxy.Error(proxy.BrowserConnection("address", 22), "message") + assert not c.add_error(e) + + def test_view(self): + c = console.State() + + f = tflow() + c.add_browserconnect(f) + assert len(c.view) == 1 + c.set_limit(filt.parse("~q")) + assert len(c.view) == 0 + c.set_limit(None) + + + f = tflow() + req = treq(f.connection) + c.add_browserconnect(f) + c.add_request(req) + assert len(c.view) == 2 + c.set_limit(filt.parse("~q")) + assert len(c.view) == 1 + c.set_limit(filt.parse("~s")) + assert len(c.view) == 0 + + def test_focus(self): + """ + normal flow: + + connect -> request -> response + """ + c = console.State() + + bc = proxy.BrowserConnection("address", 22) + f = console.Flow(bc) + c.add_browserconnect(f) + assert c.get_focus() == (f, 0) + assert c.get_from_pos(0) == (f, 0) + assert c.get_from_pos(1) == (None, None) + assert c.get_next(0) == (None, None) + + bc2 = proxy.BrowserConnection("address", 22) + f2 = console.Flow(bc2) + c.add_browserconnect(f2) + assert c.get_focus() == (f, 1) + assert c.get_next(0) == (f, 1) + assert c.get_prev(1) == (f2, 0) + assert c.get_next(1) == (None, None) + + c.set_focus(0) + assert c.get_focus() == (f2, 0) + c.set_focus(-1) + assert c.get_focus() == (f2, 0) + + c.delete_flow(f2) + assert c.get_focus() == (f, 0) + c.delete_flow(f) + assert c.get_focus() == (None, None) + + def _add_request(self, state): + f = tflow() + state.add_browserconnect(f) + q = treq(f.connection) + state.add_request(q) + return f + + def _add_response(self, state): + f = self._add_request(state) + r = tresp(f.request) + state.add_response(r) + + def test_focus_view(self): + c = console.State() + self._add_request(c) + self._add_response(c) + self._add_request(c) + self._add_response(c) + self._add_request(c) + self._add_response(c) + c.set_limit(filt.parse("~q")) + assert len(c.view) == 3 + assert c.focus == 2 + + def test_delete_last(self): + c = console.State() + f1 = tflow() + f2 = tflow() + c.add_browserconnect(f1) + c.add_browserconnect(f2) + c.set_focus(1) + c.delete_flow(f1) + assert c.focus == 0 + + def test_kill_flow(self): + c = console.State() + f = tflow() + c.add_browserconnect(f) + c.kill_flow(f) + assert not c.flow_list + + def test_clear(self): + c = console.State() + f = tflow() + c.add_browserconnect(f) + f.intercepting = True + + c.clear() + assert len(c.flow_list) == 1 + f.intercepting = False + c.clear() + assert len(c.flow_list) == 0 + + +class uFlow(libpry.AutoTree): + def test_match(self): + f = tflow() + f.response = tresp() + f.request = f.response.request + assert not f.match(filt.parse("~b test")) + + def test_backup(self): + f = tflow() + f.backup() + f.revert() + + def test_simple(self): + f = tflow() + assert f.get_text() + + f.request = treq() + assert f.get_text() + + f.response = tresp() + f.response.headers["content-type"] = ["text/html"] + assert f.get_text() + f.response.code = 404 + assert f.get_text() + + f.focus = True + assert f.get_text() + + f.connection = console.ReplayConnection() + assert f.get_text() + + f.response = None + assert f.get_text() + + f.error = proxy.Error(200, "test") + assert f.get_text() + + def test_kill(self): + f = tflow() + f.request = treq() + f.intercept() + assert not f.request.acked + f.kill() + assert f.request.acked + f.intercept() + f.response = tresp() + f.request = f.response.request + f.request.ack() + assert not f.response.acked + f.kill() + assert f.response.acked + + def test_accept_intercept(self): + f = tflow() + f.request = treq() + f.intercept() + assert not f.request.acked + f.accept_intercept() + assert f.request.acked + f.response = tresp() + f.request = f.response.request + f.intercept() + f.request.ack() + assert not f.response.acked + f.accept_intercept() + assert f.response.acked + + +class uformat_keyvals(libpry.AutoTree): + def test_simple(self): + assert console.format_keyvals( + [ + ("aa", "bb"), + ("cc", "dd"), + ] + ) + + +tests = [ + uFlow(), + uformat_keyvals(), + uState() +] |