"""Tests for ServerConnection, proxy option processing and proxy server startup."""
import mock
from OpenSSL import SSL

from libmproxy import cmdline
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.config import process_proxy_options
from libmproxy.models.connections import ServerConnection
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
from netlib.exceptions import TcpDisconnect
import tutils
from libpathod import test
from netlib import http, tcp
from netlib.http import http1


class TestServerConnection:
    """Exercise ServerConnection against a libpathod test daemon."""

    def setUp(self):
        # A fresh daemon per test; torn down again in tearDown().
        self.d = test.Daemon()

    def tearDown(self):
        self.d.shutdown()

    def test_simple(self):
        # Send one request over a real connection and read the response back.
        address = (self.d.IFACE, self.d.port)
        conn = ServerConnection(address)
        conn.connect()

        flow = tutils.tflow()
        flow.server_conn = conn
        flow.request.path = "/p/200:da"

        # use this protocol just to assemble - not for actual sending
        raw_request = http1.assemble_request(flow.request)
        conn.wfile.write(raw_request)
        conn.wfile.flush()

        assert http1.read_response(conn.rfile, flow.request, 1000)
        assert self.d.last_log()

        conn.finish()

    def test_terminate_error(self):
        # finish() must not propagate a TcpDisconnect raised while flushing.
        conn = ServerConnection((self.d.IFACE, self.d.port))
        conn.connect()

        # Swap the underlying connection for a mock whose flush() raises.
        conn.connection = mock.Mock()
        conn.connection.recv = mock.Mock(return_value=False)
        conn.connection.flush = mock.Mock(side_effect=TcpDisconnect)

        conn.finish()

    def test_repr(self):
        # repr() reflects the address, the SSL state and the SNI value.
        conn = tutils.tserver_conn()
        assert "address:22" in repr(conn)
        assert "ssl" not in repr(conn)

        conn.ssl_established = True
        assert "ssl" in repr(conn)

        conn.sni = "foo"
        assert "foo" in repr(conn)


class TestProcessProxyOptions:
    """Check how process_proxy_options handles command-line arguments."""

    def p(self, *args):
        """Parse ``args`` with the common options.

        Returns a ``(parser, result)`` tuple, where ``result`` is whatever
        process_proxy_options returns for the parsed options.
        """
        parser = tutils.MockParser()
        cmdline.common_options(parser)
        parsed = parser.parse_args(args=args)
        return parser, process_proxy_options(parser, parsed)

    def assert_err(self, err, *args):
        """Assert that processing ``args`` raises an error matching ``err``."""
        tutils.raises(err, self.p, *args)

    def assert_noerr(self, *args):
        """Process ``args``, assert the result is truthy and return it."""
        _parser, config = self.p(*args)
        assert config
        return config

    def test_simple(self):
        assert self.p()

    def test_cadir(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--cadir", cadir)

    # With the platform resolver patched to None, -T has to be rejected.
    @mock.patch("libmproxy.platform.resolver", None)
    def test_no_transparent(self):
        self.assert_err("transparent mode not supported", "-T")

    # Here the resolver is replaced by a mock object, so -T is accepted.
    @mock.patch("libmproxy.platform.resolver")
    def test_modes(self, _):
        # Reverse proxy mode.
        self.assert_noerr("-R", "http://localhost")
        self.assert_err("expected one argument", "-R")
        self.assert_err("Invalid server specification", "-R", "reverse")

        # Transparent mode.
        self.assert_noerr("-T")

        # Upstream proxy mode.
        self.assert_noerr("-U", "http://localhost")
        self.assert_err("expected one argument", "-U")
        self.assert_err("Invalid server specification", "-U", "upstream")

        # Two modes at once are rejected.
        self.assert_err("not allowed with", "-R", "http://localhost", "-T")

    def test_socks_auth(self):
        self.assert_err(
            "Proxy Authentication not supported in SOCKS mode.",
            "--socks",
            "--nonanonymous",
        )

    def test_client_certs(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--client-certs", cadir)
            self.assert_err(
                "directory does not exist",
                "--client-certs",
                "nonexistent",
            )

    def test_certs(self):
        with tutils.tmpdir() as cadir:
            key_path = tutils.test_data.path("data/testkey.pem")
            self.assert_noerr("--cert", key_path)
            self.assert_err("does not exist", "--cert", "nonexistent")

    def test_auth(self):
        # Every supported authentication option yields an authenticator.
        config = self.assert_noerr("--nonanonymous")
        assert config.authenticator

        config = self.assert_noerr(
            "--htpasswd",
            tutils.test_data.path("data/htpasswd"),
        )
        assert config.authenticator
        self.assert_err(
            "malformed htpasswd file",
            "--htpasswd",
            tutils.test_data.path("data/htpasswd.invalid"),
        )

        config = self.assert_noerr("--singleuser", "test:test")
        assert config.authenticator
        self.assert_err(
            "invalid single-user specification",
            "--singleuser",
            "test",
        )

    def test_verify_upstream_cert(self):
        config = self.assert_noerr("--verify-upstream-cert")
        assert config.openssl_verification_mode_server == SSL.VERIFY_PEER

    def test_upstream_trusted_cadir(self):
        expected_dir = "/path/to/a/ca/dir"
        config = self.assert_noerr("--upstream-trusted-cadir", expected_dir)
        assert config.openssl_trusted_cadir_server == expected_dir

    def test_upstream_trusted_ca(self):
        expected_file = "/path/to/a/cert/file"
        config = self.assert_noerr("--upstream-trusted-ca", expected_file)
        assert config.openssl_trusted_ca_server == expected_file


class TestProxyServer:
    """ProxyServer construction must fail loudly on unusable settings."""

    # binding to 0.0.0.0:1 works without special permissions on Windows
    @tutils.SkipWindows
    def test_err(self):
        conf = ProxyConfig(port=1)
        tutils.raises("error starting proxy server", ProxyServer, conf)

    def test_err_2(self):
        conf = ProxyConfig(host="invalidhost")
        tutils.raises("error starting proxy server", ProxyServer, conf)


class TestDummyServer:
    """Smoke test: a DummyServer can be started and shut down."""

    def test_simple(self):
        server = DummyServer(None)
        server.start_slave()
        server.shutdown()


class TestConnectionHandler:
    """Check that ConnectionHandler reports unexpected failures on stderr."""

    def test_fatal_error(self):
        # config.mode() hands back a mock that raises RuntimeError when called.
        crashing_layer = mock.Mock(side_effect=RuntimeError)
        config = mock.Mock()
        config.mode.return_value = crashing_layer

        # The channel's ask() simply returns its second argument unchanged.
        def ask(_, message):
            return message

        channel = mock.Mock()
        channel.ask = ask

        handler = ConnectionHandler(
            mock.MagicMock(),
            ("127.0.0.1", 8080),
            config,
            channel,
        )
        with tutils.capture_stderr(handler.handle) as output:
            assert "mitmproxy has crashed" in output