aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_server.py
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2016-02-15 14:58:46 +0100
committerMaximilian Hils <git@maximilianhils.com>2016-02-15 14:58:46 +0100
commit33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04 (patch)
tree31914a601302579ff817504019296fd7e9e46765 /test/test_server.py
parent36f34f701991b5d474c005ec45e3b66e20f326a8 (diff)
downloadmitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.gz
mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.bz2
mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.zip
move mitmproxy
Diffstat (limited to 'test/test_server.py')
-rw-r--r--test/test_server.py1001
1 files changed, 0 insertions, 1001 deletions
diff --git a/test/test_server.py b/test/test_server.py
deleted file mode 100644
index 1b7e6966..00000000
--- a/test/test_server.py
+++ /dev/null
@@ -1,1001 +0,0 @@
-import os
-import socket
-import time
-from OpenSSL import SSL
-from netlib.exceptions import HttpReadDisconnect, HttpException
-from netlib.tcp import Address
-
-import netlib.tutils
-from netlib import tcp, http, socks
-from netlib.certutils import SSLCert
-from netlib.http import authentication, CONTENT_MISSING, http1
-from netlib.tutils import raises
-from libpathod import pathoc, pathod
-
-from libmproxy.proxy.config import HostMatcher
-from libmproxy.protocol import Kill
-from libmproxy.models import Error, HTTPResponse
-
-from . import tutils, tservers
-
-"""
- Note that the choice of response code in these tests matters more than you
- might think. libcurl treats a 304 response code differently from, say, a
- 200 response code - it will correctly terminate a 304 response with no
- content-length header, whereas it will block forever waiting for content
- for a 200 response.
-"""
-
-
-class CommonMixin:
-
- def test_large(self):
- assert len(self.pathod("200:b@50k").content) == 1024 * 50
-
- @staticmethod
- def wait_until_not_live(flow):
- """
- Race condition: We don't want to replay the flow while it is still live.
- """
- s = time.time()
- while flow.live:
- time.sleep(0.001)
- if time.time() - s > 5:
- raise RuntimeError("Flow is live for too long.")
-
- def test_replay(self):
- assert self.pathod("304").status_code == 304
- if isinstance(self, tservers.HTTPUpstreamProxTest) and self.ssl:
- assert len(self.master.state.view) == 2
- else:
- assert len(self.master.state.view) == 1
- l = self.master.state.view[-1]
- assert l.response.status_code == 304
- l.request.path = "/p/305"
- self.wait_until_not_live(l)
- rt = self.master.replay_request(l, block=True)
- assert l.response.status_code == 305
-
- # Disconnect error
- l.request.path = "/p/305:d0"
- rt = self.master.replay_request(l, block=True)
- assert not rt
- if isinstance(self, tservers.HTTPUpstreamProxTest):
- assert l.response.status_code == 502
- else:
- assert l.error
-
- # Port error
- l.request.port = 1
- # In upstream mode, we get a 502 response from the upstream proxy server.
- # In upstream mode with ssl, the replay will fail as we cannot establish
- # SSL with the upstream proxy.
- rt = self.master.replay_request(l, block=True)
- assert not rt
- if isinstance(self, tservers.HTTPUpstreamProxTest):
- assert l.response.status_code == 502
- else:
- assert l.error
-
- def test_http(self):
- f = self.pathod("304")
- assert f.status_code == 304
-
- # In Upstream mode with SSL, we may already have a previous CONNECT
- # request.
- l = self.master.state.view[-1]
- assert l.client_conn.address
- assert "host" in l.request.headers
- assert l.response.status_code == 304
-
- def test_invalid_http(self):
- t = tcp.TCPClient(("127.0.0.1", self.proxy.port))
- t.connect()
- t.wfile.write("invalid\r\n\r\n")
- t.wfile.flush()
- line = t.rfile.readline()
- assert ("Bad Request" in line) or ("Bad Gateway" in line)
-
- def test_sni(self):
- if not self.ssl:
- return
-
- f = self.pathod("304", sni="testserver.com")
- assert f.status_code == 304
- log = self.server.last_log()
- assert log["request"]["sni"] == "testserver.com"
-
-
-class TcpMixin:
-
- def _ignore_on(self):
- assert not hasattr(self, "_ignore_backup")
- self._ignore_backup = self.config.check_ignore
- self.config.check_ignore = HostMatcher(
- [".+:%s" % self.server.port] + self.config.check_ignore.patterns)
-
- def _ignore_off(self):
- assert hasattr(self, "_ignore_backup")
- self.config.check_ignore = self._ignore_backup
- del self._ignore_backup
-
- def test_ignore(self):
- n = self.pathod("304")
- self._ignore_on()
- i = self.pathod("305")
- i2 = self.pathod("306")
- self._ignore_off()
-
- self.master.masterq.join()
-
- assert n.status_code == 304
- assert i.status_code == 305
- assert i2.status_code == 306
- assert any(f.response.status_code == 304 for f in self.master.state.flows)
- assert not any(f.response.status_code == 305 for f in self.master.state.flows)
- assert not any(f.response.status_code == 306 for f in self.master.state.flows)
-
- # Test that we get the original SSL cert
- if self.ssl:
- i_cert = SSLCert(i.sslinfo.certchain[0])
- i2_cert = SSLCert(i2.sslinfo.certchain[0])
- n_cert = SSLCert(n.sslinfo.certchain[0])
-
- assert i_cert == i2_cert
- assert i_cert != n_cert
-
- # Test Non-HTTP traffic
- spec = "200:i0,@100:d0" # this results in just 100 random bytes
- # mitmproxy responds with bad gateway
- assert self.pathod(spec).status_code == 502
- self._ignore_on()
- with raises(HttpException):
- self.pathod(spec) # pathoc tries to parse answer as HTTP
-
- self._ignore_off()
-
- def _tcpproxy_on(self):
- assert not hasattr(self, "_tcpproxy_backup")
- self._tcpproxy_backup = self.config.check_tcp
- self.config.check_tcp = HostMatcher(
- [".+:%s" % self.server.port] + self.config.check_tcp.patterns)
-
- def _tcpproxy_off(self):
- assert hasattr(self, "_tcpproxy_backup")
- self.config.check_tcp = self._tcpproxy_backup
- del self._tcpproxy_backup
-
- def test_tcp(self):
- n = self.pathod("304")
- self._tcpproxy_on()
- i = self.pathod("305")
- i2 = self.pathod("306")
- self._tcpproxy_off()
-
- self.master.masterq.join()
-
- assert n.status_code == 304
- assert i.status_code == 305
- assert i2.status_code == 306
- assert any(f.response.status_code == 304 for f in self.master.state.flows)
- assert not any(f.response.status_code == 305 for f in self.master.state.flows)
- assert not any(f.response.status_code == 306 for f in self.master.state.flows)
-
- # Test that we get the original SSL cert
- if self.ssl:
- i_cert = SSLCert(i.sslinfo.certchain[0])
- i2_cert = SSLCert(i2.sslinfo.certchain[0])
- n_cert = SSLCert(n.sslinfo.certchain[0])
-
- assert i_cert == i2_cert == n_cert
-
- # Make sure that TCP messages are in the event log.
- assert any("305" in m for m in self.master.log)
- assert any("306" in m for m in self.master.log)
-
-
-class AppMixin:
-
- def test_app(self):
- ret = self.app("/")
- assert ret.status_code == 200
- assert "mitmproxy" in ret.content
-
-
-class TestHTTP(tservers.HTTPProxTest, CommonMixin, AppMixin):
-
- def test_app_err(self):
- p = self.pathoc()
- ret = p.request("get:'http://errapp/'")
- assert ret.status_code == 500
- assert "ValueError" in ret.content
-
- def test_invalid_connect(self):
- t = tcp.TCPClient(("127.0.0.1", self.proxy.port))
- t.connect()
- t.wfile.write("CONNECT invalid\n\n")
- t.wfile.flush()
- assert "Bad Request" in t.rfile.readline()
-
- def test_upstream_ssl_error(self):
- p = self.pathoc()
- ret = p.request("get:'https://localhost:%s/'" % self.server.port)
- assert ret.status_code == 400
-
- def test_connection_close(self):
- # Add a body, so we have a content-length header, which combined with
- # HTTP1.1 means the connection is kept alive.
- response = '%s/p/200:b@1' % self.server.urlbase
-
- # Lets sanity check that the connection does indeed stay open by
- # issuing two requests over the same connection
- p = self.pathoc()
- assert p.request("get:'%s'" % response)
- assert p.request("get:'%s'" % response)
-
- # Now check that the connection is closed as the client specifies
- p = self.pathoc()
- assert p.request("get:'%s':h'Connection'='close'" % response)
- # There's a race here, which means we can get any of a number of errors.
- # Rather than introduce yet another sleep into the test suite, we just
- # relax the Exception specification.
- with raises(Exception):
- p.request("get:'%s'" % response)
-
- def test_reconnect(self):
- req = "get:'%s/p/200:b@1:da'" % self.server.urlbase
- p = self.pathoc()
- assert p.request(req)
- # Server has disconnected. Mitmproxy should detect this, and reconnect.
- assert p.request(req)
- assert p.request(req)
-
- def test_get_connection_switching(self):
- def switched(l):
- for i in l:
- if "serverdisconnect" in i:
- return True
-
- req = "get:'%s/p/200:b@1'"
- p = self.pathoc()
- assert p.request(req % self.server.urlbase)
- assert p.request(req % self.server2.urlbase)
- assert switched(self.proxy.log)
-
- def test_blank_leading_line(self):
- p = self.pathoc()
- req = "get:'%s/p/201':i0,'\r\n'"
- assert p.request(req % self.server.urlbase).status_code == 201
-
- def test_invalid_headers(self):
- p = self.pathoc()
- resp = p.request("get:'http://foo':h':foo'='bar'")
- assert resp.status_code == 400
-
- def test_stream(self):
- self.master.set_stream_large_bodies(1024 * 2)
-
- self.pathod("200:b@1k")
- assert not self.master.state.view[-1].response.stream
- assert len(self.master.state.view[-1].response.content) == 1024 * 1
-
- self.pathod("200:b@3k")
- assert self.master.state.view[-1].response.stream
- assert self.master.state.view[-1].response.content == CONTENT_MISSING
- self.master.set_stream_large_bodies(None)
-
- def test_stream_modify(self):
- self.master.load_script(
- tutils.test_data.path("scripts/stream_modify.py"))
- d = self.pathod('200:b"foo"')
- assert d.content == "bar"
- self.master.unload_scripts()
-
-
-class TestHTTPAuth(tservers.HTTPProxTest):
- authenticator = http.authentication.BasicProxyAuth(
- http.authentication.PassManSingleUser(
- "test",
- "test"),
- "realm")
-
- def test_auth(self):
- assert self.pathod("202").status_code == 407
- p = self.pathoc()
- ret = p.request("""
- get
- 'http://localhost:%s/p/202'
- h'%s'='%s'
- """ % (
- self.server.port,
- http.authentication.BasicProxyAuth.AUTH_HEADER,
- authentication.assemble_http_basic_auth("basic", "test", "test")
- ))
- assert ret.status_code == 202
-
-
-class TestHTTPS(tservers.HTTPProxTest, CommonMixin, TcpMixin):
- ssl = True
- ssloptions = pathod.SSLOptions(request_client_cert=True)
-
- def test_clientcert_file(self):
- try:
- self.config.clientcerts = os.path.join(
- tutils.test_data.path("data/clientcert"), "client.pem")
- f = self.pathod("304")
- assert f.status_code == 304
- assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
- finally:
- self.config.clientcerts = None
-
- def test_clientcert_dir(self):
- try:
- self.config.clientcerts = tutils.test_data.path("data/clientcert")
- f = self.pathod("304")
- assert f.status_code == 304
- assert self.server.last_log()["request"]["clientcert"]["keyinfo"]
- finally:
- self.config.clientcerts = None
-
- def test_error_post_connect(self):
- p = self.pathoc()
- assert p.request("get:/:i0,'invalid\r\n\r\n'").status_code == 400
-
-
-class TestHTTPSCertfile(tservers.HTTPProxTest, CommonMixin):
- ssl = True
- certfile = True
-
- def test_certfile(self):
- assert self.pathod("304")
-
-
-class TestHTTPSUpstreamServerVerificationWTrustedCert(tservers.HTTPProxTest):
-
- """
- Test upstream server certificate verification with a trusted server cert.
- """
- ssl = True
- ssloptions = pathod.SSLOptions(
- cn="trusted-cert",
- certs=[
- ("trusted-cert", tutils.test_data.path("data/trusted-server.crt"))
- ])
-
- def test_verification_w_cadir(self):
- self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
- self.config.openssl_trusted_cadir_server = tutils.test_data.path(
- "data/trusted-cadir/")
-
- self.pathoc()
-
- def test_verification_w_pemfile(self):
- self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
- self.config.openssl_trusted_ca_server = tutils.test_data.path(
- "data/trusted-cadir/trusted-ca.pem")
-
- self.pathoc()
-
-
-class TestHTTPSUpstreamServerVerificationWBadCert(tservers.HTTPProxTest):
-
- """
- Test upstream server certificate verification with an untrusted server cert.
- """
- ssl = True
- ssloptions = pathod.SSLOptions(
- cn="untrusted-cert",
- certs=[
- ("untrusted-cert", tutils.test_data.path("data/untrusted-server.crt"))
- ])
-
- def _request(self):
- p = self.pathoc()
- # We need to make an actual request because the upstream connection is lazy-loaded.
- return p.request("get:/p/242")
-
- def test_default_verification_w_bad_cert(self):
- """Should use no verification."""
- self.config.openssl_trusted_ca_server = tutils.test_data.path(
- "data/trusted-cadir/trusted-ca.pem")
-
- assert self._request().status_code == 242
-
- def test_no_verification_w_bad_cert(self):
- self.config.openssl_verification_mode_server = SSL.VERIFY_NONE
- self.config.openssl_trusted_ca_server = tutils.test_data.path(
- "data/trusted-cadir/trusted-ca.pem")
-
- assert self._request().status_code == 242
-
- def test_verification_w_bad_cert(self):
- self.config.openssl_verification_mode_server = SSL.VERIFY_PEER
- self.config.openssl_trusted_ca_server = tutils.test_data.path(
- "data/trusted-cadir/trusted-ca.pem")
-
- assert self._request().status_code == 502
-
-
-class TestHTTPSNoCommonName(tservers.HTTPProxTest):
-
- """
- Test what happens if we get a cert without common name back.
- """
- ssl = True
- ssloptions = pathod.SSLOptions(
- certs=[
- ("*", tutils.test_data.path("data/no_common_name.pem"))
- ]
- )
-
- def test_http(self):
- f = self.pathod("202")
- assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1"
-
-
-class TestReverse(tservers.ReverseProxTest, CommonMixin, TcpMixin):
- reverse = True
-
-
-class TestSocks5(tservers.SocksModeTest):
-
- def test_simple(self):
- p = self.pathoc()
- p.socks_connect(("localhost", self.server.port))
- f = p.request("get:/p/200")
- assert f.status_code == 200
-
- def test_with_authentication_only(self):
- p = self.pathoc()
- f = p.request("get:/p/200")
- assert f.status_code == 502
- assert "SOCKS5 mode failure" in f.content
-
- def test_no_connect(self):
- """
- mitmproxy doesn't support UDP or BIND SOCKS CMDs
- """
- p = self.pathoc()
-
- socks.ClientGreeting(
- socks.VERSION.SOCKS5,
- [socks.METHOD.NO_AUTHENTICATION_REQUIRED]
- ).to_file(p.wfile)
- socks.Message(
- socks.VERSION.SOCKS5,
- socks.CMD.BIND,
- socks.ATYP.DOMAINNAME,
- ("example.com", 8080)
- ).to_file(p.wfile)
-
- p.wfile.flush()
- p.rfile.read(2) # read server greeting
- f = p.request("get:/p/200") # the request doesn't matter, error response from handshake will be read anyway.
- assert f.status_code == 502
- assert "SOCKS5 mode failure" in f.content
-
-
-class TestHttps2Http(tservers.ReverseProxTest):
-
- @classmethod
- def get_proxy_config(cls):
- d = super(TestHttps2Http, cls).get_proxy_config()
- d["upstream_server"] = ("http", d["upstream_server"][1])
- return d
-
- def pathoc(self, ssl, sni=None):
- """
- Returns a connected Pathoc instance.
- """
- p = pathoc.Pathoc(
- ("localhost", self.proxy.port), ssl=True, sni=sni, fp=None
- )
- p.connect()
- return p
-
- def test_all(self):
- p = self.pathoc(ssl=True)
- assert p.request("get:'/p/200'").status_code == 200
-
- def test_sni(self):
- p = self.pathoc(ssl=True, sni="example.com")
- assert p.request("get:'/p/200'").status_code == 200
- assert all("Error in handle_sni" not in msg for msg in self.proxy.log)
-
- def test_http(self):
- p = self.pathoc(ssl=False)
- assert p.request("get:'/p/200'").status_code == 200
-
-
-class TestTransparent(tservers.TransparentProxTest, CommonMixin, TcpMixin):
- ssl = False
-
- def test_tcp_stream_modify(self):
- self.master.load_script(
- tutils.test_data.path("scripts/tcp_stream_modify.py"))
-
- self._tcpproxy_on()
- d = self.pathod('200:b"foo"')
- self._tcpproxy_off()
-
- assert d.content == "bar"
-
- self.master.unload_scripts()
-
-
-class TestTransparentSSL(tservers.TransparentProxTest, CommonMixin, TcpMixin):
- ssl = True
-
- def test_sslerr(self):
- p = pathoc.Pathoc(("localhost", self.proxy.port), fp=None)
- p.connect()
- r = p.request("get:/")
- assert r.status_code == 502
-
-
-class TestProxy(tservers.HTTPProxTest):
-
- def test_http(self):
- f = self.pathod("304")
- assert f.status_code == 304
-
- f = self.master.state.view[0]
- assert f.client_conn.address
- assert "host" in f.request.headers
- assert f.response.status_code == 304
-
- @tutils.skip_appveyor
- def test_response_timestamps(self):
- # test that we notice at least 1 sec delay between timestamps
- # in response object
- f = self.pathod("304:b@1k:p50,1")
- assert f.status_code == 304
-
- response = self.master.state.view[0].response
- # timestamp_start might fire a bit late, so we play safe and only require 300ms.
- assert 0.3 <= response.timestamp_end - response.timestamp_start
-
- @tutils.skip_appveyor
- def test_request_timestamps(self):
- # test that we notice a delay between timestamps in request object
- connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- connection.connect(("127.0.0.1", self.proxy.port))
-
- # call pathod server, wait a second to complete the request
- connection.send(
- "GET http://localhost:%d/p/304:b@1k HTTP/1.1\r\n" %
- self.server.port)
- time.sleep(1)
- connection.send("\r\n")
- connection.recv(50000)
- connection.close()
-
- request, response = self.master.state.view[
- 0].request, self.master.state.view[0].response
- assert response.status_code == 304 # sanity test for our low level request
- # timestamp_start might fire a bit late, so we play safe and only require 300ms.
- assert 0.3 <= request.timestamp_end - request.timestamp_start
-
- def test_request_tcp_setup_timestamp_presence(self):
- # tests that the client_conn a tcp connection has a tcp_setup_timestamp
- connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- connection.connect(("localhost", self.proxy.port))
- connection.send(
- "GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" %
- self.server.port)
- connection.send("\r\n")
- # a bit hacky: make sure that we don't just read the headers only.
- recvd = 0
- while recvd < 1024:
- recvd += len(connection.recv(5000))
- connection.send(
- "GET http://localhost:%d/p/200:b@1k HTTP/1.1\r\n" %
- self.server.port)
- connection.send("\r\n")
- recvd = 0
- while recvd < 1024:
- recvd += len(connection.recv(5000))
- connection.close()
-
- first_flow = self.master.state.view[0]
- second_flow = self.master.state.view[1]
- assert first_flow.server_conn.timestamp_tcp_setup
- assert first_flow.server_conn.timestamp_ssl_setup is None
- assert second_flow.server_conn.timestamp_tcp_setup
- assert first_flow.server_conn.timestamp_tcp_setup == second_flow.server_conn.timestamp_tcp_setup
-
- def test_request_ip(self):
- f = self.pathod("200:b@100")
- assert f.status_code == 200
- f = self.master.state.view[0]
- assert f.server_conn.address == ("127.0.0.1", self.server.port)
-
-
-class TestProxySSL(tservers.HTTPProxTest):
- ssl = True
-
- def test_request_ssl_setup_timestamp_presence(self):
- # tests that the ssl timestamp is present when ssl is used
- f = self.pathod("304:b@10k")
- assert f.status_code == 304
- first_flow = self.master.state.view[0]
- assert first_flow.server_conn.timestamp_ssl_setup
-
-
-class MasterRedirectRequest(tservers.TestMaster):
- redirect_port = None # Set by TestRedirectRequest
-
- def handle_request(self, f):
- if f.request.path == "/p/201":
-
- # This part should have no impact, but it should also not cause any exceptions.
- addr = f.live.server_conn.address
- addr2 = Address(("127.0.0.1", self.redirect_port))
- f.live.set_server(addr2)
- f.live.set_server(addr)
-
- # This is the actual redirection.
- f.request.port = self.redirect_port
- super(MasterRedirectRequest, self).handle_request(f)
-
- def handle_response(self, f):
- f.response.content = str(f.client_conn.address.port)
- f.response.headers["server-conn-id"] = str(f.server_conn.source_address.port)
- super(MasterRedirectRequest, self).handle_response(f)
-
-
-class TestRedirectRequest(tservers.HTTPProxTest):
- masterclass = MasterRedirectRequest
- ssl = True
-
- def test_redirect(self):
- """
- Imagine a single HTTPS connection with three requests:
-
- 1. First request should pass through unmodified
- 2. Second request will be redirected to a different host by an inline script
- 3. Third request should pass through unmodified
-
- This test verifies that the original destination is restored for the third request.
- """
- self.master.redirect_port = self.server2.port
-
- p = self.pathoc()
-
- self.server.clear_log()
- self.server2.clear_log()
- r1 = p.request("get:'/p/200'")
- assert r1.status_code == 200
- assert self.server.last_log()
- assert not self.server2.last_log()
-
- self.server.clear_log()
- self.server2.clear_log()
- r2 = p.request("get:'/p/201'")
- assert r2.status_code == 201
- assert not self.server.last_log()
- assert self.server2.last_log()
-
- self.server.clear_log()
- self.server2.clear_log()
- r3 = p.request("get:'/p/202'")
- assert r3.status_code == 202
- assert self.server.last_log()
- assert not self.server2.last_log()
-
- assert r1.content == r2.content == r3.content
-
-
-class MasterStreamRequest(tservers.TestMaster):
-
- """
- Enables the stream flag on the flow for all requests
- """
-
- def handle_responseheaders(self, f):
- f.response.stream = True
- f.reply()
-
-
-class TestStreamRequest(tservers.HTTPProxTest):
- masterclass = MasterStreamRequest
-
- def test_stream_simple(self):
- p = self.pathoc()
-
- # a request with 100k of data but without content-length
- r1 = p.request("get:'%s/p/200:r:b@100k:d102400'" % self.server.urlbase)
- assert r1.status_code == 200
- assert len(r1.content) > 100000
-
- def test_stream_multiple(self):
- p = self.pathoc()
-
- # simple request with streaming turned on
- r1 = p.request("get:'%s/p/200'" % self.server.urlbase)
- assert r1.status_code == 200
-
- # now send back 100k of data, streamed but not chunked
- r1 = p.request("get:'%s/p/201:b@100k'" % self.server.urlbase)
- assert r1.status_code == 201
-
- def test_stream_chunked(self):
- connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- connection.connect(("127.0.0.1", self.proxy.port))
- fconn = connection.makefile()
- spec = '200:h"Transfer-Encoding"="chunked":r:b"4\\r\\nthis\\r\\n11\\r\\nisatest__reachhex\\r\\n0\\r\\n\\r\\n"'
- connection.send(
- "GET %s/p/%s HTTP/1.1\r\n" %
- (self.server.urlbase, spec))
- connection.send("\r\n")
-
- resp = http1.read_response_head(fconn)
-
- assert resp.headers["Transfer-Encoding"] == 'chunked'
- assert resp.status_code == 200
-
- chunks = list(http1.read_body(fconn, None))
- assert chunks == ["this", "isatest__reachhex"]
-
- connection.close()
-
-
-class MasterFakeResponse(tservers.TestMaster):
-
- def handle_request(self, f):
- resp = HTTPResponse.wrap(netlib.tutils.tresp())
- f.reply(resp)
-
-
-class TestFakeResponse(tservers.HTTPProxTest):
- masterclass = MasterFakeResponse
-
- def test_fake(self):
- f = self.pathod("200")
- assert "header-response" in f.headers
-
-
-class TestServerConnect(tservers.HTTPProxTest):
- masterclass = MasterFakeResponse
- no_upstream_cert = True
- ssl = True
-
- def test_unnecessary_serverconnect(self):
- """A replayed/fake response with no_upstream_cert should not connect to an upstream server"""
- assert self.pathod("200").status_code == 200
- for msg in self.proxy.tmaster.log:
- assert "serverconnect" not in msg
-
-
-class MasterKillRequest(tservers.TestMaster):
-
- def handle_request(self, f):
- f.reply(Kill)
-
-
-class TestKillRequest(tservers.HTTPProxTest):
- masterclass = MasterKillRequest
-
- def test_kill(self):
- with raises(HttpReadDisconnect):
- self.pathod("200")
- # Nothing should have hit the server
- assert not self.server.last_log()
-
-
-class MasterKillResponse(tservers.TestMaster):
-
- def handle_response(self, f):
- f.reply(Kill)
-
-
-class TestKillResponse(tservers.HTTPProxTest):
- masterclass = MasterKillResponse
-
- def test_kill(self):
- with raises(HttpReadDisconnect):
- self.pathod("200")
- # The server should have seen a request
- assert self.server.last_log()
-
-
-class EResolver(tservers.TResolver):
-
- def original_addr(self, sock):
- raise RuntimeError("Could not resolve original destination.")
-
-
-class TestTransparentResolveError(tservers.TransparentProxTest):
- resolver = EResolver
-
- def test_resolve_error(self):
- assert self.pathod("304").status_code == 502
-
-
-class MasterIncomplete(tservers.TestMaster):
-
- def handle_request(self, f):
- resp = HTTPResponse.wrap(netlib.tutils.tresp())
- resp.content = CONTENT_MISSING
- f.reply(resp)
-
-
-class TestIncompleteResponse(tservers.HTTPProxTest):
- masterclass = MasterIncomplete
-
- def test_incomplete(self):
- assert self.pathod("200").status_code == 502
-
-
-class TestUpstreamProxy(tservers.HTTPUpstreamProxTest, CommonMixin, AppMixin):
- ssl = False
-
- def test_order(self):
- self.proxy.tmaster.replacehooks.add(
- "~q",
- "foo",
- "bar") # replace in request
- self.chain[0].tmaster.replacehooks.add("~q", "bar", "baz")
- self.chain[1].tmaster.replacehooks.add("~q", "foo", "oh noes!")
- self.chain[0].tmaster.replacehooks.add(
- "~s",
- "baz",
- "ORLY") # replace in response
-
- p = self.pathoc()
- req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase)
- assert req.content == "ORLY"
- assert req.status_code == 418
-
-
-class TestUpstreamProxySSL(
- tservers.HTTPUpstreamProxTest,
- CommonMixin,
- TcpMixin):
- ssl = True
-
- def _host_pattern_on(self, attr):
- """
- Updates config.check_tcp or check_ignore, depending on attr.
- """
- assert not hasattr(self, "_ignore_%s_backup" % attr)
- backup = []
- for proxy in self.chain:
- old_matcher = getattr(
- proxy.tmaster.server.config,
- "check_%s" %
- attr)
- backup.append(old_matcher)
- setattr(
- proxy.tmaster.server.config,
- "check_%s" % attr,
- HostMatcher([".+:%s" % self.server.port] + old_matcher.patterns)
- )
-
- setattr(self, "_ignore_%s_backup" % attr, backup)
-
- def _host_pattern_off(self, attr):
- backup = getattr(self, "_ignore_%s_backup" % attr)
- for proxy in reversed(self.chain):
- setattr(
- proxy.tmaster.server.config,
- "check_%s" % attr,
- backup.pop()
- )
-
- assert not backup
- delattr(self, "_ignore_%s_backup" % attr)
-
- def _ignore_on(self):
- super(TestUpstreamProxySSL, self)._ignore_on()
- self._host_pattern_on("ignore")
-
- def _ignore_off(self):
- super(TestUpstreamProxySSL, self)._ignore_off()
- self._host_pattern_off("ignore")
-
- def _tcpproxy_on(self):
- super(TestUpstreamProxySSL, self)._tcpproxy_on()
- self._host_pattern_on("tcp")
-
- def _tcpproxy_off(self):
- super(TestUpstreamProxySSL, self)._tcpproxy_off()
- self._host_pattern_off("tcp")
-
- def test_simple(self):
- p = self.pathoc()
- req = p.request("get:'/p/418:b\"content\"'")
- assert req.content == "content"
- assert req.status_code == 418
-
- # CONNECT from pathoc to chain[0],
- assert self.proxy.tmaster.state.flow_count() == 2
- # request from pathoc to chain[0]
- # CONNECT from proxy to chain[1],
- assert self.chain[0].tmaster.state.flow_count() == 2
- # request from proxy to chain[1]
- # request from chain[0] (regular proxy doesn't store CONNECTs)
- assert self.chain[1].tmaster.state.flow_count() == 1
-
-
-class TestProxyChainingSSLReconnect(tservers.HTTPUpstreamProxTest):
- ssl = True
-
- def test_reconnect(self):
- """
- Tests proper functionality of ConnectionHandler.server_reconnect mock.
- If we have a disconnect on a secure connection that's transparently proxified to
- an upstream http proxy, we need to send the CONNECT request again.
- """
-
- def kill_requests(master, attr, exclude):
- k = [0] # variable scope workaround: put into array
- _func = getattr(master, attr)
-
- def handler(f):
- k[0] += 1
- if not (k[0] in exclude):
- f.client_conn.finish()
- f.error = Error("terminated")
- f.reply(Kill)
- return _func(f)
-
- setattr(master, attr, handler)
-
- kill_requests(self.chain[1].tmaster, "handle_request",
- exclude=[
- # fail first request
- 2, # allow second request
- ])
-
- kill_requests(self.chain[0].tmaster, "handle_request",
- exclude=[
- 1, # CONNECT
- # fail first request
- 3, # reCONNECT
- 4, # request
- ])
-
- p = self.pathoc()
- req = p.request("get:'/p/418:b\"content\"'")
- assert req.content == "content"
- assert req.status_code == 418
-
- assert self.proxy.tmaster.state.flow_count() == 2 # CONNECT and request
- # CONNECT, failing request,
- assert self.chain[0].tmaster.state.flow_count() == 4
- # reCONNECT, request
- # failing request, request
- assert self.chain[1].tmaster.state.flow_count() == 2
- # (doesn't store (repeated) CONNECTs from chain[0]
- # as it is a regular proxy)
-
- assert not self.chain[1].tmaster.state.flows[0].response # killed
- assert self.chain[1].tmaster.state.flows[1].response
-
- assert self.proxy.tmaster.state.flows[0].request.form_in == "authority"
- assert self.proxy.tmaster.state.flows[1].request.form_in == "relative"
-
- assert self.chain[0].tmaster.state.flows[
- 0].request.form_in == "authority"
- assert self.chain[0].tmaster.state.flows[
- 1].request.form_in == "relative"
- assert self.chain[0].tmaster.state.flows[
- 2].request.form_in == "authority"
- assert self.chain[0].tmaster.state.flows[
- 3].request.form_in == "relative"
-
- assert self.chain[1].tmaster.state.flows[
- 0].request.form_in == "relative"
- assert self.chain[1].tmaster.state.flows[
- 1].request.form_in == "relative"
-
- req = p.request("get:'/p/418:b\"content2\"'")
-
- assert req.status_code == 502
- assert self.proxy.tmaster.state.flow_count() == 3 # + new request
- # + new request, repeated CONNECT from chain[1]
- assert self.chain[0].tmaster.state.flow_count() == 6
- # (both terminated)
- # nothing happened here
- assert self.chain[1].tmaster.state.flow_count() == 2