aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_server.py')
-rw-r--r--test/test_server.py282
1 files changed, 228 insertions, 54 deletions
diff --git a/test/test_server.py b/test/test_server.py
index 3906cb8e..d33bcc89 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -1,10 +1,10 @@
import socket, time
-import mock
+from libmproxy.proxy.config import ProxyConfig
from netlib import tcp, http_auth, http
from libpathod import pathoc, pathod
+from netlib.certutils import SSLCert
import tutils, tservers
-from libmproxy import flow
-from libmproxy.protocol import KILL
+from libmproxy.protocol import KILL, Error
from libmproxy.protocol.http import CONTENT_MISSING
"""
@@ -21,8 +21,11 @@ class CommonMixin:
def test_replay(self):
assert self.pathod("304").status_code == 304
- assert len(self.master.state.view) == 1
- l = self.master.state.view[0]
+ if isinstance(self, tservers.HTTPUpstreamProxTest) and self.ssl:
+ assert len(self.master.state.view) == 2
+ else:
+ assert len(self.master.state.view) == 1
+ l = self.master.state.view[-1]
assert l.response.code == 304
l.request.path = "/p/305"
rt = self.master.replay_request(l, block=True)
@@ -31,18 +34,28 @@ class CommonMixin:
# Disconnect error
l.request.path = "/p/305:d0"
rt = self.master.replay_request(l, block=True)
- assert l.error
+ assert not rt
+ if isinstance(self, tservers.HTTPUpstreamProxTest):
+ assert l.response.code == 502
+ else:
+ assert l.error
# Port error
l.request.port = 1
- self.master.replay_request(l, block=True)
- assert l.error
+ # In upstream mode, we get a 502 response from the upstream proxy server.
+ # In upstream mode with ssl, the replay will fail as we cannot establish SSL with the upstream proxy.
+ rt = self.master.replay_request(l, block=True)
+ assert not rt
+ if isinstance(self, tservers.HTTPUpstreamProxTest) and not self.ssl:
+ assert l.response.code == 502
+ else:
+ assert l.error
def test_http(self):
f = self.pathod("304")
assert f.status_code == 304
- l = self.master.state.view[0]
+ l = self.master.state.view[-1] # In Upstream mode with SSL, we may already have a previous CONNECT request.
assert l.client_conn.address
assert "host" in l.request.headers
assert l.response.code == 304
@@ -55,6 +68,51 @@ class CommonMixin:
line = t.rfile.readline()
assert ("Bad Request" in line) or ("Bad Gateway" in line)
+ def test_sni(self):
+ if not self.ssl:
+ return
+
+ f = self.pathod("304", sni="testserver.com")
+ assert f.status_code == 304
+ log = self.server.last_log()
+ assert log["request"]["sni"] == "testserver.com"
+
+class TcpMixin:
+ def _ignore_on(self):
+ conf = ProxyConfig(ignore=[".+:%s" % self.server.port])
+ self.config.ignore.append(conf.ignore[0])
+
+ def _ignore_off(self):
+ self.config.ignore.pop()
+
+ def test_ignore(self):
+ spec = '304:h"Alternate-Protocol"="mitmproxy-will-remove-this"'
+ n = self.pathod(spec)
+ self._ignore_on()
+ i = self.pathod(spec)
+ i2 = self.pathod(spec)
+ self._ignore_off()
+
+ assert i.status_code == i2.status_code == n.status_code == 304
+ assert "Alternate-Protocol" in i.headers
+ assert "Alternate-Protocol" in i2.headers
+ assert "Alternate-Protocol" not in n.headers
+
+ # Test that we get the original SSL cert
+ if self.ssl:
+ i_cert = SSLCert(i.sslinfo.certchain[0])
+ i2_cert = SSLCert(i2.sslinfo.certchain[0])
+ n_cert = SSLCert(n.sslinfo.certchain[0])
+
+ assert i_cert == i2_cert
+ assert i_cert != n_cert
+
+ # Test Non-HTTP traffic
+ spec = "200:i0,@100:d0" # this results in just 100 random bytes
+ assert self.pathod(spec).status_code == 502 # mitmproxy responds with bad gateway
+ self._ignore_on()
+ tutils.raises("invalid server response", self.pathod, spec) # pathoc tries to parse answer as HTTP
+ self._ignore_off()
class AppMixin:
@@ -64,7 +122,6 @@ class AppMixin:
assert "mitmproxy" in ret.content
-
class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
def test_app_err(self):
p = self.pathoc()
@@ -186,7 +243,7 @@ class TestHTTPConnectSSLError(tservers.HTTPProxTest):
tutils.raises("502 - Bad Gateway", p.http_connect, dst)
-class TestHTTPS(tservers.HTTPProxTest, CommonMixin):
+class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
ssl = True
ssloptions = pathod.SSLOptions(request_client_cert=True)
clientcerts = True
@@ -195,12 +252,6 @@ class TestHTTPS(tservers.HTTPProxTest, CommonMixin):
assert f.status_code == 304
assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
- def test_sni(self):
- f = self.pathod("304", sni="testserver.com")
- assert f.status_code == 304
- l = self.server.last_log()
- assert self.server.last_log()["request"]["sni"] == "testserver.com"
-
def test_error_post_connect(self):
p = self.pathoc()
assert p.request("get:/:i0,'invalid\r\n\r\n'").status_code == 400
@@ -228,21 +279,16 @@ class TestHTTPSNoCommonName(tservers.HTTPProxTest):
assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1"
-class TestReverse(tservers.ReverseProxTest, CommonMixin):
+class TestReverse(tservers.ReverseProxTest, CommonMixin, TcpMixin):
reverse = True
-class TestTransparent(tservers.TransparentProxTest, CommonMixin):
+class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin):
ssl = False
-class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin):
+class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin):
ssl = True
- def test_sni(self):
- f = self.pathod("304", sni="testserver.com")
- assert f.status_code == 304
- l = self.server.last_log()
- assert l["request"]["sni"] == "testserver.com"
def test_sslerr(self):
p = pathoc.Pathoc(("localhost", self.proxy.port))
@@ -323,7 +369,7 @@ class TestProxy(tservers.HTTPProxTest):
f = self.pathod("200:b@100")
assert f.status_code == 200
f = self.master.state.view[0]
- assert f.server_conn.peername == ("127.0.0.1", self.server.port)
+ assert f.server_conn.address == ("127.0.0.1", self.server.port)
class TestProxySSL(tservers.HTTPProxTest):
ssl=True
@@ -331,29 +377,30 @@ class TestProxySSL(tservers.HTTPProxTest):
# tests that the ssl timestamp is present when ssl is used
f = self.pathod("304:b@10k")
assert f.status_code == 304
- first_request = self.master.state.view[0].request
- assert first_request.flow.server_conn.timestamp_ssl_setup
+ first_flow = self.master.state.view[0]
+ assert first_flow.server_conn.timestamp_ssl_setup
class MasterRedirectRequest(tservers.TestMaster):
- def handle_request(self, request):
+ redirect_port = None # Set by TestRedirectRequest
+
+ def handle_request(self, f):
+ request = f.request
if request.path == "/p/201":
- url = request.get_url()
+ url = request.url
new = "http://127.0.0.1:%s/p/201" % self.redirect_port
- request.set_url(new)
- request.set_url(new)
- request.flow.live.change_server(("127.0.0.1", self.redirect_port), False)
- request.set_url(url)
- tutils.raises("SSL handshake error", request.flow.live.change_server, ("127.0.0.1", self.redirect_port), True)
- request.set_url(new)
- request.set_url(url)
- request.set_url(new)
- tservers.TestMaster.handle_request(self, request)
+ request.url = new
+ f.live.change_server(("127.0.0.1", self.redirect_port), False)
+ request.url = url
+ tutils.raises("SSL handshake error", f.live.change_server, ("127.0.0.1", self.redirect_port), True)
+ request.url = new
+ tservers.TestMaster.handle_request(self, f)
- def handle_response(self, response):
- response.content = str(response.flow.client_conn.address.port)
- tservers.TestMaster.handle_response(self, response)
+ def handle_response(self, f):
+ f.response.content = str(f.client_conn.address.port)
+ f.response.headers["server-conn-id"] = [str(f.server_conn.source_address.port)]
+ tservers.TestMaster.handle_response(self, f)
class TestRedirectRequest(tservers.HTTPProxTest):
@@ -385,16 +432,17 @@ class TestRedirectRequest(tservers.HTTPProxTest):
assert self.server.last_log()
assert not self.server2.last_log()
- assert r3.content == r2.content == r1.content
+ assert r1.content == r2.content == r3.content
+ assert r1.headers.get_first("server-conn-id") == r3.headers.get_first("server-conn-id")
# Make sure that we actually use the same connection in this test case
class MasterStreamRequest(tservers.TestMaster):
"""
Enables the stream flag on the flow for all requests
"""
- def handle_responseheaders(self, r):
- r.stream = True
- r.reply()
+ def handle_responseheaders(self, f):
+ f.response.stream = True
+ f.reply()
class TestStreamRequest(tservers.HTTPProxTest):
masterclass = MasterStreamRequest
@@ -445,9 +493,9 @@ class TestStreamRequest(tservers.HTTPProxTest):
class MasterFakeResponse(tservers.TestMaster):
- def handle_request(self, m):
+ def handle_request(self, f):
resp = tutils.tresp()
- m.reply(resp)
+ f.reply(resp)
class TestFakeResponse(tservers.HTTPProxTest):
@@ -458,8 +506,8 @@ class TestFakeResponse(tservers.HTTPProxTest):
class MasterKillRequest(tservers.TestMaster):
- def handle_request(self, m):
- m.reply(KILL)
+ def handle_request(self, f):
+ f.reply(KILL)
class TestKillRequest(tservers.HTTPProxTest):
@@ -471,8 +519,8 @@ class TestKillRequest(tservers.HTTPProxTest):
class MasterKillResponse(tservers.TestMaster):
- def handle_response(self, m):
- m.reply(KILL)
+ def handle_response(self, f):
+ f.reply(KILL)
class TestKillResponse(tservers.HTTPProxTest):
@@ -495,10 +543,10 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
class MasterIncomplete(tservers.TestMaster):
- def handle_request(self, m):
+ def handle_request(self, f):
resp = tutils.tresp()
resp.content = CONTENT_MISSING
- m.reply(resp)
+ f.reply(resp)
class TestIncompleteResponse(tservers.HTTPProxTest):
@@ -510,6 +558,132 @@ class TestIncompleteResponse(tservers.HTTPProxTest):
class TestCertForward(tservers.HTTPProxTest):
certforward = True
ssl = True
+
def test_app_err(self):
tutils.raises("handshake error", self.pathod, "200:b@100")
+
+class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin):
+ ssl = False
+
+ def test_order(self):
+ self.proxy.tmaster.replacehooks.add("~q", "foo", "bar") # replace in request
+ self.chain[0].tmaster.replacehooks.add("~q", "bar", "baz")
+ self.chain[1].tmaster.replacehooks.add("~q", "foo", "oh noes!")
+ self.chain[0].tmaster.replacehooks.add("~s", "baz", "ORLY") # replace in response
+
+ p = self.pathoc()
+ req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase)
+ assert req.content == "ORLY"
+ assert req.status_code == 418
+
+
+class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin):
+ ssl = True
+
+ def _ignore_on(self):
+ super(TestUpstreamProxySSL, self)._ignore_on()
+ conf = ProxyConfig(ignore=[".+:%s" % self.server.port])
+ for proxy in self.chain:
+ proxy.tmaster.server.config.ignore.append(conf.ignore[0])
+
+ def _ignore_off(self):
+ super(TestUpstreamProxySSL, self)._ignore_off()
+ for proxy in self.chain:
+ proxy.tmaster.server.config.ignore.pop()
+
+ def test_simple(self):
+ p = self.pathoc()
+ req = p.request("get:'/p/418:b\"content\"'")
+ assert req.content == "content"
+ assert req.status_code == 418
+
+ assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT from pathoc to chain[0],
+ # request from pathoc to chain[0]
+ assert self.chain[0].tmaster.state.flow_count() == 2 # CONNECT from proxy to chain[1],
+ # request from proxy to chain[1]
+ assert self.chain[1].tmaster.state.flow_count() == 1 # request from chain[0] (regular proxy doesn't store CONNECTs)
+
+ def test_closing_connect_response(self):
+ """
+ https://github.com/mitmproxy/mitmproxy/issues/313
+ """
+ def handle_request(f):
+ f.request.httpversion = (1, 0)
+ del f.request.headers["Content-Length"]
+ f.reply()
+ _handle_request = self.chain[0].tmaster.handle_request
+ self.chain[0].tmaster.handle_request = handle_request
+ try:
+ assert self.pathoc().request("get:/p/418").status_code == 418
+ finally:
+ self.chain[0].tmaster.handle_request = _handle_request
+
+
+class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
+ ssl = True
+
+ def test_reconnect(self):
+ """
+ Tests proper functionality of ConnectionHandler.server_reconnect mock.
+ If we have a disconnect on a secure connection that's transparently proxified to
+ an upstream http proxy, we need to send the CONNECT request again.
+ """
+ def kill_requests(master, attr, exclude):
+ k = [0] # variable scope workaround: put into array
+ _func = getattr(master, attr)
+ def handler(f):
+ k[0] += 1
+ if not (k[0] in exclude):
+ f.client_conn.finish()
+ f.error = Error("terminated")
+ f.reply(KILL)
+ return _func(f)
+ setattr(master, attr, handler)
+
+ kill_requests(self.chain[1].tmaster, "handle_request",
+ exclude=[
+ # fail first request
+ 2, # allow second request
+ ])
+
+ kill_requests(self.chain[0].tmaster, "handle_request",
+ exclude=[
+ 1, # CONNECT
+ # fail first request
+ 3, # reCONNECT
+ 4, # request
+ ])
+
+ p = self.pathoc()
+ req = p.request("get:'/p/418:b\"content\"'")
+ assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
+ assert self.chain[0].tmaster.state.flow_count() == 4 # CONNECT, failing request,
+ # reCONNECT, request
+ assert self.chain[1].tmaster.state.flow_count() == 2 # failing request, request
+ # (doesn't store (repeated) CONNECTs from chain[0]
+ # as it is a regular proxy)
+ assert req.content == "content"
+ assert req.status_code == 418
+
+ assert not self.chain[1].tmaster.state._flow_list[0].response # killed
+ assert self.chain[1].tmaster.state._flow_list[1].response
+
+ assert self.proxy.tmaster.state._flow_list[0].request.form_in == "authority"
+ assert self.proxy.tmaster.state._flow_list[1].request.form_in == "relative"
+
+ assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority"
+ assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "relative"
+ assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority"
+ assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "relative"
+
+ assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "relative"
+ assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "relative"
+
+ req = p.request("get:'/p/418:b\"content2\"'")
+
+ assert req.status_code == 502
+ assert self.proxy.tmaster.state.flow_count() == 3 # + new request
+ assert self.chain[0].tmaster.state.flow_count() == 6 # + new request, repeated CONNECT from chain[1]
+ # (both terminated)
+ assert self.chain[1].tmaster.state.flow_count() == 2 # nothing happened here