aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2017-02-17 22:56:13 +0100
committerGitHub <noreply@github.com>2017-02-17 22:56:13 +0100
commit33acb48c71c26d0a54a2b4dafade07563b1be5c2 (patch)
treea1734516b7ecabca0d4192463e358ffcc729dd05 /test
parent4cec88fc7fcf8a5c16c0639961c27792cd73af13 (diff)
parent6b22ca7a32252b5c5963704f6276eeac390a3622 (diff)
downloadmitmproxy-33acb48c71c26d0a54a2b4dafade07563b1be5c2.tar.gz
mitmproxy-33acb48c71c26d0a54a2b4dafade07563b1be5c2.tar.bz2
mitmproxy-33acb48c71c26d0a54a2b4dafade07563b1be5c2.zip
Merge pull request #2032 from Kriechi/individual-coverage
add individual-coverage check
Diffstat (limited to 'test')
-rw-r--r--test/individual_coverage.py82
-rw-r--r--test/mitmproxy/test_flow.py112
-rw-r--r--test/mitmproxy/test_tcp.py60
-rw-r--r--test/mitmproxy/test_websocket.py63
4 files changed, 203 insertions, 114 deletions
diff --git a/test/individual_coverage.py b/test/individual_coverage.py
new file mode 100644
index 00000000..35bcd27f
--- /dev/null
+++ b/test/individual_coverage.py
@@ -0,0 +1,82 @@
+import io
+import contextlib
+import os
+import sys
+import glob
+import multiprocessing
+import configparser
+import itertools
+import pytest
+
+
+def run_tests(src, test, fail):
+ stderr = io.StringIO()
+ stdout = io.StringIO()
+ with contextlib.redirect_stderr(stderr):
+ with contextlib.redirect_stdout(stdout):
+ e = pytest.main([
+ '-qq',
+ '--disable-pytest-warnings',
+ '--no-faulthandler',
+ '--cov', src.replace('.py', '').replace('/', '.'),
+ '--cov-fail-under', '100',
+ '--cov-report', 'term-missing:skip-covered',
+ test
+ ])
+
+ if e == 0:
+ if fail:
+ print("SUCCESS but should have FAILED:", src, "Please remove this file from setup.cfg tool:individual_coverage/exclude.")
+ e = 42
+ else:
+ print("SUCCESS:", src)
+ else:
+ if fail:
+ print("Ignoring fail:", src)
+ e = 0
+ else:
+ cov = [l for l in stdout.getvalue().split("\n") if (src in l) or ("was never imported" in l)]
+ if len(cov) == 1:
+ print("FAIL:", cov[0])
+ else:
+ print("FAIL:", src, test, stdout.getvalue(), stdout.getvalue())
+ print(stderr.getvalue())
+ print(stdout.getvalue())
+
+ sys.exit(e)
+
+
+def start_pytest(src, test, fail):
+ # run pytest in a new process, otherwise imports and modules might conflict
+ proc = multiprocessing.Process(target=run_tests, args=(src, test, fail))
+ proc.start()
+ proc.join()
+ return (src, test, proc.exitcode)
+
+
+def main():
+ c = configparser.ConfigParser()
+ c.read('setup.cfg')
+ fs = c['tool:individual_coverage']['exclude'].strip().split('\n')
+ no_individual_cov = [f.strip() for f in fs]
+
+ excluded = ['mitmproxy/contrib/', 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/']
+ src_files = glob.glob('mitmproxy/**/*.py', recursive=True) + glob.glob('pathod/**/*.py', recursive=True)
+ src_files = [f for f in src_files if os.path.basename(f) != '__init__.py']
+ src_files = [f for f in src_files if not any(os.path.normpath(p) in f for p in excluded)]
+
+ ps = []
+ for src in sorted(src_files):
+ test = os.path.join("test", os.path.dirname(src), "test_" + os.path.basename(src))
+ if os.path.isfile(test):
+ ps.append((src, test, src in no_individual_cov))
+
+ result = list(itertools.starmap(start_pytest, ps))
+
+ if any(e != 0 for _, _, e in result):
+ sys.exit(1)
+ pass
+
+
+if __name__ == '__main__':
+ main()
diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py
index 65e6845f..a78e5f80 100644
--- a/test/mitmproxy/test_flow.py
+++ b/test/mitmproxy/test_flow.py
@@ -10,7 +10,6 @@ from mitmproxy.exceptions import FlowReadException, Kill
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import connections
-from mitmproxy import tcp
from mitmproxy.proxy import ProxyConfig
from mitmproxy.proxy.server import DummyServer
from mitmproxy import master
@@ -157,117 +156,6 @@ class TestHTTPFlow:
assert f.response.raw_content == b"abarb"
-class TestWebSocketFlow:
-
- def test_copy(self):
- f = tflow.twebsocketflow()
- f.get_state()
- f2 = f.copy()
- a = f.get_state()
- b = f2.get_state()
- del a["id"]
- del b["id"]
- del a["handshake_flow"]["id"]
- del b["handshake_flow"]["id"]
- assert a == b
- assert not f == f2
- assert f is not f2
-
- assert f.client_key == f2.client_key
- assert f.client_protocol == f2.client_protocol
- assert f.client_extensions == f2.client_extensions
- assert f.server_accept == f2.server_accept
- assert f.server_protocol == f2.server_protocol
- assert f.server_extensions == f2.server_extensions
- assert f.messages is not f2.messages
- assert f.handshake_flow is not f2.handshake_flow
-
- for m in f.messages:
- m2 = m.copy()
- m2.set_state(m2.get_state())
- assert m is not m2
- assert m.get_state() == m2.get_state()
-
- f = tflow.twebsocketflow(err=True)
- f2 = f.copy()
- assert f is not f2
- assert f.handshake_flow is not f2.handshake_flow
- assert f.error.get_state() == f2.error.get_state()
- assert f.error is not f2.error
-
- def test_match(self):
- f = tflow.twebsocketflow()
- assert not flowfilter.match("~b nonexistent", f)
- assert flowfilter.match(None, f)
- assert not flowfilter.match("~b nonexistent", f)
-
- f = tflow.twebsocketflow(err=True)
- assert flowfilter.match("~e", f)
-
- with pytest.raises(ValueError):
- flowfilter.match("~", f)
-
- def test_repr(self):
- f = tflow.twebsocketflow()
- assert 'WebSocketFlow' in repr(f)
- assert 'binary message: ' in repr(f.messages[0])
- assert 'text message: ' in repr(f.messages[1])
-
-
-class TestTCPFlow:
-
- def test_copy(self):
- f = tflow.ttcpflow()
- f.get_state()
- f2 = f.copy()
- a = f.get_state()
- b = f2.get_state()
- del a["id"]
- del b["id"]
- assert a == b
- assert not f == f2
- assert f is not f2
-
- assert f.messages is not f2.messages
-
- for m in f.messages:
- assert m.get_state()
- m2 = m.copy()
- assert not m == m2
- assert m is not m2
-
- a = m.get_state()
- b = m2.get_state()
- assert a == b
-
- m = tcp.TCPMessage(False, 'foo')
- m.set_state(f.messages[0].get_state())
- assert m.timestamp == f.messages[0].timestamp
-
- f = tflow.ttcpflow(err=True)
- f2 = f.copy()
- assert f is not f2
- assert f.error.get_state() == f2.error.get_state()
- assert f.error is not f2.error
-
- def test_match(self):
- f = tflow.ttcpflow()
- assert not flowfilter.match("~b nonexistent", f)
- assert flowfilter.match(None, f)
- assert not flowfilter.match("~b nonexistent", f)
-
- f = tflow.ttcpflow(err=True)
- assert flowfilter.match("~e", f)
-
- with pytest.raises(ValueError):
- flowfilter.match("~", f)
-
- def test_repr(self):
- f = tflow.ttcpflow()
- assert 'TCPFlow' in repr(f)
- assert '-> ' in repr(f.messages[0])
-
-
class TestSerialize:
def _treader(self):
diff --git a/test/mitmproxy/test_tcp.py b/test/mitmproxy/test_tcp.py
index 777ab4dd..dce6493c 100644
--- a/test/mitmproxy/test_tcp.py
+++ b/test/mitmproxy/test_tcp.py
@@ -1 +1,59 @@
-# TODO: write tests
+import pytest
+
+from mitmproxy import tcp
+from mitmproxy import flowfilter
+from mitmproxy.test import tflow
+
+
+class TestTCPFlow:
+
+ def test_copy(self):
+ f = tflow.ttcpflow()
+ f.get_state()
+ f2 = f.copy()
+ a = f.get_state()
+ b = f2.get_state()
+ del a["id"]
+ del b["id"]
+ assert a == b
+ assert not f == f2
+ assert f is not f2
+
+ assert f.messages is not f2.messages
+
+ for m in f.messages:
+ assert m.get_state()
+ m2 = m.copy()
+ assert not m == m2
+ assert m is not m2
+
+ a = m.get_state()
+ b = m2.get_state()
+ assert a == b
+
+ m = tcp.TCPMessage(False, 'foo')
+ m.set_state(f.messages[0].get_state())
+ assert m.timestamp == f.messages[0].timestamp
+
+ f = tflow.ttcpflow(err=True)
+ f2 = f.copy()
+ assert f is not f2
+ assert f.error.get_state() == f2.error.get_state()
+ assert f.error is not f2.error
+
+ def test_match(self):
+ f = tflow.ttcpflow()
+ assert not flowfilter.match("~b nonexistent", f)
+ assert flowfilter.match(None, f)
+ assert not flowfilter.match("~b nonexistent", f)
+
+ f = tflow.ttcpflow(err=True)
+ assert flowfilter.match("~e", f)
+
+ with pytest.raises(ValueError):
+ flowfilter.match("~", f)
+
+ def test_repr(self):
+ f = tflow.ttcpflow()
+ assert 'TCPFlow' in repr(f)
+ assert '-> ' in repr(f.messages[0])
diff --git a/test/mitmproxy/test_websocket.py b/test/mitmproxy/test_websocket.py
index 777ab4dd..f2963390 100644
--- a/test/mitmproxy/test_websocket.py
+++ b/test/mitmproxy/test_websocket.py
@@ -1 +1,62 @@
-# TODO: write tests
+import pytest
+
+from mitmproxy import flowfilter
+from mitmproxy.test import tflow
+
+
+class TestWebSocketFlow:
+
+ def test_copy(self):
+ f = tflow.twebsocketflow()
+ f.get_state()
+ f2 = f.copy()
+ a = f.get_state()
+ b = f2.get_state()
+ del a["id"]
+ del b["id"]
+ del a["handshake_flow"]["id"]
+ del b["handshake_flow"]["id"]
+ assert a == b
+ assert not f == f2
+ assert f is not f2
+
+ assert f.client_key == f2.client_key
+ assert f.client_protocol == f2.client_protocol
+ assert f.client_extensions == f2.client_extensions
+ assert f.server_accept == f2.server_accept
+ assert f.server_protocol == f2.server_protocol
+ assert f.server_extensions == f2.server_extensions
+ assert f.messages is not f2.messages
+ assert f.handshake_flow is not f2.handshake_flow
+
+ for m in f.messages:
+ m2 = m.copy()
+ m2.set_state(m2.get_state())
+ assert m is not m2
+ assert m.get_state() == m2.get_state()
+
+ f = tflow.twebsocketflow(err=True)
+ f2 = f.copy()
+ assert f is not f2
+ assert f.handshake_flow is not f2.handshake_flow
+ assert f.error.get_state() == f2.error.get_state()
+ assert f.error is not f2.error
+
+ def test_match(self):
+ f = tflow.twebsocketflow()
+ assert not flowfilter.match("~b nonexistent", f)
+ assert flowfilter.match(None, f)
+ assert not flowfilter.match("~b nonexistent", f)
+
+ f = tflow.twebsocketflow(err=True)
+ assert flowfilter.match("~e", f)
+
+ with pytest.raises(ValueError):
+ flowfilter.match("~", f)
+
+ def test_repr(self):
+ f = tflow.twebsocketflow()
+ assert f.message_info(f.messages[0])
+ assert 'WebSocketFlow' in repr(f)
+ assert 'binary message: ' in repr(f.messages[0])
+ assert 'text message: ' in repr(f.messages[1])