From 6b22ca7a32252b5c5963704f6276eeac390a3622 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Wed, 15 Feb 2017 18:52:32 +0100 Subject: add individual-coverage check --- test/individual_coverage.py | 82 ++++++++++++++++++++++++++++ test/mitmproxy/test_flow.py | 112 --------------------------------------- test/mitmproxy/test_tcp.py | 60 ++++++++++++++++++++- test/mitmproxy/test_websocket.py | 63 +++++++++++++++++++++- 4 files changed, 203 insertions(+), 114 deletions(-) create mode 100644 test/individual_coverage.py (limited to 'test') diff --git a/test/individual_coverage.py b/test/individual_coverage.py new file mode 100644 index 00000000..35bcd27f --- /dev/null +++ b/test/individual_coverage.py @@ -0,0 +1,82 @@ +import io +import contextlib +import os +import sys +import glob +import multiprocessing +import configparser +import itertools +import pytest + + +def run_tests(src, test, fail): + stderr = io.StringIO() + stdout = io.StringIO() + with contextlib.redirect_stderr(stderr): + with contextlib.redirect_stdout(stdout): + e = pytest.main([ + '-qq', + '--disable-pytest-warnings', + '--no-faulthandler', + '--cov', src.replace('.py', '').replace('/', '.'), + '--cov-fail-under', '100', + '--cov-report', 'term-missing:skip-covered', + test + ]) + + if e == 0: + if fail: + print("SUCCESS but should have FAILED:", src, "Please remove this file from setup.cfg tool:individual_coverage/exclude.") + e = 42 + else: + print("SUCCESS:", src) + else: + if fail: + print("Ignoring fail:", src) + e = 0 + else: + cov = [l for l in stdout.getvalue().split("\n") if (src in l) or ("was never imported" in l)] + if len(cov) == 1: + print("FAIL:", cov[0]) + else: + print("FAIL:", src, test, stdout.getvalue(), stdout.getvalue()) + print(stderr.getvalue()) + print(stdout.getvalue()) + + sys.exit(e) + + +def start_pytest(src, test, fail): + # run pytest in a new process, otherwise imports and modules might conflict + proc = multiprocessing.Process(target=run_tests, args=(src, test, fail)) + proc.start() + proc.join() + return (src, test, proc.exitcode) + + +def main(): + c = configparser.ConfigParser() + c.read('setup.cfg') + fs = c['tool:individual_coverage']['exclude'].strip().split('\n') + no_individual_cov = [f.strip() for f in fs] + + excluded = ['mitmproxy/contrib/', 'mitmproxy/test/', 'mitmproxy/tools/', 'mitmproxy/platform/'] + src_files = glob.glob('mitmproxy/**/*.py', recursive=True) + glob.glob('pathod/**/*.py', recursive=True) + src_files = [f for f in src_files if os.path.basename(f) != '__init__.py'] + src_files = [f for f in src_files if not any(os.path.normpath(p) in f for p in excluded)] + + ps = [] + for src in sorted(src_files): + test = os.path.join("test", os.path.dirname(src), "test_" + os.path.basename(src)) + if os.path.isfile(test): + ps.append((src, test, src in no_individual_cov)) + + result = list(itertools.starmap(start_pytest, ps)) + + if any(e != 0 for _, _, e in result): + sys.exit(1) + pass + + +if __name__ == '__main__': + main() diff --git a/test/mitmproxy/test_flow.py b/test/mitmproxy/test_flow.py index 65e6845f..a78e5f80 100644 --- a/test/mitmproxy/test_flow.py +++ b/test/mitmproxy/test_flow.py @@ -10,7 +10,6 @@ from mitmproxy.exceptions import FlowReadException, Kill from mitmproxy import flow from mitmproxy import http from mitmproxy import connections -from mitmproxy import tcp from mitmproxy.proxy import ProxyConfig from mitmproxy.proxy.server import DummyServer from mitmproxy import master @@ -157,117 +156,6 @@ class TestHTTPFlow: assert f.response.raw_content == b"abarb" -class TestWebSocketFlow: - - def test_copy(self): - f = tflow.twebsocketflow() - f.get_state() - f2 = f.copy() - a = f.get_state() - b = f2.get_state() - del a["id"] - del b["id"] - del a["handshake_flow"]["id"] - del b["handshake_flow"]["id"] - assert a == b - assert not f == f2 - assert f is not f2 - - assert f.client_key == f2.client_key - assert f.client_protocol == f2.client_protocol - assert f.client_extensions == f2.client_extensions - assert f.server_accept == f2.server_accept - assert f.server_protocol == f2.server_protocol - assert f.server_extensions == f2.server_extensions - assert f.messages is not f2.messages - assert f.handshake_flow is not f2.handshake_flow - - for m in f.messages: - m2 = m.copy() - m2.set_state(m2.get_state()) - assert m is not m2 - assert m.get_state() == m2.get_state() - - f = tflow.twebsocketflow(err=True) - f2 = f.copy() - assert f is not f2 - assert f.handshake_flow is not f2.handshake_flow - assert f.error.get_state() == f2.error.get_state() - assert f.error is not f2.error - - def test_match(self): - f = tflow.twebsocketflow() - assert not flowfilter.match("~b nonexistent", f) - assert flowfilter.match(None, f) - assert not flowfilter.match("~b nonexistent", f) - - f = tflow.twebsocketflow(err=True) - assert flowfilter.match("~e", f) - - with pytest.raises(ValueError): - flowfilter.match("~", f) - - def test_repr(self): - f = tflow.twebsocketflow() - assert 'WebSocketFlow' in repr(f) - assert 'binary message: ' in repr(f.messages[0]) - assert 'text message: ' in repr(f.messages[1]) - - -class TestTCPFlow: - - def test_copy(self): - f = tflow.ttcpflow() - f.get_state() - f2 = f.copy() - a = f.get_state() - b = f2.get_state() - del a["id"] - del b["id"] - assert a == b - assert not f == f2 - assert f is not f2 - - assert f.messages is not f2.messages - - for m in f.messages: - assert m.get_state() - m2 = m.copy() - assert not m == m2 - assert m is not m2 - - a = m.get_state() - b = m2.get_state() - assert a == b - - m = tcp.TCPMessage(False, 'foo') - m.set_state(f.messages[0].get_state()) - assert m.timestamp == f.messages[0].timestamp - - f = tflow.ttcpflow(err=True) - f2 = f.copy() - assert f is not f2 - assert f.error.get_state() == f2.error.get_state() - assert f.error is not f2.error - - def test_match(self): - f = tflow.ttcpflow() - assert not flowfilter.match("~b nonexistent", f) - assert flowfilter.match(None, f) - assert not flowfilter.match("~b nonexistent", f) - - f = tflow.ttcpflow(err=True) - assert flowfilter.match("~e", f) - - with pytest.raises(ValueError): - flowfilter.match("~", f) - - def test_repr(self): - f = tflow.ttcpflow() - assert 'TCPFlow' in repr(f) - assert '-> ' in repr(f.messages[0]) - - class TestSerialize: def _treader(self): diff --git a/test/mitmproxy/test_tcp.py b/test/mitmproxy/test_tcp.py index 777ab4dd..dce6493c 100644 --- a/test/mitmproxy/test_tcp.py +++ b/test/mitmproxy/test_tcp.py @@ -1 +1,59 @@ -# TODO: write tests +import pytest + +from mitmproxy import tcp +from mitmproxy import flowfilter +from mitmproxy.test import tflow + + +class TestTCPFlow: + + def test_copy(self): + f = tflow.ttcpflow() + f.get_state() + f2 = f.copy() + a = f.get_state() + b = f2.get_state() + del a["id"] + del b["id"] + assert a == b + assert not f == f2 + assert f is not f2 + + assert f.messages is not f2.messages + + for m in f.messages: + assert m.get_state() + m2 = m.copy() + assert not m == m2 + assert m is not m2 + + a = m.get_state() + b = m2.get_state() + assert a == b + + m = tcp.TCPMessage(False, 'foo') + m.set_state(f.messages[0].get_state()) + assert m.timestamp == f.messages[0].timestamp + + f = tflow.ttcpflow(err=True) + f2 = f.copy() + assert f is not f2 + assert f.error.get_state() == f2.error.get_state() + assert f.error is not f2.error + + def test_match(self): + f = tflow.ttcpflow() + assert not flowfilter.match("~b nonexistent", f) + assert flowfilter.match(None, f) + assert not flowfilter.match("~b nonexistent", f) + + f = tflow.ttcpflow(err=True) + assert flowfilter.match("~e", f) + + with pytest.raises(ValueError): + flowfilter.match("~", f) + + def test_repr(self): + f = tflow.ttcpflow() + assert 'TCPFlow' in repr(f) + assert '-> ' in repr(f.messages[0]) diff --git a/test/mitmproxy/test_websocket.py b/test/mitmproxy/test_websocket.py index 777ab4dd..f2963390 100644 --- a/test/mitmproxy/test_websocket.py +++ b/test/mitmproxy/test_websocket.py @@ -1 +1,62 @@ -# TODO: write tests +import pytest + +from mitmproxy import flowfilter +from mitmproxy.test import tflow + + +class TestWebSocketFlow: + + def test_copy(self): + f = tflow.twebsocketflow() + f.get_state() + f2 = f.copy() + a = f.get_state() + b = f2.get_state() + del a["id"] + del b["id"] + del a["handshake_flow"]["id"] + del b["handshake_flow"]["id"] + assert a == b + assert not f == f2 + assert f is not f2 + + assert f.client_key == f2.client_key + assert f.client_protocol == f2.client_protocol + assert f.client_extensions == f2.client_extensions + assert f.server_accept == f2.server_accept + assert f.server_protocol == f2.server_protocol + assert f.server_extensions == f2.server_extensions + assert f.messages is not f2.messages + assert f.handshake_flow is not f2.handshake_flow + + for m in f.messages: + m2 = m.copy() + m2.set_state(m2.get_state()) + assert m is not m2 + assert m.get_state() == m2.get_state() + + f = tflow.twebsocketflow(err=True) + f2 = f.copy() + assert f is not f2 + assert f.handshake_flow is not f2.handshake_flow + assert f.error.get_state() == f2.error.get_state() + assert f.error is not f2.error + + def test_match(self): + f = tflow.twebsocketflow() + assert not flowfilter.match("~b nonexistent", f) + assert flowfilter.match(None, f) + assert not flowfilter.match("~b nonexistent", f) + + f = tflow.twebsocketflow(err=True) + assert flowfilter.match("~e", f) + + with pytest.raises(ValueError): + flowfilter.match("~", f) + + def test_repr(self): + f = tflow.twebsocketflow() + assert f.message_info(f.messages[0]) + assert 'WebSocketFlow' in repr(f) + assert 'binary message: ' in repr(f.messages[0]) + assert 'text message: ' in repr(f.messages[1]) -- cgit v1.2.3