import os
import mock
from OpenSSL import SSL

from libmproxy import cmdline
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.config import process_proxy_options
from libmproxy.models.connections import ServerConnection
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
from netlib.exceptions import TcpDisconnect
from libpathod import test
from netlib.http import http1

from . import tutils


class TestServerConnection(object):
    """Tests for ServerConnection against a local libpathod test daemon."""

    def test_simple(self):
        # Send one assembled HTTP/1 request over a real ServerConnection and
        # check that a response can be read and the daemon logged something.
        self.d = test.Daemon()
        # Shut the daemon down even when an assertion below fails, so a
        # failing test does not leave it running.
        try:
            sc = ServerConnection((self.d.IFACE, self.d.port))
            sc.connect()
            f = tutils.tflow()
            f.server_conn = sc
            f.request.path = "/p/200:da"

            # use this protocol just to assemble - not for actual sending
            sc.wfile.write(http1.assemble_request(f.request))
            sc.wfile.flush()

            assert http1.read_response(sc.rfile, f.request, 1000)
            assert self.d.last_log()

            sc.finish()
        finally:
            self.d.shutdown()

    def test_terminate_error(self):
        # finish() is called on a connection whose flush() raises
        # TcpDisconnect; the test passes only if finish() does not propagate
        # that error.
        self.d = test.Daemon()
        try:
            sc = ServerConnection((self.d.IFACE, self.d.port))
            sc.connect()
            # Replace the underlying connection with a mock that fails on
            # flush.
            sc.connection = mock.Mock()
            sc.connection.recv = mock.Mock(return_value=False)
            sc.connection.flush = mock.Mock(side_effect=TcpDisconnect)
            sc.finish()
        finally:
            self.d.shutdown()

    def test_repr(self):
        # repr() mentions "ssl" only once ssl_established is set, and
        # includes the SNI value once one is assigned.
        sc = tutils.tserver_conn()
        assert "address:22" in repr(sc)
        assert "ssl" not in repr(sc)
        sc.ssl_established = True
        assert "ssl" in repr(sc)
        sc.sni = "foo"
        assert "foo" in repr(sc)


class TestProcessProxyOptions:
    """Tests for process_proxy_options driven through command-line arguments."""

    def p(self, *args):
        # Parse ``args`` with the common mitmproxy options registered on a
        # tutils.MockParser and return ``(parser, processed_options)``.
        parser = tutils.MockParser()
        cmdline.common_options(parser)
        opts = parser.parse_args(args=args)
        return parser, process_proxy_options(parser, opts)

    def assert_err(self, err, *args):
        # Assert that processing ``args`` fails with ``err`` (handed to
        # tutils.raises unchanged).
        tutils.raises(err, self.p, *args)

    def assert_noerr(self, *args):
        # Assert that processing ``args`` yields a truthy result and return
        # it so callers can inspect individual settings.
        _, config = self.p(*args)
        assert config
        return config

    def test_simple(self):
        assert self.p()

    def test_cadir(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--cadir", cadir)

    @mock.patch("libmproxy.platform.resolver", None)
    def test_no_transparent(self):
        # With the platform resolver patched to None, -T must be rejected.
        self.assert_err("transparent mode not supported", "-T")

    @mock.patch("libmproxy.platform.resolver")
    def test_modes(self, _):
        # The platform resolver is mocked here, so -T is accepted.
        self.assert_noerr("-R", "http://localhost")
        self.assert_err("expected one argument", "-R")
        self.assert_err("Invalid server specification", "-R", "reverse")

        self.assert_noerr("-T")

        self.assert_noerr("-U", "http://localhost")
        self.assert_err("expected one argument", "-U")
        self.assert_err("Invalid server specification", "-U", "upstream")

        # Reverse and transparent modes cannot be combined.
        self.assert_err("not allowed with", "-R", "http://localhost", "-T")

    def test_socks_auth(self):
        self.assert_err(
            "Proxy Authentication not supported in SOCKS mode.",
            "--socks",
            "--nonanonymous")

    def test_client_certs(self):
        # --client-certs accepts both a directory and a single PEM file, and
        # rejects a path that does not exist.
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--client-certs", cadir)
            self.assert_noerr(
                "--client-certs",
                os.path.join(
                    tutils.test_data.path("data/clientcert"),
                    "client.pem"))
            self.assert_err(
                "path does not exist",
                "--client-certs",
                "nonexistent")

    def test_certs(self):
        # The temporary directory itself is not referenced; only the context
        # manager is needed, so its value is not bound to a name.
        with tutils.tmpdir():
            self.assert_noerr(
                "--cert",
                tutils.test_data.path("data/testkey.pem"))
            self.assert_err("does not exist", "--cert", "nonexistent")

    def test_auth(self):
        # Each supported authentication option must produce an authenticator;
        # malformed inputs must be rejected.
        p = self.assert_noerr("--nonanonymous")
        assert p.authenticator

        p = self.assert_noerr(
            "--htpasswd",
            tutils.test_data.path("data/htpasswd"))
        assert p.authenticator
        self.assert_err(
            "malformed htpasswd file",
            "--htpasswd",
            tutils.test_data.path("data/htpasswd.invalid"))

        p = self.assert_noerr("--singleuser", "test:test")
        assert p.authenticator
        self.assert_err(
            "invalid single-user specification",
            "--singleuser",
            "test")

    def test_verify_upstream_cert(self):
        p = self.assert_noerr("--verify-upstream-cert")
        assert p.openssl_verification_mode_server == SSL.VERIFY_PEER

    def test_upstream_trusted_cadir(self):
        expected_dir = "/path/to/a/ca/dir"
        p = self.assert_noerr("--upstream-trusted-cadir", expected_dir)
        assert p.openssl_trusted_cadir_server == expected_dir

    def test_upstream_trusted_ca(self):
        expected_file = "/path/to/a/cert/file"
        p = self.assert_noerr("--upstream-trusted-ca", expected_file)
        assert p.openssl_trusted_ca_server == expected_file


class TestProxyServer:
    """Tests that ProxyServer reports a startup error for unusable configs."""

    # binding to 0.0.0.0:1 works without special permissions on Windows
    @tutils.skip_windows
    def test_err(self):
        conf = ProxyConfig(
            port=1
        )
        tutils.raises("error starting proxy server", ProxyServer, conf)

    def test_err_2(self):
        conf = ProxyConfig(
            host="invalidhost"
        )
        tutils.raises("error starting proxy server", ProxyServer, conf)


class TestDummyServer:
    """Smoke test for DummyServer."""

    def test_simple(self):
        # start_slave() and shutdown() must be callable without raising.
        d = DummyServer(None)
        d.start_slave()
        d.shutdown()


class TestConnectionHandler:
    """Tests for ConnectionHandler's handling of unexpected errors."""

    def test_fatal_error(self):
        # The root layer returned by config.mode raises RuntimeError when
        # called; handle() is expected to report the crash on stderr.
        config = mock.Mock()
        root_layer = mock.Mock()
        root_layer.side_effect = RuntimeError
        config.mode.return_value = root_layer
        channel = mock.Mock()

        # Echo the message straight back instead of going through a real
        # channel.
        def ask(_, x):
            return x
        channel.ask = ask

        c = ConnectionHandler(
            mock.MagicMock(),
            ("127.0.0.1", 8080),
            config,
            channel
        )
        with tutils.capture_stderr(c.handle) as output:
            assert "mitmproxy has crashed" in output