aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/addons/replace.py
blob: 34bb40c22cdad3847d3e80ebc0bdfe469ca7dc78 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import re

from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import ctx


def parse_hook(s):
    """
        Returns a (pattern, regex, replacement) tuple.

        The general form for a replacement hook is as follows:

            /patt/regex/replacement

        The first character specifies the separator. Example:

            :~q:foo:bar

        If only two clauses are specified, the pattern is set to match
        universally (i.e. ".*"). Example:

            /foo/bar/

        Clauses are parsed from left to right. Extra separators are taken to be
        part of the final clause. For instance, the replacement clause below is
        "foo/bar/":

            /one/two/foo/bar/
    """
    sep, rem = s[0], s[1:]
    parts = rem.split(sep, 2)
    if len(parts) == 2:
        patt = ".*"
        a, b = parts
    elif len(parts) == 3:
        patt, a, b = parts
    else:
        raise exceptions.OptionsError(
            "Invalid replacement specifier: %s" % s
        )
    return patt, a, b


class _ReplaceBase:
    def __init__(self):
        self.lst = []

    def configure(self, options, updated):
        """
            .replacements is a list of tuples (fpat, rex, s):

            fpatt: a string specifying a filter pattern.
            rex: a regular expression, as bytes.
            s: the replacement string, as bytes
        """
        if self.optionName in updated:
            lst = []
            for rep in getattr(options, self.optionName):
                if isinstance(rep, str):
                    fpatt, rex, s = parse_hook(rep)
                else:
                    fpatt, rex, s = rep

                flt = flowfilter.parse(fpatt)
                if not flt:
                    raise exceptions.OptionsError(
                        "Invalid filter pattern: %s" % fpatt
                    )
                try:
                    re.compile(rex)
                except re.error as e:
                    raise exceptions.OptionsError(
                        "Invalid regular expression: %s - %s" % (rex, str(e))
                    )
                lst.append((rex, s, flt))
            self.lst = lst

    def execute(self, f):
        for rex, s, flt in self.lst:
            if flt(f):
                if f.response:
                    self.replace(f.response, rex, s)
                else:
                    self.replace(f.request, rex, s)

    def request(self, flow):
        if not flow.reply.has_message:
            self.execute(flow)

    def response(self, flow):
        if not flow.reply.has_message:
            self.execute(flow)


class Replace(_ReplaceBase):
    optionName = "replacements"

    def replace(self, obj, rex, s):
        obj.replace(rex, s, flags=re.DOTALL)


class ReplaceFile(_ReplaceBase):
    optionName = "replacement_files"

    def replace(self, obj, rex, s):
        try:
            v = open(s, "rb").read()
        except IOError as e:
            ctx.log.warn("Could not read replacement file: %s" % s)
            return
        obj.replace(rex, v, flags=re.DOTALL)