aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_proxy.py
blob: 6ab19e02f37ca7d7f3baa64b8fcfab894a7353da (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import argparse
from libmproxy import cmdline
from libmproxy.proxy import ProxyConfig, process_proxy_options
from libmproxy.proxy.connection import ServerConnection
from libmproxy.proxy.primitives import ProxyError
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
import tutils
from libpathod import test
from netlib import http, tcp
import mock

from OpenSSL import SSL


def test_proxy_error():
    p = ProxyError(111, "msg")
    assert str(p)


class TestServerConnection:
    def setUp(self):
        self.d = test.Daemon()

    def tearDown(self):
        self.d.shutdown()

    def test_simple(self):
        sc = ServerConnection((self.d.IFACE, self.d.port))
        sc.connect()
        f = tutils.tflow()
        f.server_conn = sc
        f.request.path = "/p/200:da"

        # use this protocol just to assemble - not for actual sending
        protocol = http.http1.HTTP1Protocol(rfile=sc.rfile)
        sc.send(protocol.assemble(f.request))

        protocol = http.http1.HTTP1Protocol(rfile=sc.rfile)
        assert protocol.read_response(f.request.method, 1000)
        assert self.d.last_log()

        sc.finish()

    def test_terminate_error(self):
        sc = ServerConnection((self.d.IFACE, self.d.port))
        sc.connect()
        sc.connection = mock.Mock()
        sc.connection.recv = mock.Mock(return_value=False)
        sc.connection.flush = mock.Mock(side_effect=tcp.NetLibDisconnect)
        sc.finish()

    def test_repr(self):
        sc = tutils.tserver_conn()
        assert "address:22" in repr(sc)
        assert "ssl" not in repr(sc)
        sc.ssl_established = True
        assert "ssl" in repr(sc)
        sc.sni = "foo"
        assert "foo" in repr(sc)


class TestProcessProxyOptions:
    def p(self, *args):
        parser = tutils.MockParser()
        cmdline.common_options(parser)
        opts = parser.parse_args(args=args)
        return parser, process_proxy_options(parser, opts)

    def assert_err(self, err, *args):
        tutils.raises(err, self.p, *args)

    def assert_noerr(self, *args):
        m, p = self.p(*args)
        assert p
        return p

    def test_simple(self):
        assert self.p()

    def test_cadir(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--cadir", cadir)

    @mock.patch("libmproxy.platform.resolver", None)
    def test_no_transparent(self):
        self.assert_err("transparent mode not supported", "-T")

    @mock.patch("libmproxy.platform.resolver")
    def test_modes(self, _):
        self.assert_noerr("-R", "http://localhost")
        self.assert_err("expected one argument", "-R")
        self.assert_err("Invalid server specification", "-R", "reverse")

        self.assert_noerr("-T")

        self.assert_noerr("-U", "http://localhost")
        self.assert_err("expected one argument", "-U")
        self.assert_err("Invalid server specification", "-U", "upstream")

        self.assert_noerr("--spoof")
        self.assert_noerr("--ssl-spoof")

        self.assert_noerr("--spoofed-port", "443")
        self.assert_err("expected one argument", "--spoofed-port")

        self.assert_err("mutually exclusive", "-R", "http://localhost", "-T")

    def test_client_certs(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--client-certs", cadir)
            self.assert_err(
                "directory does not exist",
                "--client-certs",
                "nonexistent")

    def test_certs(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr(
                "--cert",
                tutils.test_data.path("data/testkey.pem"))
            self.assert_err("does not exist", "--cert", "nonexistent")

    def test_auth(self):
        p = self.assert_noerr("--nonanonymous")
        assert p.authenticator

        p = self.assert_noerr(
            "--htpasswd",
            tutils.test_data.path("data/htpasswd"))
        assert p.authenticator
        self.assert_err(
            "malformed htpasswd file",
            "--htpasswd",
            tutils.test_data.path("data/htpasswd.invalid"))

        p = self.assert_noerr("--singleuser", "test:test")
        assert p.authenticator
        self.assert_err(
            "invalid single-user specification",
            "--singleuser",
            "test")

    def test_verify_upstream_cert(self):
        p = self.assert_noerr("--verify-upstream-cert")
        assert p.openssl_verification_mode_server == SSL.VERIFY_PEER

    def test_upstream_trusted_cadir(self):
        expected_dir = "/path/to/a/ca/dir"
        p = self.assert_noerr("--upstream-trusted-cadir", expected_dir)
        assert p.openssl_trusted_cadir_server == expected_dir

    def test_upstream_trusted_ca(self):
        expected_file = "/path/to/a/cert/file"
        p = self.assert_noerr("--upstream-trusted-ca", expected_file)
        assert p.openssl_trusted_ca_server == expected_file


class TestProxyServer:
    # binding to 0.0.0.0:1 works without special permissions on Windows
    @tutils.SkipWindows
    def test_err(self):
        conf = ProxyConfig(
            port=1
        )
        tutils.raises("error starting proxy server", ProxyServer, conf)

    def test_err_2(self):
        conf = ProxyConfig(
            host="invalidhost"
        )
        tutils.raises("error starting proxy server", ProxyServer, conf)


class TestDummyServer:
    def test_simple(self):
        d = DummyServer(None)
        d.start_slave()
        d.shutdown()


class TestConnectionHandler:
    def test_fatal_error(self):
        config = mock.Mock()
        config.mode.get_upstream_server.side_effect = RuntimeError
        c = ConnectionHandler(
            config,
            mock.MagicMock(),
            ("127.0.0.1",
             8080),
            None,
            mock.MagicMock())
        with tutils.capture_stderr(c.handle) as output:
            assert "mitmproxy has crashed" in output