aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/addons/upstream_auth.py
blob: c0581e27c6184fc4f3cd1c44248f48e9f45065d7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import re
import typing
import base64

from mitmproxy import exceptions
from mitmproxy import ctx
from mitmproxy.utils import strutils


def parse_upstream_auth(auth):
    pattern = re.compile(".+:")
    if pattern.search(auth) is None:
        raise exceptions.OptionsError(
            "Invalid upstream auth specification: %s" % auth
        )
    return b"Basic" + b" " + base64.b64encode(strutils.always_bytes(auth))


class UpstreamAuth():
    """
        This addon handles authentication to systems upstream from us for the
        upstream proxy and reverse proxy mode. There are 3 cases:

        - Upstream proxy CONNECT requests should have authentication added, and
          subsequent already connected requests should not.
        - Upstream proxy regular requests
        - Reverse proxy regular requests (CONNECT is invalid in this mode)
    """
    def __init__(self):
        self.auth = None

    def load(self, loader):
        loader.add_option(
            "upstream_auth", typing.Optional[str], None,
            """
            Add HTTP Basic authentication to upstream proxy and reverse proxy
            requests. Format: username:password.
            """
        )

    def configure(self, updated):
        # FIXME: We're doing this because our proxy core is terminally confused
        # at the moment. Ideally, we should be able to check if we're in
        # reverse proxy mode at the HTTP layer, so that scripts can put the
        # proxy in reverse proxy mode for specific requests.
        if "upstream_auth" in updated:
            if ctx.options.upstream_auth is None:
                self.auth = None
            else:
                self.auth = parse_upstream_auth(ctx.options.upstream_auth)

    def http_connect(self, f):
        if self.auth and f.mode == "upstream":
            f.request.headers["Proxy-Authorization"] = self.auth

    def requestheaders(self, f):
        if self.auth:
            if f.mode == "upstream" and not f.server_conn.via:
                f.request.headers["Proxy-Authorization"] = self.auth
            elif ctx.options.mode.startswith("reverse"):
                f.request.headers["Proxy-Authorization"] = self.auth