From d17309eda8f3cf5a0a701eb4dd0ce4378655be16 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Fri, 24 Feb 2017 17:53:08 +0100 Subject: flowfilter: coverage++ --- mitmproxy/flowfilter.py | 1 + mitmproxy/test/tflow.py | 3 +- setup.cfg | 2 - test/mitmproxy/addons/test_dumper.py | 2 +- test/mitmproxy/test_eventsequence.py | 2 + test/mitmproxy/test_flowfilter.py | 99 ++++++++++++++++++++++++++++++++++++ 6 files changed, 105 insertions(+), 4 deletions(-) diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py index d4c0b3ea..7c4f95f7 100644 --- a/mitmproxy/flowfilter.py +++ b/mitmproxy/flowfilter.py @@ -431,6 +431,7 @@ filter_unary = [ FReq, FResp, FTCP, + FWebSocket, ] filter_rex = [ FBod, diff --git a/mitmproxy/test/tflow.py b/mitmproxy/test/tflow.py index 4454420b..f30d8b6f 100644 --- a/mitmproxy/test/tflow.py +++ b/mitmproxy/test/tflow.py @@ -72,7 +72,8 @@ def twebsocketflow(client_conn=True, server_conn=True, messages=True, err=None, if messages is True: messages = [ websocket.WebSocketMessage(websockets.OPCODE.BINARY, True, b"hello binary"), - websocket.WebSocketMessage(websockets.OPCODE.TEXT, False, "hello text".encode()), + websocket.WebSocketMessage(websockets.OPCODE.TEXT, True, "hello text".encode()), + websocket.WebSocketMessage(websockets.OPCODE.TEXT, False, "it's me".encode()), ] if err is True: err = terr() diff --git a/setup.cfg b/setup.cfg index 52f51f26..6ae4e0a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,7 +39,6 @@ exclude = mitmproxy/controller.py mitmproxy/export.py mitmproxy/flow.py - mitmproxy/flowfilter.py mitmproxy/http.py mitmproxy/io_compat.py mitmproxy/master.py @@ -64,7 +63,6 @@ exclude = mitmproxy/exceptions.py mitmproxy/export.py mitmproxy/flow.py - mitmproxy/flowfilter.py mitmproxy/http.py mitmproxy/io.py mitmproxy/io_compat.py diff --git a/test/mitmproxy/addons/test_dumper.py b/test/mitmproxy/addons/test_dumper.py index 75b994fa..22d2c2c6 100644 --- a/test/mitmproxy/addons/test_dumper.py +++ b/test/mitmproxy/addons/test_dumper.py @@ -176,7 +176,7 @@ def test_websocket(): ctx.configure(d, flow_detail=3, showhost=True) f = tflow.twebsocketflow() d.websocket_message(f) - assert "hello text" in sio.getvalue() + assert "it's me" in sio.getvalue() sio.truncate(0) d.websocket_end(f) diff --git a/test/mitmproxy/test_eventsequence.py b/test/mitmproxy/test_eventsequence.py index fe0f92b3..871d4b9d 100644 --- a/test/mitmproxy/test_eventsequence.py +++ b/test/mitmproxy/test_eventsequence.py @@ -32,6 +32,8 @@ def test_websocket_flow(err): assert len(f.messages) == 1 assert next(i) == ("websocket_message", f) assert len(f.messages) == 2 + assert next(i) == ("websocket_message", f) + assert len(f.messages) == 3 if err: assert next(i) == ("websocket_error", f) assert next(i) == ("websocket_end", f) diff --git a/test/mitmproxy/test_flowfilter.py b/test/mitmproxy/test_flowfilter.py index 646c210f..46fff477 100644 --- a/test/mitmproxy/test_flowfilter.py +++ b/test/mitmproxy/test_flowfilter.py @@ -1,4 +1,5 @@ import io +import pytest from unittest.mock import patch from mitmproxy.test import tflow @@ -134,6 +135,12 @@ class TestMatchingHTTPFlow: e = self.err() assert self.q("~e", e) + def test_fmarked(self): + q = self.req() + assert not self.q("~marked", q) + q.marked = True + assert self.q("~marked", q) + def test_head(self): q = self.req() s = self.resp() @@ -279,6 +286,7 @@ class TestMatchingTCPFlow: f = self.flow() assert self.q("~tcp", f) assert not self.q("~http", f) + assert not self.q("~websocket", f) def test_ferr(self): e = self.err() @@ -388,6 +396,87 @@ class TestMatchingTCPFlow: assert not self.q("~u whatever", f) +class TestMatchingWebSocketFlow: + + def flow(self): + return tflow.twebsocketflow() + + def err(self): + return tflow.twebsocketflow(err=True) + + def q(self, q, o): + return flowfilter.parse(q)(o) + + def test_websocket(self): + f = self.flow() + assert self.q("~websocket", f) + assert not self.q("~tcp", f) + assert not self.q("~http", f) + + def test_ferr(self): + e = self.err() + assert self.q("~e", e) + + def test_body(self): + f = self.flow() + + # Messages sent by client or server + assert self.q("~b hello", f) + assert self.q("~b me", f) + assert not self.q("~b nonexistent", f) + + # Messages sent by client + assert self.q("~bq hello", f) + assert not self.q("~bq me", f) + assert not self.q("~bq nonexistent", f) + + # Messages sent by server + assert self.q("~bs me", f) + assert not self.q("~bs hello", f) + assert not self.q("~bs nonexistent", f) + + def test_src(self): + f = self.flow() + assert self.q("~src address", f) + assert not self.q("~src foobar", f) + assert self.q("~src :22", f) + assert not self.q("~src :99", f) + assert self.q("~src address:22", f) + + def test_dst(self): + f = self.flow() + f.server_conn = tflow.tserver_conn() + assert self.q("~dst address", f) + assert not self.q("~dst foobar", f) + assert self.q("~dst :22", f) + assert not self.q("~dst :99", f) + assert self.q("~dst address:22", f) + + def test_and(self): + f = self.flow() + f.server_conn = tflow.tserver_conn() + assert self.q("~b hello & ~b me", f) + assert not self.q("~src wrongaddress & ~b hello", f) + assert self.q("(~src :22 & ~dst :22) & ~b hello", f) + assert not self.q("(~src address:22 & ~dst :22) & ~b nonexistent", f) + assert not self.q("(~src address:22 & ~dst :99) & ~b hello", f) + + def test_or(self): + f = self.flow() + f.server_conn = tflow.tserver_conn() + assert self.q("~b hello | ~b me", f) + assert self.q("~src :22 | ~b me", f) + assert not self.q("~src :99 | ~dst :99", f) + assert self.q("(~src :22 | ~dst :22) | ~b me", f) + + def test_not(self): + f = self.flow() + assert not self.q("! ~src :22", f) + assert self.q("! ~src :99", f) + assert self.q("!~src :99 !~src :99", f) + assert not self.q("!~src :99 !~src :22", f) + + class TestMatchingDummyFlow: def flow(self): @@ -421,6 +510,8 @@ class TestMatchingDummyFlow: assert not self.q("~e", f) assert not self.q("~http", f) + assert not self.q("~tcp", f) + assert not self.q("~websocket", f) assert not self.q("~h whatever", f) assert not self.q("~hq whatever", f) @@ -450,3 +541,11 @@ def test_pyparsing_bug(extract_tb): # The text is a string with leading and trailing whitespace stripped; if the source is not available it is None. extract_tb.return_value = [("", 1, "test", None)] assert flowfilter.parse("test") + + +def test_match(): + with pytest.raises(ValueError): + flowfilter.match('[foobar', None) + + assert flowfilter.match(None, None) + assert not flowfilter.match('foobar', None) -- cgit v1.2.3