diff options
| -rw-r--r-- | mitmproxy/flowfilter.py | 112 | 
1 files changed, 48 insertions, 64 deletions
| diff --git a/mitmproxy/flowfilter.py b/mitmproxy/flowfilter.py index 3f5afb48..b222d2a8 100644 --- a/mitmproxy/flowfilter.py +++ b/mitmproxy/flowfilter.py @@ -32,19 +32,17 @@          rex         Equivalent to ~u rex  """ +import functools  import re  import sys -import functools +from typing import Callable, ClassVar, Optional, Sequence, Type + +import pyparsing as pp +from mitmproxy import flow  from mitmproxy import http -from mitmproxy import websocket  from mitmproxy import tcp -from mitmproxy import flow - -from mitmproxy.utils import strutils - -import pyparsing as pp -from typing import Callable, Sequence, Type, Optional, ClassVar +from mitmproxy import websocket  def only(*types): @@ -54,7 +52,9 @@ def only(*types):              if isinstance(flow, types):                  return fn(self, flow)              return False +          return filter_types +      return decorator @@ -146,10 +146,10 @@ class _Rex(_Action):      def __init__(self, expr):          self.expr = expr          if self.is_binary: -            expr = strutils.escaped_str_to_bytes(expr) +            expr = expr.encode()          try:              self.re = re.compile(expr, self.flags) -        except: +        except Exception:              raise ValueError("Cannot compile expression.") @@ -336,6 +336,7 @@ class FUrl(_Rex):      code = "u"      help = "URL"      is_binary = False +      # FUrl is special, because it can be "naked".      @classmethod @@ -469,68 +470,51 @@ def _make():      # Order is important - multi-char expressions need to come before narrow      # ones.      parts = [] -    for klass in filter_unary: -        f = pp.Literal("~%s" % klass.code) + pp.WordEnd() -        f.setParseAction(klass.make) +    for cls in filter_unary: +        f = pp.Literal(f"~{cls.code}") + pp.WordEnd() +        f.setParseAction(cls.make)          parts.append(f) -    simplerex = "".join(c for c in pp.printables if c not in "()~'\"") -    alphdevanagari = pp.pyparsing_unicode.Devanagari.alphas -    alphcyrillic = pp.pyparsing_unicode.Cyrillic.alphas -    alphgreek = pp.pyparsing_unicode.Greek.alphas -    alphchinese = pp.pyparsing_unicode.Chinese.alphas -    alpharabic = pp.pyparsing_unicode.Arabic.alphas -    alphhebrew = pp.pyparsing_unicode.Hebrew.alphas -    alphjapanese = pp.pyparsing_unicode.Japanese.alphas -    alphkorean = pp.pyparsing_unicode.Korean.alphas -    alphlatin1 = pp.pyparsing_unicode.Latin1.alphas -    alphlatinA = pp.pyparsing_unicode.LatinA.alphas -    alphlatinB = pp.pyparsing_unicode.LatinB.alphas - -    rex = pp.Word(simplerex) |\ -        pp.Word(alphcyrillic) |\ -        pp.Word(alphgreek) |\ -        pp.Word(alphchinese) |\ -        pp.Word(alpharabic) |\ -        pp.Word(alphdevanagari) |\ -        pp.Word(alphhebrew) |\ -        pp.Word(alphjapanese) |\ -        pp.Word(alphkorean) |\ -        pp.Word(alphlatin1) |\ -        pp.Word(alphlatinA) |\ -        pp.Word(alphlatinB) |\ -        pp.QuotedString("\"", escChar='\\') |\ -        pp.QuotedString("'", escChar='\\') -    for klass in filter_rex: -        f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + rex.copy() -        f.setParseAction(klass.make) +    # This is a bit of a hack to simulate Word(pyparsing_unicode.printables), +    # which has a horrible performance with len(pyparsing.pyparsing_unicode.printables) == 1114060 +    unicode_words = pp.CharsNotIn("()~'\"" + pp.ParserElement.DEFAULT_WHITE_CHARS) +    unicode_words.skipWhitespace = True +    regex = ( +            unicode_words +            | pp.QuotedString('"', escChar='\\') +            | pp.QuotedString("'", escChar='\\') +    ) +    for cls in filter_rex: +        f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + regex.copy() +        f.setParseAction(cls.make)          parts.append(f) -    for klass in filter_int: -        f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + pp.Word(pp.nums) -        f.setParseAction(klass.make) +    for cls in filter_int: +        f = pp.Literal(f"~{cls.code}") + pp.WordEnd() + pp.Word(pp.nums) +        f.setParseAction(cls.make)          parts.append(f)      # A naked rex is a URL rex: -    f = rex.copy() +    f = regex.copy()      f.setParseAction(FUrl.make)      parts.append(f)      atom = pp.MatchFirst(parts) -    expr = pp.operatorPrecedence(atom, -                                 [(pp.Literal("!").suppress(), -                                   1, -                                   pp.opAssoc.RIGHT, -                                   lambda x: FNot(*x)), -                                     (pp.Literal("&").suppress(), -                                      2, -                                      pp.opAssoc.LEFT, -                                      lambda x: FAnd(*x)), -                                     (pp.Literal("|").suppress(), -                                      2, -                                      pp.opAssoc.LEFT, -                                      lambda x: FOr(*x)), -                                  ]) +    expr = pp.infixNotation( +        atom, +        [(pp.Literal("!").suppress(), +          1, +          pp.opAssoc.RIGHT, +          lambda x: FNot(*x)), +         (pp.Literal("&").suppress(), +          2, +          pp.opAssoc.LEFT, +          lambda x: FAnd(*x)), +         (pp.Literal("|").suppress(), +          2, +          pp.opAssoc.LEFT, +          lambda x: FOr(*x)), +         ])      expr = pp.OneOrMore(expr)      return expr.setParseAction(lambda x: FAnd(x) if len(x) != 1 else x) @@ -570,15 +554,15 @@ def match(flt, flow):  help = []  for a in filter_unary:      help.append( -        ("~%s" % a.code, a.help) +        (f"~{a.code}", a.help)      )  for b in filter_rex:      help.append( -        ("~%s regex" % b.code, b.help) +        (f"~{b.code} regex", b.help)      )  for c in filter_int:      help.append( -        ("~%s int" % c.code, c.help) +        (f"~{c.code} int", c.help)      )  help.sort()  help.extend( | 
