aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_console_common.py2
-rw-r--r--test/test_contentview.py (renamed from test/test_console_contentview.py)137
-rw-r--r--test/test_dump.py57
-rw-r--r--test/test_server.py13
4 files changed, 107 insertions, 102 deletions
diff --git a/test/test_console_common.py b/test/test_console_common.py
index 57cbef98..459539c5 100644
--- a/test/test_console_common.py
+++ b/test/test_console_common.py
@@ -3,10 +3,8 @@ from nose.plugins.skip import SkipTest
if os.name == "nt":
raise SkipTest("Skipped on Windows.")
-from netlib import encoding
import libmproxy.console.common as common
-from libmproxy import utils, flow
import tutils
diff --git a/test/test_console_contentview.py b/test/test_contentview.py
index 6a93346a..97608520 100644
--- a/test/test_console_contentview.py
+++ b/test/test_contentview.py
@@ -1,16 +1,9 @@
-import os
-from nose.plugins.skip import SkipTest
+from libmproxy.exceptions import ContentViewException
from netlib.http import Headers
-
-if os.name == "nt":
- raise SkipTest("Skipped on Windows.")
-import sys
-
import netlib.utils
from netlib import encoding
-import libmproxy.console.contentview as cv
-from libmproxy import utils, flow
+import libmproxy.contentviews as cv
import tutils
try:
@@ -25,76 +18,65 @@ except:
class TestContentView:
- def test_trailer(self):
- txt = []
- cv.trailer(5, txt, 1000)
- assert not txt
- cv.trailer(cv.VIEW_CUTOFF + 10, txt, cv.VIEW_CUTOFF)
- assert txt
def test_view_auto(self):
v = cv.ViewAuto()
f = v(
- Headers(),
"foo",
- 1000
+ headers=Headers()
)
assert f[0] == "Raw"
f = v(
- Headers(content_type="text/html"),
"<html></html>",
- 1000
+ headers=Headers(content_type="text/html")
)
assert f[0] == "HTML"
f = v(
- Headers(content_type="text/flibble"),
"foo",
- 1000
+ headers=Headers(content_type="text/flibble")
)
assert f[0] == "Raw"
f = v(
- Headers(content_type="text/flibble"),
"<xml></xml>",
- 1000
+ headers=Headers(content_type="text/flibble")
)
assert f[0].startswith("XML")
def test_view_urlencoded(self):
d = netlib.utils.urlencode([("one", "two"), ("three", "four")])
v = cv.ViewURLEncoded()
- assert v([], d, 100)
+ assert v(d)
d = netlib.utils.urlencode([("adsfa", "")])
v = cv.ViewURLEncoded()
- assert v([], d, 100)
+ assert v(d)
def test_view_html(self):
v = cv.ViewHTML()
s = "<html><br><br></br><p>one</p></html>"
- assert v([], s, 1000)
+ assert v(s)
s = "gobbledygook"
- assert not v([], s, 1000)
+ assert not v(s)
def test_view_html_outline(self):
v = cv.ViewHTMLOutline()
s = "<html><br><br></br><p>one</p></html>"
- assert v([], s, 1000)
+ assert v(s)
def test_view_json(self):
cv.VIEW_CUTOFF = 100
v = cv.ViewJSON()
- assert v([], "{}", 1000)
- assert not v([], "{", 1000)
- assert v([], "[" + ",".join(["0"] * cv.VIEW_CUTOFF) + "]", 1000)
- assert v([], "[1, 2, 3, 4, 5]", 5)
+ assert v("{}")
+ assert not v("{")
+ assert v("[1, 2, 3, 4, 5]")
def test_view_xml(self):
v = cv.ViewXML()
- assert v([], "<foo></foo>", 1000)
- assert not v([], "<foo>", 1000)
+ assert v("<foo></foo>")
+ assert not v("<foo>")
s = """<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet title="XSL_formatting"?>
<rss
@@ -103,17 +85,17 @@ class TestContentView:
version="2.0">
</rss>
"""
- assert v([], s, 1000)
+ assert v(s)
def test_view_raw(self):
v = cv.ViewRaw()
- assert v([], "foo", 1000)
+ assert v("foo")
def test_view_javascript(self):
v = cv.ViewJavaScript()
- assert v([], "[1, 2, 3]", 100)
- assert v([], "[1, 2, 3", 100)
- assert v([], "function(a){[1, 2, 3]}", 100)
+ assert v("[1, 2, 3]")
+ assert v("[1, 2, 3")
+ assert v("function(a){[1, 2, 3]}")
def test_view_css(self):
v = cv.ViewCSS()
@@ -121,39 +103,39 @@ class TestContentView:
with open(tutils.test_data.path('data/1.css'), 'r') as fp:
fixture_1 = fp.read()
- result = v([], 'a', 100)
+ result = v('a')
if cssutils:
- assert len(result[1]) == 0
+ assert len(list(result[1])) == 0
else:
- assert len(result[1]) == 1
+ assert len(list(result[1])) == 1
- result = v([], fixture_1, 100)
+ result = v(fixture_1)
if cssutils:
- assert len(result[1]) > 1
+ assert len(list(result[1])) > 1
else:
- assert len(result[1]) == 1
+ assert len(list(result[1])) == 1
def test_view_hex(self):
v = cv.ViewHex()
- assert v([], "foo", 1000)
+ assert v("foo")
def test_view_image(self):
v = cv.ViewImage()
p = tutils.test_data.path("data/image.png")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
p = tutils.test_data.path("data/image.gif")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
p = tutils.test_data.path("data/image-err1.jpg")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
p = tutils.test_data.path("data/image.ico")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
- assert not v([], "flibble", sys.maxsize)
+ assert not v("flibble")
def test_view_multipart(self):
view = cv.ViewMultipart()
@@ -165,76 +147,65 @@ Larry
--AaB03x
""".strip()
h = Headers(content_type="multipart/form-data; boundary=AaB03x")
- assert view(h, v, 1000)
+ assert view(v, headers=h)
h = Headers()
- assert not view(h, v, 1000)
+ assert not view(v, headers=h)
h = Headers(content_type="multipart/form-data")
- assert not view(h, v, 1000)
+ assert not view(v, headers=h)
h = Headers(content_type="unparseable")
- assert not view(h, v, 1000)
+ assert not view(v, headers=h)
def test_get_content_view(self):
r = cv.get_content_view(
cv.get("Raw"),
- Headers(content_type="application/json"),
"[1, 2, 3]",
- 1000,
- False
+ headers=Headers(content_type="application/json")
)
assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("Auto"),
- Headers(content_type="application/json"),
"[1, 2, 3]",
- 1000,
- False
+ headers=Headers(content_type="application/json")
)
assert r[0] == "JSON"
r = cv.get_content_view(
cv.get("Auto"),
- Headers(content_type="application/json"),
"[1, 2",
- 1000,
- False
+ headers=Headers(content_type="application/json")
)
assert "Raw" in r[0]
- r = cv.get_content_view(
+ tutils.raises(
+ ContentViewException,
+ cv.get_content_view,
cv.get("AMF"),
- Headers(),
"[1, 2",
- 1000,
- False
+ headers=Headers()
)
- assert "Raw" in r[0]
r = cv.get_content_view(
cv.get("Auto"),
- Headers(
+ encoding.encode('gzip', "[1, 2, 3]"),
+ headers=Headers(
content_type="application/json",
content_encoding="gzip"
- ),
- encoding.encode('gzip', "[1, 2, 3]"),
- 1000,
- False
+ )
)
assert "decoded gzip" in r[0]
assert "JSON" in r[0]
r = cv.get_content_view(
cv.get("XML"),
- Headers(
+ encoding.encode('gzip', "[1, 2, 3]"),
+ headers=Headers(
content_type="application/json",
content_encoding="gzip"
- ),
- encoding.encode('gzip', "[1, 2, 3]"),
- 1000,
- False
+ )
)
assert "decoded gzip" in r[0]
assert "Raw" in r[0]
@@ -245,22 +216,22 @@ if pyamf:
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf01")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
p = tutils.test_data.path("data/amf02")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
def test_view_amf_response():
v = cv.ViewAMF()
p = tutils.test_data.path("data/amf03")
- assert v([], file(p, "rb").read(), sys.maxsize)
+ assert v(file(p, "rb").read())
if cv.ViewProtobuf.is_available():
def test_view_protobuf_request():
v = cv.ViewProtobuf()
p = tutils.test_data.path("data/protobuf01")
- content_type, output = v([], file(p, "rb").read(), sys.maxsize)
+ content_type, output = v(file(p, "rb").read())
assert content_type == "Protobuf"
assert output[0].text == '1: "3bbc333c-e61c-433b-819a-0b9a8cc103b8"'
diff --git a/test/test_dump.py b/test/test_dump.py
index c76f555f..29931759 100644
--- a/test/test_dump.py
+++ b/test/test_dump.py
@@ -1,5 +1,6 @@
import os
from cStringIO import StringIO
+from libmproxy.exceptions import ContentViewException
from libmproxy.models import HTTPResponse
import netlib.tutils
@@ -12,17 +13,51 @@ import mock
def test_strfuncs():
- t = HTTPResponse.wrap(netlib.tutils.tresp())
- t.is_replay = True
- dump.str_response(t)
-
- f = tutils.tflow()
- f.client_conn = None
- f.request.stickycookie = True
- assert "stickycookie" in dump.str_request(f, False)
- assert "stickycookie" in dump.str_request(f, True)
- assert "replay" in dump.str_request(f, False)
- assert "replay" in dump.str_request(f, True)
+ o = dump.Options()
+ m = dump.DumpMaster(None, o)
+
+ m.outfile = StringIO()
+ m.o.flow_detail = 0
+ m.echo_flow(tutils.tflow())
+ assert not m.outfile.getvalue()
+
+ m.o.flow_detail = 4
+ m.echo_flow(tutils.tflow())
+ assert m.outfile.getvalue()
+
+ m.outfile = StringIO()
+ m.echo_flow(tutils.tflow(resp=True))
+ assert "<<" in m.outfile.getvalue()
+
+ m.outfile = StringIO()
+ m.echo_flow(tutils.tflow(err=True))
+ assert "<<" in m.outfile.getvalue()
+
+ flow = tutils.tflow()
+ flow.request = netlib.tutils.treq()
+ flow.request.stickycookie = True
+ flow.client_conn = mock.MagicMock()
+ flow.client_conn.address.host = "foo"
+ flow.response = netlib.tutils.tresp(content=CONTENT_MISSING)
+ flow.response.is_replay = True
+ flow.response.code = 300
+ m.echo_flow(flow)
+
+
+ flow = tutils.tflow(resp=netlib.tutils.tresp("{"))
+ flow.response.headers["content-type"] = "application/json"
+ flow.response.code = 400
+ m.echo_flow(flow)
+
+
+@mock.patch("libmproxy.contentviews.get_content_view")
+def test_contentview(get_content_view):
+ get_content_view.side_effect = ContentViewException(""), ("x", iter([]))
+
+ o = dump.Options(flow_detail=4, verbosity=3)
+ m = dump.DumpMaster(None, o, StringIO())
+ m.echo_flow(tutils.tflow())
+ assert "Content viewer failed" in m.outfile.getvalue()
class TestDumpMaster:
diff --git a/test/test_server.py b/test/test_server.py
index 829b5f0a..4a5dd7c2 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -522,13 +522,13 @@ class TestProxy(tservers.HTTPProxTest):
assert f.response.code == 304
def test_response_timestamps(self):
- # test that we notice at least 2 sec delay between timestamps
+ # test that we notice at least 1 sec delay between timestamps
# in response object
f = self.pathod("304:b@1k:p50,1")
assert f.status_code == 304
response = self.master.state.view[0].response
- assert 1 <= response.timestamp_end - response.timestamp_start <= 1.2
+ assert 0.9 <= response.timestamp_end - response.timestamp_start <= 1.2
def test_request_timestamps(self):
# test that we notice a delay between timestamps in request object
@@ -547,8 +547,9 @@ class TestProxy(tservers.HTTPProxTest):
request, response = self.master.state.view[
0].request, self.master.state.view[0].response
assert response.code == 304 # sanity test for our low level request
- # time.sleep might be a little bit shorter than a second
- assert 0.95 < (request.timestamp_end - request.timestamp_start) < 1.2
+ # time.sleep might be a little bit shorter than a second,
+ # we observed up to 0.93s on appveyor.
+ assert 0.8 < (request.timestamp_end - request.timestamp_start) < 1.2
def test_request_timestamps_not_affected_by_client_time(self):
# test that don't include user wait time in request's timestamps
@@ -711,7 +712,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
connection.connect(("127.0.0.1", self.proxy.port))
fconn = connection.makefile()
- spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n7\\r\\nisatest\\r\\n0\\r\\n\\r\\n"'
+ spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"'
connection.send(
"GET %s/p/%s HTTP/1.1\r\n" %
(self.server.urlbase, spec))
@@ -726,7 +727,7 @@ class TestStreamRequest(tservers.HTTPProxTest):
chunks = list(protocol.read_http_body_chunked(
resp.headers, None, "GET", 200, False
))
- assert chunks == ["this", "isatest", ""]
+ assert chunks == ["this", "isatest__reachhex"]
connection.close()