diff options
-rw-r--r-- | libmproxy/console.py | 2 | ||||
-rw-r--r-- | libmproxy/contrib/pyparsing.py (renamed from libmproxy/pyparsing.py) | 0 | ||||
-rw-r--r-- | libmproxy/dump.py | 1 | ||||
-rw-r--r-- | libmproxy/filt.py | 4 | ||||
-rw-r--r-- | libmproxy/flow.py | 2 | ||||
-rw-r--r-- | libmproxy/proxy.py | 7 | ||||
-rw-r--r-- | libmproxy/script.py | 1 | ||||
-rw-r--r-- | test/.pry | 4 | ||||
-rw-r--r-- | test/test_console.py | 131 | ||||
-rw-r--r-- | test/test_filt.py | 9 | ||||
-rw-r--r-- | test/test_flow.py | 142 | ||||
-rw-r--r-- | test/test_proxy.py | 7 |
12 files changed, 197 insertions, 113 deletions
diff --git a/libmproxy/console.py b/libmproxy/console.py index 0df1339c..03fde9f5 100644 --- a/libmproxy/console.py +++ b/libmproxy/console.py @@ -858,7 +858,7 @@ class ConsoleMaster(controller.Master): try: idx = self.state.view.index(self.currentflow) self.conn_list_view.set_focus(idx) - except IndexError: + except (IndexError, ValueError): pass self.body = urwid.ListBox(self.conn_list_view) self.statusbar = StatusBar(self, self.footer_text_default) diff --git a/libmproxy/pyparsing.py b/libmproxy/contrib/pyparsing.py index 06b11d98..06b11d98 100644 --- a/libmproxy/pyparsing.py +++ b/libmproxy/contrib/pyparsing.py diff --git a/libmproxy/dump.py b/libmproxy/dump.py index 83238da4..1fe1c095 100644 --- a/libmproxy/dump.py +++ b/libmproxy/dump.py @@ -1,6 +1,7 @@ import sys import controller +#begin nocover class DumpMaster(controller.Master): """ A simple master that just dumps to screen. diff --git a/libmproxy/filt.py b/libmproxy/filt.py index 87746f94..31c43581 100644 --- a/libmproxy/filt.py +++ b/libmproxy/filt.py @@ -39,7 +39,7 @@ rex Equivalent to ~u rex """ import re, sys -import pyparsing as pp +import contrib.pyparsing as pp class _Token: @@ -315,6 +315,6 @@ def parse(s): return bnf.parseString(s, parseAll=True)[0] except pp.ParseException: return None - except ValueError: + except ValueError, e: return None diff --git a/libmproxy/flow.py b/libmproxy/flow.py index cf39bed9..8b8a8bae 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -48,8 +48,8 @@ class Flow: @classmethod def script_deserialize(klass, data): - data = base64.decodestring(data) try: + data = base64.decodestring(data) data = bson.loads(data) # bson.loads doesn't define a particular exception on error... except Exception: diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py index 9715bb5a..00b9e0ba 100644 --- a/libmproxy/proxy.py +++ b/libmproxy/proxy.py @@ -45,7 +45,10 @@ def parse_url(url): port = int(port) else: host = netloc - port = 80 + if scheme == "https": + port = 443 + else: + port = 80 path = urlparse.urlunparse(('', '', path, params, query, fragment)) if not path: path = "/" @@ -277,6 +280,8 @@ class FileLike: return result +#begin nocover + class ServerConnection: def __init__(self, request): self.request = request diff --git a/libmproxy/script.py b/libmproxy/script.py index 9ff861e9..5693f5e5 100644 --- a/libmproxy/script.py +++ b/libmproxy/script.py @@ -10,6 +10,7 @@ from contrib import bson import flow +#begin nocover def load_flow(): """ Load a flow from the stdin. Returns a Flow object. @@ -1,5 +1,5 @@ base = .. coverage = ../libmproxy -exclude = ../libmproxy/pyparsing.py - . +exclude = . + ../libmproxy/contrib diff --git a/test/test_console.py b/test/test_console.py index 59dd7920..6252d8de 100644 --- a/test/test_console.py +++ b/test/test_console.py @@ -4,15 +4,6 @@ import libpry class uState(libpry.AutoTree): - def test_backup(self): - bc = proxy.BrowserConnection("address", 22) - c = console.ConsoleState() - f = flow.Flow(bc) - c.add_browserconnect(f) - - f.backup() - c.revert(f) - def test_flow(self): """ normal flow: @@ -26,56 +17,6 @@ class uState(libpry.AutoTree): assert c.lookup(bc) assert c.get_focus() == (f, 0) - req = utils.treq(bc) - assert c.add_request(req) - assert len(c.flow_list) == 1 - assert c.lookup(req) - - newreq = utils.treq() - assert not c.add_request(newreq) - assert not c.lookup(newreq) - - resp = utils.tresp(req) - assert c.add_response(resp) - assert len(c.flow_list) == 1 - assert c.lookup(resp) - - newresp = utils.tresp() - assert not c.add_response(newresp) - assert not c.lookup(newresp) - - def test_err(self): - bc = proxy.BrowserConnection("address", 22) - c = console.ConsoleState() - f = flow.Flow(bc) - c.add_browserconnect(f) - e = proxy.Error(bc, "message") - assert c.add_error(e) - - e = proxy.Error(proxy.BrowserConnection("address", 22), "message") - assert not c.add_error(e) - - def test_view(self): - c = console.ConsoleState() - - f = utils.tflow() - c.add_browserconnect(f) - assert len(c.view) == 1 - c.set_limit(filt.parse("~q")) - assert len(c.view) == 0 - c.set_limit(None) - - - f = utils.tflow() - req = utils.treq(f.connection) - c.add_browserconnect(f) - c.add_request(req) - assert len(c.view) == 2 - c.set_limit(filt.parse("~q")) - assert len(c.view) == 1 - c.set_limit(filt.parse("~s")) - assert len(c.view) == 0 - def test_focus(self): """ normal flow: @@ -122,48 +63,22 @@ class uState(libpry.AutoTree): r = utils.tresp(f.request) state.add_response(r) - def test_focus_view(self): - c = console.ConsoleState() - self._add_request(c) - self._add_response(c) - self._add_request(c) - self._add_response(c) - self._add_request(c) - self._add_response(c) - c.set_limit(filt.parse("~q")) - assert len(c.view) == 3 - assert c.focus == 2 - - def test_delete_last(self): - c = console.ConsoleState() - f1 = utils.tflow() - f2 = utils.tflow() - c.add_browserconnect(f1) - c.add_browserconnect(f2) - c.set_focus(1) - c.delete_flow(f1) - assert c.focus == 0 - - def test_kill_flow(self): + def test_add_request(self): c = console.ConsoleState() f = utils.tflow() c.add_browserconnect(f) - c.kill_flow(f) - assert not c.flow_list + q = utils.treq(f.connection) + c.focus = None + assert c.add_request(q) - def test_clear(self): + def test_add_response(self): c = console.ConsoleState() - f = utils.tflow() - c.add_browserconnect(f) - f.intercepting = True - - c.clear() - assert len(c.flow_list) == 1 - f.intercepting = False - c.clear() - assert len(c.flow_list) == 0 + f = self._add_request(c) + r = utils.tresp(f.request) + c.focus = None + c.add_response(r) - def test_dump_flows(self): + def test_focus_view(self): c = console.ConsoleState() self._add_request(c) self._add_response(c) @@ -171,11 +86,9 @@ class uState(libpry.AutoTree): self._add_response(c) self._add_request(c) self._add_response(c) - - dump = c.dump_flows() - c.clear() - c.load_flows(dump) - assert isinstance(c.flow_list[0], flow.Flow) + c.set_limit(filt.parse("~q")) + assert len(c.view) == 3 + assert c.focus == 2 class uformat_keyvals(libpry.AutoTree): @@ -183,10 +96,27 @@ class uformat_keyvals(libpry.AutoTree): assert console.format_keyvals( [ ("aa", "bb"), + None, ("cc", "dd"), ] ) + +class uformat_flow(libpry.AutoTree): + def test_simple(self): + f = utils.tflow() + assert ('focus', '>> ') not in console.format_flow(f, False) + assert ('focus', '>> ') in console.format_flow(f, True) + + f.response = utils.tresp() + f.request = f.response.request + f.backup() + + assert ('method', '[edited] ') in console.format_flow(f, True) + f.connection = flow.ReplayConnection() + assert ('method', '[replay] ') in console.format_flow(f, True) + + class uPathCompleter(libpry.AutoTree): def test_completion(self): c = console._PathCompleter(True) @@ -219,6 +149,7 @@ class uPathCompleter(libpry.AutoTree): tests = [ uformat_keyvals(), + uformat_flow(), uState(), uPathCompleter() ] diff --git a/test/test_filt.py b/test/test_filt.py index 3cf0f6cd..036c1892 100644 --- a/test/test_filt.py +++ b/test/test_filt.py @@ -9,6 +9,9 @@ class uParsing(libpry.AutoTree): x.dump(fp=c) assert c.getvalue() + def test_err(self): + assert filt.parse("~h [") is None + def test_simple(self): assert not filt.parse("~b") assert filt.parse("~q") @@ -20,9 +23,6 @@ class uParsing(libpry.AutoTree): assert len(p.lst) == 2 def test_naked_url(self): - #a = filt.parse("foobar") - #assert a.lst[0].expr == "foobar" - a = filt.parse("foobar ~h rex") assert a.lst[0].expr == "foobar" assert a.lst[1].expr == "rex" @@ -214,6 +214,9 @@ class uMatching(libpry.AutoTree): assert not self.q("!~c 201 !~c 200", s) + + + tests = [ uMatching(), uParsing() diff --git a/test/test_flow.py b/test/test_flow.py index 470547a5..59b277e4 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -26,6 +26,10 @@ class uFlow(libpry.AutoTree): f.request = f.response.request assert not f.match(filt.parse("~b test")) + def test_dump(self): + f = utils.tflow() + assert f.dump() + def test_backup(self): f = utils.tflow() assert not f.modified() @@ -59,7 +63,6 @@ class uFlow(libpry.AutoTree): assert console.format_flow(f, True) assert console.format_flow(f, False) - f.focus = True assert console.format_flow(f, True) assert console.format_flow(f, False) @@ -110,6 +113,141 @@ class uFlow(libpry.AutoTree): f.request = utils.treq() +class uState(libpry.AutoTree): + def test_backup(self): + bc = proxy.BrowserConnection("address", 22) + c = flow.State() + f = flow.Flow(bc) + c.add_browserconnect(f) + + f.backup() + c.revert(f) + + def test_flow(self): + """ + normal flow: + + connect -> request -> response + """ + bc = proxy.BrowserConnection("address", 22) + c = flow.State() + f = flow.Flow(bc) + c.add_browserconnect(f) + assert c.lookup(bc) + + req = utils.treq(bc) + assert c.add_request(req) + assert len(c.flow_list) == 1 + assert c.lookup(req) + + newreq = utils.treq() + assert not c.add_request(newreq) + assert not c.lookup(newreq) + + resp = utils.tresp(req) + assert c.add_response(resp) + assert len(c.flow_list) == 1 + assert c.lookup(resp) + + newresp = utils.tresp() + assert not c.add_response(newresp) + assert not c.lookup(newresp) + + def test_err(self): + bc = proxy.BrowserConnection("address", 22) + c = flow.State() + f = flow.Flow(bc) + c.add_browserconnect(f) + e = proxy.Error(bc, "message") + assert c.add_error(e) + + e = proxy.Error(proxy.BrowserConnection("address", 22), "message") + assert not c.add_error(e) + + def test_view(self): + c = flow.State() + + f = utils.tflow() + c.add_browserconnect(f) + assert len(c.view) == 1 + c.set_limit(filt.parse("~q")) + assert len(c.view) == 0 + c.set_limit(None) + + + f = utils.tflow() + req = utils.treq(f.connection) + c.add_browserconnect(f) + c.add_request(req) + assert len(c.view) == 2 + c.set_limit(filt.parse("~q")) + assert len(c.view) == 1 + c.set_limit(filt.parse("~s")) + assert len(c.view) == 0 + + def _add_request(self, state): + f = utils.tflow() + state.add_browserconnect(f) + q = utils.treq(f.connection) + state.add_request(q) + return f + + def _add_response(self, state): + f = self._add_request(state) + r = utils.tresp(f.request) + state.add_response(r) + + def _add_error(self, state): + f = utils.tflow() + f.error = proxy.Error(None, "msg") + state.add_browserconnect(f) + q = utils.treq(f.connection) + state.add_request(q) + + def test_kill_flow(self): + c = flow.State() + f = utils.tflow() + c.add_browserconnect(f) + c.kill_flow(f) + assert not c.flow_list + + def test_clear(self): + c = flow.State() + f = utils.tflow() + c.add_browserconnect(f) + f.intercepting = True + + c.clear() + assert len(c.flow_list) == 1 + f.intercepting = False + c.clear() + assert len(c.flow_list) == 0 + + def test_dump_flows(self): + c = flow.State() + self._add_request(c) + self._add_response(c) + self._add_request(c) + self._add_response(c) + self._add_request(c) + self._add_response(c) + self._add_error(c) + + dump = c.dump_flows() + c.clear() + c.load_flows(dump) + assert isinstance(c.flow_list[0], flow.Flow) + + def test_accept_all(self): + c = flow.State() + self._add_request(c) + self._add_response(c) + self._add_request(c) + c.accept_all() + + + tests = [ - uFlow() + uFlow(), + uState(), ] diff --git a/test/test_proxy.py b/test/test_proxy.py index ea1c56aa..ff9ec4bb 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -1,7 +1,7 @@ import threading, urllib, Queue, urllib2, cStringIO import libpry import serv, sslserv -from libmproxy import proxy, controller, utils +from libmproxy import proxy, controller, utils, dump, script import random # Yes, the random ports are horrible. During development, sockets are often not @@ -196,6 +196,9 @@ class u_parse_url(libpry.AutoTree): s, h, po, pa = proxy.parse_url("http://foo") assert pa == "/" + s, h, po, pa = proxy.parse_url("https://foo") + assert po == 443 + class uConfig(libpry.AutoTree): def test_pem(self): @@ -210,6 +213,8 @@ class uFileLike(libpry.AutoTree): s.flush() assert s.readline() == "foobar\n" assert s.readline() == "foobar" + # Test __getattr__ + assert s.isatty class uRequest(libpry.AutoTree): |