aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--mitmproxy/addons/proxyauth.py99
-rw-r--r--test/mitmproxy/addons/test_proxyauth.py122
2 files changed, 205 insertions, 16 deletions
diff --git a/mitmproxy/addons/proxyauth.py b/mitmproxy/addons/proxyauth.py
index fc68de71..aeeb04f3 100644
--- a/mitmproxy/addons/proxyauth.py
+++ b/mitmproxy/addons/proxyauth.py
@@ -3,6 +3,11 @@ import binascii
import passlib.apache
from mitmproxy import exceptions
+from mitmproxy import http
+import mitmproxy.net.http
+
+
+REALM = "mitmproxy"
def parse_http_basic_auth(s):
@@ -20,19 +25,74 @@ def parse_http_basic_auth(s):
return scheme, parts[0], parts[1]
-def assemble_http_basic_auth(scheme, username, password):
- v = binascii.b2a_base64(
- (username + ":" + password).encode("utf8")
- ).decode("ascii")
- return scheme + " " + v
-
-
class ProxyAuth:
def __init__(self):
self.nonanonymous = False
self.htpasswd = None
self.singleuser = None
+ def enabled(self):
+ return any([self.nonanonymous, self.htpasswd, self.singleuser])
+
+ def which_auth_header(self, f):
+ if f.mode == "regular":
+ return 'Proxy-Authorization'
+ else:
+ return 'Authorization'
+
+ def auth_required_response(self, f):
+ if f.mode == "regular":
+ hdrname = 'Proxy-Authenticate'
+ else:
+ hdrname = 'WWW-Authenticate'
+
+ headers = mitmproxy.net.http.Headers()
+ headers[hdrname] = 'Basic realm="%s"' % REALM
+
+ if f.mode == "transparent":
+ return http.make_error_response(
+ 401,
+ "Authentication Required",
+ headers
+ )
+ else:
+ return http.make_error_response(
+ 407,
+ "Proxy Authentication Required",
+ headers,
+ )
+
+ def check(self, f):
+ auth_value = f.request.headers.get(self.which_auth_header(f), None)
+ if not auth_value:
+ return False
+ parts = parse_http_basic_auth(auth_value)
+ if not parts:
+ return False
+ scheme, username, password = parts
+ if scheme.lower() != 'basic':
+ return False
+
+ if self.nonanonymous:
+ pass
+ elif self.singleuser:
+ if [username, password] != self.singleuser:
+ return False
+ elif self.htpasswd:
+ if not self.htpasswd.check_password(username, password):
+ return False
+ else:
+ raise NotImplementedError("Should never happen.")
+
+ return True
+
+ def authenticate(self, f):
+ if self.check(f):
+ del f.request.headers[self.which_auth_header(f)]
+ else:
+ f.response = self.auth_required_response(f)
+
+ # Handlers
def configure(self, options, updated):
if "auth_nonanonymous" in updated:
self.nonanonymous = options.auth_nonanonymous
@@ -57,12 +117,25 @@ class ProxyAuth:
"Could not open htpasswd file: %s" % v
)
else:
- self.auth_htpasswd = None
+ self.htpasswd = None
+ if self.enabled():
+ if options.mode == "transparent":
+ raise exceptions.OptionsError(
+ "Proxy Authentication not supported in transparent mode."
+ )
+ elif options.mode == "socks5":
+ raise exceptions.OptionsError(
+ "Proxy Authentication not supported in SOCKS mode. "
+ "https://github.com/mitmproxy/mitmproxy/issues/738"
+ )
+ # TODO: check for multiple auth options
def http_connect(self, f):
- # mode = regular
- pass
+ if self.enabled() and f.mode == "regular":
+ self.authenticate(f)
- def http_request(self, f):
- # mode = regular, no via
- pass
+ def requestheaders(self, f):
+ if self.enabled():
+ # Are we already authenticated in CONNECT?
+ if not (f.mode == "regular" and f.server_conn.via):
+ self.authenticate(f)
diff --git a/test/mitmproxy/addons/test_proxyauth.py b/test/mitmproxy/addons/test_proxyauth.py
index e9dcf7bf..73d87cbf 100644
--- a/test/mitmproxy/addons/test_proxyauth.py
+++ b/test/mitmproxy/addons/test_proxyauth.py
@@ -7,11 +7,17 @@ from mitmproxy.test import tutils
from mitmproxy.addons import proxyauth
+def mkauth(username, password, scheme="basic"):
+ v = binascii.b2a_base64(
+ (username + ":" + password).encode("utf8")
+ ).decode("ascii")
+ return scheme + " " + v
+
+
def test_parse_http_basic_auth():
- vals = ("basic", "foo", "bar")
assert proxyauth.parse_http_basic_auth(
- proxyauth.assemble_http_basic_auth(*vals)
- ) == vals
+ mkauth("test", "test")
+ ) == ("basic", "test", "test")
assert not proxyauth.parse_http_basic_auth("")
assert not proxyauth.parse_http_basic_auth("foo bar")
v = "basic " + binascii.b2a_base64(b"foo").decode("ascii")
@@ -51,3 +57,113 @@ def test_configure():
up,
auth_htpasswd = "nonexistent"
)
+
+ ctx.configure(
+ up,
+ auth_htpasswd = tutils.test_data.path(
+ "mitmproxy/net/data/htpasswd"
+ )
+ )
+ assert up.htpasswd
+ assert up.htpasswd.check_password("test", "test")
+ assert not up.htpasswd.check_password("test", "foo")
+ ctx.configure(up, auth_htpasswd = None)
+ assert not up.htpasswd
+
+ tutils.raises(
+ exceptions.OptionsError,
+ ctx.configure,
+ up,
+ auth_nonanonymous = True,
+ mode = "transparent"
+ )
+ tutils.raises(
+ exceptions.OptionsError,
+ ctx.configure,
+ up,
+ auth_nonanonymous = True,
+ mode = "socks5"
+ )
+
+
+def test_check():
+ up = proxyauth.ProxyAuth()
+ with taddons.context() as ctx:
+ ctx.configure(up, auth_nonanonymous=True)
+ f = tflow.tflow()
+ assert not up.check(f)
+ f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
+ assert up.check(f)
+
+ f.request.headers["Proxy-Authorization"] = "invalid"
+ assert not up.check(f)
+
+ f.request.headers["Proxy-Authorization"] = mkauth(
+ "test", "test", scheme = "unknown"
+ )
+ assert not up.check(f)
+
+ ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:test")
+ f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
+ assert up.check(f)
+ ctx.configure(up, auth_nonanonymous=False, auth_singleuser="test:foo")
+ assert not up.check(f)
+
+ ctx.configure(
+ up,
+ auth_singleuser = None,
+ auth_htpasswd = tutils.test_data.path(
+ "mitmproxy/net/data/htpasswd"
+ )
+ )
+ f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
+ assert up.check(f)
+ f.request.headers["Proxy-Authorization"] = mkauth("test", "foo")
+ assert not up.check(f)
+
+
+def test_authenticate():
+ up = proxyauth.ProxyAuth()
+ with taddons.context() as ctx:
+ ctx.configure(up, auth_nonanonymous=True)
+
+ f = tflow.tflow()
+ assert not f.response
+ up.authenticate(f)
+ assert f.response.status_code == 407
+
+ f = tflow.tflow()
+ f.request.headers["Proxy-Authorization"] = mkauth("test", "test")
+ up.authenticate(f)
+ assert not f.response
+ assert not f.request.headers.get("Proxy-Authorization")
+
+ f = tflow.tflow()
+ f.mode = "transparent"
+ assert not f.response
+ up.authenticate(f)
+ assert f.response.status_code == 401
+
+ f = tflow.tflow()
+ f.mode = "transparent"
+ f.request.headers["Authorization"] = mkauth("test", "test")
+ up.authenticate(f)
+ assert not f.response
+ assert not f.request.headers.get("Authorization")
+
+
+def test_handlers():
+ up = proxyauth.ProxyAuth()
+ with taddons.context() as ctx:
+ ctx.configure(up, auth_nonanonymous=True)
+
+ f = tflow.tflow()
+ assert not f.response
+ up.requestheaders(f)
+ assert f.response.status_code == 407
+
+ f = tflow.tflow()
+ f.request.method = "CONNECT"
+ assert not f.response
+ up.http_connect(f)
+ assert f.response.status_code == 407