diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/test_console_common.py | 2 | ||||
-rw-r--r-- | test/test_contentview.py (renamed from test/test_console_contentview.py) | 137 | ||||
-rw-r--r-- | test/test_dump.py | 57 | ||||
-rw-r--r-- | test/test_server.py | 13 |
4 files changed, 107 insertions, 102 deletions
diff --git a/test/test_console_common.py b/test/test_console_common.py index 57cbef98..459539c5 100644 --- a/test/test_console_common.py +++ b/test/test_console_common.py @@ -3,10 +3,8 @@ from nose.plugins.skip import SkipTest if os.name == "nt": raise SkipTest("Skipped on Windows.") -from netlib import encoding import libmproxy.console.common as common -from libmproxy import utils, flow import tutils diff --git a/test/test_console_contentview.py b/test/test_contentview.py index 6a93346a..97608520 100644 --- a/test/test_console_contentview.py +++ b/test/test_contentview.py @@ -1,16 +1,9 @@ -import os -from nose.plugins.skip import SkipTest +from libmproxy.exceptions import ContentViewException from netlib.http import Headers - -if os.name == "nt": - raise SkipTest("Skipped on Windows.") -import sys - import netlib.utils from netlib import encoding -import libmproxy.console.contentview as cv -from libmproxy import utils, flow +import libmproxy.contentviews as cv import tutils try: @@ -25,76 +18,65 @@ except: class TestContentView: - def test_trailer(self): - txt = [] - cv.trailer(5, txt, 1000) - assert not txt - cv.trailer(cv.VIEW_CUTOFF + 10, txt, cv.VIEW_CUTOFF) - assert txt def test_view_auto(self): v = cv.ViewAuto() f = v( - Headers(), "foo", - 1000 + headers=Headers() ) assert f[0] == "Raw" f = v( - Headers(content_type="text/html"), "<html></html>", - 1000 + headers=Headers(content_type="text/html") ) assert f[0] == "HTML" f = v( - Headers(content_type="text/flibble"), "foo", - 1000 + headers=Headers(content_type="text/flibble") ) assert f[0] == "Raw" f = v( - Headers(content_type="text/flibble"), "<xml></xml>", - 1000 + headers=Headers(content_type="text/flibble") ) assert f[0].startswith("XML") def test_view_urlencoded(self): d = netlib.utils.urlencode([("one", "two"), ("three", "four")]) v = cv.ViewURLEncoded() - assert v([], d, 100) + assert v(d) d = netlib.utils.urlencode([("adsfa", "")]) v = cv.ViewURLEncoded() - assert v([], d, 100) + assert v(d) def test_view_html(self): v = cv.ViewHTML() s = "<html><br><br></br><p>one</p></html>" - assert v([], s, 1000) + assert v(s) s = "gobbledygook" - assert not v([], s, 1000) + assert not v(s) def test_view_html_outline(self): v = cv.ViewHTMLOutline() s = "<html><br><br></br><p>one</p></html>" - assert v([], s, 1000) + assert v(s) def test_view_json(self): cv.VIEW_CUTOFF = 100 v = cv.ViewJSON() - assert v([], "{}", 1000) - assert not v([], "{", 1000) - assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000) - assert v([], "[1, 2, 3, 4, 5]", 5) + assert v("{}") + assert not v("{") + assert v("[1, 2, 3, 4, 5]") def test_view_xml(self): v = cv.ViewXML() - assert v([], "<foo></foo>", 1000) - assert not v([], "<foo>", 1000) + assert v("<foo></foo>") + assert not v("<foo>") s = """<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet title="XSL_formatting"?> <rss @@ -103,17 +85,17 @@ class TestContentView: version="2.0"> </rss> """ - assert v([], s, 1000) + assert v(s) def test_view_raw(self): v = cv.ViewRaw() - assert v([], "foo", 1000) + assert v("foo") def test_view_javascript(self): v = cv.ViewJavaScript() - assert v([], "[1, 2, 3]", 100) - assert v([], "[1, 2, 3", 100) - assert v([], "function(a){[1, 2, 3]}", 100) + assert v("[1, 2, 3]") + assert v("[1, 2, 3") + assert v("function(a){[1, 2, 3]}") def test_view_css(self): v = cv.ViewCSS() @@ -121,39 +103,39 @@ class TestContentView: with open(tutils.test_data.path('data/1.css'), 'r') as fp: fixture_1 = fp.read() - result = v([], 'a', 100) + result = v('a') if cssutils: - assert len(result[1]) == 0 + assert len(list(result[1])) == 0 else: - assert len(result[1]) == 1 + assert len(list(result[1])) == 1 - result = v([], fixture_1, 100) + result = v(fixture_1) if cssutils: - assert len(result[1]) > 1 + assert len(list(result[1])) > 1 else: - assert len(result[1]) == 1 + assert len(list(result[1])) == 1 def test_view_hex(self): v = cv.ViewHex() - assert v([], "foo", 1000) + assert v("foo") def test_view_image(self): v = cv.ViewImage() p = tutils.test_data.path("data/image.png") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/image.gif") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/image-err1.jpg") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/image.ico") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) - assert not v([], "flibble", sys.maxsize) + assert not v("flibble") def test_view_multipart(self): view = cv.ViewMultipart() @@ -165,76 +147,65 @@ Larry --AaB03x """.strip() h = Headers(content_type="multipart/form-data; boundary=AaB03x") - assert view(h, v, 1000) + assert view(v, headers=h) h = Headers() - assert not view(h, v, 1000) + assert not view(v, headers=h) h = Headers(content_type="multipart/form-data") - assert not view(h, v, 1000) + assert not view(v, headers=h) h = Headers(content_type="unparseable") - assert not view(h, v, 1000) + assert not view(v, headers=h) def test_get_content_view(self): r = cv.get_content_view( cv.get("Raw"), - Headers(content_type="application/json"), "[1, 2, 3]", - 1000, - False + headers=Headers(content_type="application/json") ) assert "Raw" in r[0] r = cv.get_content_view( cv.get("Auto"), - Headers(content_type="application/json"), "[1, 2, 3]", - 1000, - False + headers=Headers(content_type="application/json") ) assert r[0] == "JSON" r = cv.get_content_view( cv.get("Auto"), - Headers(content_type="application/json"), "[1, 2", - 1000, - False + headers=Headers(content_type="application/json") ) assert "Raw" in r[0] - r = cv.get_content_view( + tutils.raises( + ContentViewException, + cv.get_content_view, cv.get("AMF"), - Headers(), "[1, 2", - 1000, - False + headers=Headers() ) - assert "Raw" in r[0] r = cv.get_content_view( cv.get("Auto"), - Headers( + encoding.encode('gzip', "[1, 2, 3]"), + headers=Headers( content_type="application/json", content_encoding="gzip" - ), - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - False + ) ) assert "decoded gzip" in r[0] assert "JSON" in r[0] r = cv.get_content_view( cv.get("XML"), - Headers( + encoding.encode('gzip', "[1, 2, 3]"), + headers=Headers( content_type="application/json", content_encoding="gzip" - ), - encoding.encode('gzip', "[1, 2, 3]"), - 1000, - False + ) ) assert "decoded gzip" in r[0] assert "Raw" in r[0] @@ -245,22 +216,22 @@ if pyamf: v = cv.ViewAMF() p = tutils.test_data.path("data/amf01") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) p = tutils.test_data.path("data/amf02") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) def test_view_amf_response(): v = cv.ViewAMF() p = tutils.test_data.path("data/amf03") - assert v([], file(p, "rb").read(), sys.maxsize) + assert v(file(p, "rb").read()) if cv.ViewProtobuf.is_available(): def test_view_protobuf_request(): v = cv.ViewProtobuf() p = tutils.test_data.path("data/protobuf01") - content_type, output = v([], file(p, "rb").read(), sys.maxsize) + content_type, output = v(file(p, "rb").read()) assert content_type == "Protobuf" assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"' diff --git a/test/test_dump.py b/test/test_dump.py index c76f555f..29931759 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -1,5 +1,6 @@ import os from cStringIO import StringIO +from libmproxy.exceptions import ContentViewException from libmproxy.models import HTTPResponse import netlib.tutils @@ -12,17 +13,51 @@ import mock def test_strfuncs(): - t = HTTPResponse.wrap(netlib.tutils.tresp()) - t.is_replay = True - dump.str_response(t) - - f = tutils.tflow() - f.client_conn = None - f.request.stickycookie = True - assert "stickycookie" in dump.str_request(f, False) - assert "stickycookie" in dump.str_request(f, True) - assert "replay" in dump.str_request(f, False) - assert "replay" in dump.str_request(f, True) + o = dump.Options() + m = dump.DumpMaster(None, o) + + m.outfile = StringIO() + m.o.flow_detail = 0 + m.echo_flow(tutils.tflow()) + assert not m.outfile.getvalue() + + m.o.flow_detail = 4 + m.echo_flow(tutils.tflow()) + assert m.outfile.getvalue() + + m.outfile = StringIO() + m.echo_flow(tutils.tflow(resp=True)) + assert "<<" in m.outfile.getvalue() + + m.outfile = StringIO() + m.echo_flow(tutils.tflow(err=True)) + assert "<<" in m.outfile.getvalue() + + flow = tutils.tflow() + flow.request = netlib.tutils.treq() + flow.request.stickycookie = True + flow.client_conn = mock.MagicMock() + flow.client_conn.address.host = "foo" + flow.response = netlib.tutils.tresp(content=CONTENT_MISSING) + flow.response.is_replay = True + flow.response.code = 300 + m.echo_flow(flow) + + + flow = tutils.tflow(resp=netlib.tutils.tresp("{")) + flow.response.headers["content-type"] = "application/json" + flow.response.code = 400 + m.echo_flow(flow) + + +@mock.patch("libmproxy.contentviews.get_content_view") +def test_contentview(get_content_view): + get_content_view.side_effect = ContentViewException(""), ("x", iter([])) + + o = dump.Options(flow_detail=4, verbosity=3) + m = dump.DumpMaster(None, o, StringIO()) + m.echo_flow(tutils.tflow()) + assert "Content viewer failed" in m.outfile.getvalue() class TestDumpMaster: diff --git a/test/test_server.py b/test/test_server.py index 829b5f0a..4a5dd7c2 100644 --- a/test/test_server.py +++ b/test/test_server.py @@ -522,13 +522,13 @@ class TestProxy(tservers.HTTPProxTest): assert f.response.code == 304 def test_response_timestamps(self): - # test that we notice at least 2 sec delay between timestamps + # test that we notice at least 1 sec delay between timestamps # in response object f = self.pathod("304:b@1k:p50,1") assert f.status_code == 304 response = self.master.state.view[0].response - assert 1 <= response.timestamp_end - response.timestamp_start <= 1.2 + assert 0.9 <= response.timestamp_end - response.timestamp_start <= 1.2 def test_request_timestamps(self): # test that we notice a delay between timestamps in request object @@ -547,8 +547,9 @@ class TestProxy(tservers.HTTPProxTest): request, response = self.master.state.view[ 0].request, self.master.state.view[0].response assert response.code == 304 # sanity test for our low level request - # time.sleep might be a little bit shorter than a second - assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2 + # time.sleep might be a little bit shorter than a second, + # we observed up to 0.93s on appveyor. + assert 0.8 < (request.timestamp_end - request.timestamp_start) < 1.2 def test_request_timestamps_not_affected_by_client_time(self): # test that don't include user wait time in request's timestamps @@ -711,7 +712,7 @@ class TestStreamRequest(tservers.HTTPProxTest): connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connection.connect(("127.0.0.1", self.proxy.port)) fconn = connection.makefile() - spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n7\\r\\nisatest\\r\\n0\\r\\n\\r\\n"' + spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"' connection.send( "GET %s/p/%s HTTP/1.1\r\n" % (self.server.urlbase, spec)) @@ -726,7 +727,7 @@ class TestStreamRequest(tservers.HTTPProxTest): chunks = list(protocol.read_http_body_chunked( resp.headers, None, "GET", 200, False )) - assert chunks == ["this", "isatest", ""] + assert chunks == ["this", "isatest__reachhex"] connection.close() |