aboutsummaryrefslogtreecommitdiffstats
path: root/libmproxy/filt.py
diff options
context:
space:
mode:
Diffstat (limited to 'libmproxy/filt.py')
-rw-r--r--libmproxy/filt.py428
1 files changed, 0 insertions, 428 deletions
diff --git a/libmproxy/filt.py b/libmproxy/filt.py
deleted file mode 100644
index aa62b717..00000000
--- a/libmproxy/filt.py
+++ /dev/null
@@ -1,428 +0,0 @@
-"""
- The following operators are understood:
-
- ~q Request
- ~s Response
-
- Headers:
-
- Patterns are matched against "name: value" strings. Field names are
- all-lowercase.
-
- ~a Asset content-type in response. Asset content types are:
- text/javascript
- application/x-javascript
- application/javascript
- text/css
- image/*
- application/x-shockwave-flash
- ~h rex Header line in either request or response
- ~hq rex Header in request
- ~hs rex Header in response
-
- ~b rex Expression in the body of either request or response
- ~bq rex Expression in the body of request
- ~bq rex Expression in the body of response
- ~t rex Shortcut for content-type header.
-
- ~d rex Request domain
- ~m rex Method
- ~u rex URL
- ~c CODE Response code.
- rex Equivalent to ~u rex
-"""
-from __future__ import absolute_import
-import re
-import sys
-import pyparsing as pp
-
-
-class _Token:
-
- def dump(self, indent=0, fp=sys.stdout):
- print >> fp, "\t" * indent, self.__class__.__name__,
- if hasattr(self, "expr"):
- print >> fp, "(%s)" % self.expr,
- print >> fp
-
-
-class _Action(_Token):
-
- @classmethod
- def make(klass, s, loc, toks):
- return klass(*toks[1:])
-
-
-class FErr(_Action):
- code = "e"
- help = "Match error"
-
- def __call__(self, f):
- return True if f.error else False
-
-
-class FReq(_Action):
- code = "q"
- help = "Match request with no response"
-
- def __call__(self, f):
- if not f.response:
- return True
-
-
-class FResp(_Action):
- code = "s"
- help = "Match response"
-
- def __call__(self, f):
- return True if f.response else False
-
-
-class _Rex(_Action):
- flags = 0
-
- def __init__(self, expr):
- self.expr = expr
- try:
- self.re = re.compile(self.expr, self.flags)
- except:
- raise ValueError("Cannot compile expression.")
-
-
-def _check_content_type(expr, o):
- val = o.headers.get("content-type")
- if val and re.search(expr, val):
- return True
- return False
-
-
-class FAsset(_Action):
- code = "a"
- help = "Match asset in response: CSS, Javascript, Flash, images."
- ASSET_TYPES = [
- "text/javascript",
- "application/x-javascript",
- "application/javascript",
- "text/css",
- "image/.*",
- "application/x-shockwave-flash"
- ]
-
- def __call__(self, f):
- if f.response:
- for i in self.ASSET_TYPES:
- if _check_content_type(i, f.response):
- return True
- return False
-
-
-class FContentType(_Rex):
- code = "t"
- help = "Content-type header"
-
- def __call__(self, f):
- if _check_content_type(self.expr, f.request):
- return True
- elif f.response and _check_content_type(self.expr, f.response):
- return True
- return False
-
-
-class FRequestContentType(_Rex):
- code = "tq"
- help = "Request Content-Type header"
-
- def __call__(self, f):
- return _check_content_type(self.expr, f.request)
-
-
-class FResponseContentType(_Rex):
- code = "ts"
- help = "Response Content-Type header"
-
- def __call__(self, f):
- if f.response:
- return _check_content_type(self.expr, f.response)
- return False
-
-
-class FHead(_Rex):
- code = "h"
- help = "Header"
- flags = re.MULTILINE
-
- def __call__(self, f):
- if f.request and self.re.search(str(f.request.headers)):
- return True
- if f.response and self.re.search(str(f.response.headers)):
- return True
- return False
-
-
-class FHeadRequest(_Rex):
- code = "hq"
- help = "Request header"
- flags = re.MULTILINE
-
- def __call__(self, f):
- if f.request and self.re.search(str(f.request.headers)):
- return True
-
-
-class FHeadResponse(_Rex):
- code = "hs"
- help = "Response header"
- flags = re.MULTILINE
-
- def __call__(self, f):
- if f.response and self.re.search(str(f.response.headers)):
- return True
-
-
-class FBod(_Rex):
- code = "b"
- help = "Body"
-
- def __call__(self, f):
- if f.request and f.request.content:
- if self.re.search(f.request.get_decoded_content()):
- return True
- if f.response and f.response.content:
- if self.re.search(f.response.get_decoded_content()):
- return True
- return False
-
-
-class FBodRequest(_Rex):
- code = "bq"
- help = "Request body"
-
- def __call__(self, f):
- if f.request and f.request.content:
- if self.re.search(f.request.get_decoded_content()):
- return True
-
-
-class FBodResponse(_Rex):
- code = "bs"
- help = "Response body"
-
- def __call__(self, f):
- if f.response and f.response.content:
- if self.re.search(f.response.get_decoded_content()):
- return True
-
-
-class FMethod(_Rex):
- code = "m"
- help = "Method"
- flags = re.IGNORECASE
-
- def __call__(self, f):
- return bool(self.re.search(f.request.method))
-
-
-class FDomain(_Rex):
- code = "d"
- help = "Domain"
- flags = re.IGNORECASE
-
- def __call__(self, f):
- return bool(self.re.search(f.request.host))
-
-
-class FUrl(_Rex):
- code = "u"
- help = "URL"
- # FUrl is special, because it can be "naked".
-
- @classmethod
- def make(klass, s, loc, toks):
- if len(toks) > 1:
- toks = toks[1:]
- return klass(*toks)
-
- def __call__(self, f):
- return self.re.search(f.request.url)
-
-
-class FSrc(_Rex):
- code = "src"
- help = "Match source address"
-
- def __call__(self, f):
- return f.client_conn.address and self.re.search(repr(f.client_conn.address))
-
-
-class FDst(_Rex):
- code = "dst"
- help = "Match destination address"
-
- def __call__(self, f):
- return f.server_conn.address and self.re.search(repr(f.server_conn.address))
-
-
-class _Int(_Action):
-
- def __init__(self, num):
- self.num = int(num)
-
-
-class FCode(_Int):
- code = "c"
- help = "HTTP response code"
-
- def __call__(self, f):
- if f.response and f.response.status_code == self.num:
- return True
-
-
-class FAnd(_Token):
-
- def __init__(self, lst):
- self.lst = lst
-
- def dump(self, indent=0, fp=sys.stdout):
- print >> fp, "\t" * indent, self.__class__.__name__
- for i in self.lst:
- i.dump(indent + 1, fp)
-
- def __call__(self, f):
- return all(i(f) for i in self.lst)
-
-
-class FOr(_Token):
-
- def __init__(self, lst):
- self.lst = lst
-
- def dump(self, indent=0, fp=sys.stdout):
- print >> fp, "\t" * indent, self.__class__.__name__
- for i in self.lst:
- i.dump(indent + 1, fp)
-
- def __call__(self, f):
- return any(i(f) for i in self.lst)
-
-
-class FNot(_Token):
-
- def __init__(self, itm):
- self.itm = itm[0]
-
- def dump(self, indent=0, fp=sys.stdout):
- print >> fp, "\t" * indent, self.__class__.__name__
- self.itm.dump(indent + 1, fp)
-
- def __call__(self, f):
- return not self.itm(f)
-
-
-filt_unary = [
- FReq,
- FResp,
- FAsset,
- FErr
-]
-filt_rex = [
- FHeadRequest,
- FHeadResponse,
- FHead,
- FBodRequest,
- FBodResponse,
- FBod,
- FMethod,
- FDomain,
- FUrl,
- FRequestContentType,
- FResponseContentType,
- FContentType,
- FSrc,
- FDst,
-]
-filt_int = [
- FCode
-]
-
-
-def _make():
- # Order is important - multi-char expressions need to come before narrow
- # ones.
- parts = []
- for klass in filt_unary:
- f = pp.Literal("~%s" % klass.code) + pp.WordEnd()
- f.setParseAction(klass.make)
- parts.append(f)
-
- simplerex = "".join(c for c in pp.printables if c not in "()~'\"")
- rex = pp.Word(simplerex) |\
- pp.QuotedString("\"", escChar='\\') |\
- pp.QuotedString("'", escChar='\\')
- for klass in filt_rex:
- f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + rex.copy()
- f.setParseAction(klass.make)
- parts.append(f)
-
- for klass in filt_int:
- f = pp.Literal("~%s" % klass.code) + pp.WordEnd() + pp.Word(pp.nums)
- f.setParseAction(klass.make)
- parts.append(f)
-
- # A naked rex is a URL rex:
- f = rex.copy()
- f.setParseAction(FUrl.make)
- parts.append(f)
-
- atom = pp.MatchFirst(parts)
- expr = pp.operatorPrecedence(atom,
- [(pp.Literal("!").suppress(),
- 1,
- pp.opAssoc.RIGHT,
- lambda x: FNot(*x)),
- (pp.Literal("&").suppress(),
- 2,
- pp.opAssoc.LEFT,
- lambda x: FAnd(*x)),
- (pp.Literal("|").suppress(),
- 2,
- pp.opAssoc.LEFT,
- lambda x: FOr(*x)),
- ])
- expr = pp.OneOrMore(expr)
- return expr.setParseAction(lambda x: FAnd(x) if len(x) != 1 else x)
-bnf = _make()
-
-
-def parse(s):
- try:
- filt = bnf.parseString(s, parseAll=True)[0]
- filt.pattern = s
- return filt
- except pp.ParseException:
- return None
- except ValueError:
- return None
-
-
-help = []
-for i in filt_unary:
- help.append(
- ("~%s" % i.code, i.help)
- )
-for i in filt_rex:
- help.append(
- ("~%s regex" % i.code, i.help)
- )
-for i in filt_int:
- help.append(
- ("~%s int" % i.code, i.help)
- )
-help.sort()
-help.extend(
- [
- ("!", "unary not"),
- ("&", "and"),
- ("|", "or"),
- ("(...)", "grouping"),
- ]
-)