diff options
| -rw-r--r-- | mitmproxy/filt.py | 3 | ||||
| -rw-r--r-- | test/mitmproxy/test_filt.py | 24 | ||||
| -rw-r--r-- | test/mitmproxy/tutils.py | 28 | 
3 files changed, 37 insertions, 18 deletions
diff --git a/mitmproxy/filt.py b/mitmproxy/filt.py index 51e9cc30..f67698e5 100644 --- a/mitmproxy/filt.py +++ b/mitmproxy/filt.py @@ -76,7 +76,6 @@ class FErr(_Action):      code = "e"      help = "Match error" -    @only(HTTPFlow, TCPFlow)      def __call__(self, f):          return True if f.error else False @@ -326,7 +325,6 @@ class FSrc(_Rex):      help = "Match source address"      is_binary = False -    @only(HTTPFlow, TCPFlow)      def __call__(self, f):          return f.client_conn.address and self.re.search(repr(f.client_conn.address)) @@ -336,7 +334,6 @@ class FDst(_Rex):      help = "Match destination address"      is_binary = False -    @only(HTTPFlow, TCPFlow)      def __call__(self, f):          return f.server_conn.address and self.re.search(repr(f.server_conn.address)) diff --git a/test/mitmproxy/test_filt.py b/test/mitmproxy/test_filt.py index 92a48cd2..69f042bb 100644 --- a/test/mitmproxy/test_filt.py +++ b/test/mitmproxy/test_filt.py @@ -2,7 +2,6 @@ from six.moves import cStringIO as StringIO  from mock import patch  from mitmproxy import filt -from mitmproxy.models.flow import Flow  from . import tutils @@ -379,23 +378,21 @@ class TestMatchingTCPFlow:          assert not self.q("~u whatever", f) -class DummyFlow(Flow): -    """ A flow that is neither HTTP nor TCP. """ - -    def __init__(self): -        pass - -  class TestMatchingDummyFlow:      def flow(self): -        return DummyFlow() +        return tutils.tdummyflow() + +    def err(self): +        return tutils.tdummyflow(err=True)      def q(self, q, o):          return filt.parse(q)(o)      def test_filters(self): +        e = self.err()          f = self.flow() +        f.server_conn = tutils.tserver_conn()          assert not self.q("~a", f) @@ -403,12 +400,14 @@ class TestMatchingDummyFlow:          assert not self.q("~bq whatever", f)          assert not self.q("~bs whatever", f) -        assert not self.q("~dst whatever", f) -          assert not self.q("~c 0", f)          assert not self.q("~d whatever", f) +        assert self.q("~dst address", f) +        assert not self.q("~dst nonexistent", f) + +        assert self.q("~e", e)          assert not self.q("~e", f)          assert not self.q("~http", f) @@ -421,7 +420,8 @@ class TestMatchingDummyFlow:          assert not self.q("~s", f) -        assert not self.q("~src whatever", f) +        assert self.q("~src address", f) +        assert not self.q("~src nonexistent", f)          assert not self.q("~tcp", f) diff --git a/test/mitmproxy/tutils.py b/test/mitmproxy/tutils.py index d0a09035..d743a9e6 100644 --- a/test/mitmproxy/tutils.py +++ b/test/mitmproxy/tutils.py @@ -4,18 +4,19 @@ import tempfile  import argparse  import sys -from mitmproxy.models.tcp import TCPMessage -from six.moves import cStringIO as StringIO  from contextlib import contextmanager -  from unittest.case import SkipTest +from six.moves import cStringIO as StringIO +  import netlib.utils  import netlib.tutils  from mitmproxy import controller  from mitmproxy.models import (      ClientConnection, ServerConnection, Error, HTTPRequest, HTTPResponse, HTTPFlow, TCPFlow  ) +from mitmproxy.models.tcp import TCPMessage +from mitmproxy.models.flow import Flow  def _skip_windows(*args): @@ -47,6 +48,27 @@ def skip_appveyor(fn):          return fn +class DummyFlow(Flow): +    """A flow that is neither HTTP nor TCP.""" + +    def __init__(self, client_conn, server_conn, live=None): +        super(DummyFlow, self).__init__("dummy", client_conn, server_conn, live) + + +def tdummyflow(client_conn=True, server_conn=True, err=None): +    if client_conn is True: +        client_conn = tclient_conn() +    if server_conn is True: +        server_conn = tserver_conn() +    if err is True: +        err = terr() + +    f = DummyFlow(client_conn, server_conn) +    f.error = err +    f.reply = controller.DummyReply() +    return f + +  def ttcpflow(client_conn=True, server_conn=True, messages=True, err=None):      if client_conn is True:          client_conn = tclient_conn()  | 
