aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_server.py')
-rw-r--r--test/test_server.py101
1 files changed, 92 insertions, 9 deletions
diff --git a/test/test_server.py b/test/test_server.py
index 0ce5d056..c81eab2b 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -1,5 +1,5 @@
import socket, time
-from libmproxy.proxy.config import parse_host_pattern
+from libmproxy.proxy.config import HostMatcher
from netlib import tcp, http_auth, http
from libpathod import pathoc, pathod
from netlib.certutils import SSLCert
@@ -19,6 +19,17 @@ class CommonMixin:
def test_large(self):
assert len(self.pathod("200:b@50k").content) == 1024*50
+ @staticmethod
+ def wait_until_not_live(flow):
+ """
+ Race condition: We don't want to replay the flow while it is still live.
+ """
+ s = time.time()
+ while flow.live:
+ time.sleep(0.001)
+ if time.time() - s > 5:
+ raise RuntimeError("Flow is live for too long.")
+
def test_replay(self):
assert self.pathod("304").status_code == 304
if isinstance(self, tservers.HTTPUpstreamProxTest) and self.ssl:
@@ -28,6 +39,7 @@ class CommonMixin:
l = self.master.state.view[-1]
assert l.response.code == 304
l.request.path = "/p/305"
+ self.wait_until_not_live(l)
rt = self.master.replay_request(l, block=True)
assert l.response.code == 305
@@ -79,11 +91,14 @@ class CommonMixin:
class TcpMixin:
def _ignore_on(self):
- ignore = parse_host_pattern([".+:%s" % self.server.port])[0]
- self.config.ignore.append(ignore)
+ assert not hasattr(self, "_ignore_backup")
+ self._ignore_backup = self.config.check_ignore
+ self.config.check_ignore = HostMatcher([".+:%s" % self.server.port] + self.config.check_ignore.patterns)
def _ignore_off(self):
- self.config.ignore.pop()
+ assert hasattr(self, "_ignore_backup")
+ self.config.check_ignore = self._ignore_backup
+ del self._ignore_backup
def test_ignore(self):
spec = '304:h"Alternate-Protocol"="mitmproxy-will-remove-this"'
@@ -114,6 +129,40 @@ class TcpMixin:
tutils.raises("invalid server response", self.pathod, spec) # pathoc tries to parse answer as HTTP
self._ignore_off()
+ def _tcpproxy_on(self):
+ assert not hasattr(self, "_tcpproxy_backup")
+ self._tcpproxy_backup = self.config.check_tcp
+ self.config.check_tcp = HostMatcher([".+:%s" % self.server.port] + self.config.check_tcp.patterns)
+
+ def _tcpproxy_off(self):
+ assert hasattr(self, "_tcpproxy_backup")
+ self.config.check_ignore = self._tcpproxy_backup
+ del self._tcpproxy_backup
+
+
+ def test_tcp(self):
+ spec = '304:h"Alternate-Protocol"="mitmproxy-will-remove-this"'
+ n = self.pathod(spec)
+ self._tcpproxy_on()
+ i = self.pathod(spec)
+ i2 = self.pathod(spec)
+ self._tcpproxy_off()
+
+ assert i.status_code == i2.status_code == n.status_code == 304
+ assert "Alternate-Protocol" in i.headers
+ assert "Alternate-Protocol" in i2.headers
+ assert "Alternate-Protocol" not in n.headers
+
+ # Test that we get the original SSL cert
+ if self.ssl:
+ i_cert = SSLCert(i.sslinfo.certchain[0])
+ i2_cert = SSLCert(i2.sslinfo.certchain[0])
+ n_cert = SSLCert(n.sslinfo.certchain[0])
+
+ assert i_cert == i2_cert == n_cert
+
+ # Make sure that TCP messages are in the event log.
+ assert any("mitmproxy-will-remove-this" in m for m in self.master.log)
class AppMixin:
def test_app(self):
@@ -579,16 +628,50 @@ class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin):
class TestUpstreamProxySSL(tservers.HTTPUpstreamProxTest, CommonMixin, TcpMixin):
ssl = True
+ def _host_pattern_on(self, attr):
+ """
+ Updates config.check_tcp or check_ignore, depending on attr.
+ """
+ assert not hasattr(self, "_ignore_%s_backup" % attr)
+ backup = []
+ for proxy in self.chain:
+ old_matcher = getattr(proxy.tmaster.server.config, "check_%s" % attr)
+ backup.append(old_matcher)
+ setattr(
+ proxy.tmaster.server.config,
+ "check_%s" % attr,
+ HostMatcher([".+:%s" % self.server.port] + old_matcher.patterns)
+ )
+
+ setattr(self, "_ignore_%s_backup" % attr, backup)
+
+ def _host_pattern_off(self, attr):
+ backup = getattr(self, "_ignore_%s_backup" % attr)
+ for proxy in reversed(self.chain):
+ setattr(
+ proxy.tmaster.server.config,
+ "check_%s" % attr,
+ backup.pop()
+ )
+
+ assert not backup
+ delattr(self, "_ignore_%s_backup" % attr)
+
def _ignore_on(self):
super(TestUpstreamProxySSL, self)._ignore_on()
- ignore = parse_host_pattern([".+:%s" % self.server.port])[0]
- for proxy in self.chain:
- proxy.tmaster.server.config.ignore.append(ignore)
+ self._host_pattern_on("ignore")
def _ignore_off(self):
super(TestUpstreamProxySSL, self)._ignore_off()
- for proxy in self.chain:
- proxy.tmaster.server.config.ignore.pop()
+ self._host_pattern_off("ignore")
+
+ def _tcpproxy_on(self):
+ super(TestUpstreamProxySSL, self)._tcpproxy_on()
+ self._host_pattern_on("tcp")
+
+ def _tcpproxy_off(self):
+ super(TestUpstreamProxySSL, self)._tcpproxy_off()
+ self._host_pattern_off("tcp")
def test_simple(self):
p = self.pathoc()