aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_proxy.py
blob: 27ae70a8ff49b08e1c7d4f397816bb6aff2ef6bb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import os
import mock
from OpenSSL import SSL

from libmproxy import cmdline
from libmproxy.proxy import ProxyConfig
from libmproxy.proxy.config import process_proxy_options
from libmproxy.models.connections import ServerConnection
from libmproxy.proxy.server import DummyServer, ProxyServer, ConnectionHandler
from netlib.exceptions import TcpDisconnect
from libpathod import test
from netlib.http import http1
from . import tutils


class TestServerConnection(object):

    def test_simple(self):
        self.d = test.Daemon()
        sc = ServerConnection((self.d.IFACE, self.d.port))
        sc.connect()
        f = tutils.tflow()
        f.server_conn = sc
        f.request.path = "/p/200:da"

        # use this protocol just to assemble - not for actual sending
        sc.wfile.write(http1.assemble_request(f.request))
        sc.wfile.flush()

        assert http1.read_response(sc.rfile, f.request, 1000)
        assert self.d.last_log()

        sc.finish()
        self.d.shutdown()

    def test_terminate_error(self):
        self.d = test.Daemon()
        sc = ServerConnection((self.d.IFACE, self.d.port))
        sc.connect()
        sc.connection = mock.Mock()
        sc.connection.recv = mock.Mock(return_value=False)
        sc.connection.flush = mock.Mock(side_effect=TcpDisconnect)
        sc.finish()
        self.d.shutdown()

    def test_repr(self):
        sc = tutils.tserver_conn()
        assert "address:22" in repr(sc)
        assert "ssl" not in repr(sc)
        sc.ssl_established = True
        assert "ssl" in repr(sc)
        sc.sni = "foo"
        assert "foo" in repr(sc)


class TestProcessProxyOptions:

    def p(self, *args):
        parser = tutils.MockParser()
        cmdline.common_options(parser)
        opts = parser.parse_args(args=args)
        return parser, process_proxy_options(parser, opts)

    def assert_err(self, err, *args):
        tutils.raises(err, self.p, *args)

    def assert_noerr(self, *args):
        m, p = self.p(*args)
        assert p
        return p

    def test_simple(self):
        assert self.p()

    def test_cadir(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--cadir", cadir)

    @mock.patch("libmproxy.platform.resolver", None)
    def test_no_transparent(self):
        self.assert_err("transparent mode not supported", "-T")

    @mock.patch("libmproxy.platform.resolver")
    def test_modes(self, _):
        self.assert_noerr("-R", "http://localhost")
        self.assert_err("expected one argument", "-R")
        self.assert_err("Invalid server specification", "-R", "reverse")

        self.assert_noerr("-T")

        self.assert_noerr("-U", "http://localhost")
        self.assert_err("expected one argument", "-U")
        self.assert_err("Invalid server specification", "-U", "upstream")

        self.assert_err("not allowed with", "-R", "http://localhost", "-T")

    def test_socks_auth(self):
        self.assert_err("Proxy Authentication not supported in SOCKS mode.", "--socks", "--nonanonymous")

    def test_client_certs(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr("--client-certs", cadir)
            self.assert_noerr(
                "--client-certs",
                os.path.join(tutils.test_data.path("data/clientcert"), "client.pem"))
            self.assert_err(
                "path does not exist",
                "--client-certs",
                "nonexistent")

    def test_certs(self):
        with tutils.tmpdir() as cadir:
            self.assert_noerr(
                "--cert",
                tutils.test_data.path("data/testkey.pem"))
            self.assert_err("does not exist", "--cert", "nonexistent")

    def test_auth(self):
        p = self.assert_noerr("--nonanonymous")
        assert p.authenticator

        p = self.assert_noerr(
            "--htpasswd",
            tutils.test_data.path("data/htpasswd"))
        assert p.authenticator
        self.assert_err(
            "malformed htpasswd file",
            "--htpasswd",
            tutils.test_data.path("data/htpasswd.invalid"))

        p = self.assert_noerr("--singleuser", "test:test")
        assert p.authenticator
        self.assert_err(
            "invalid single-user specification",
            "--singleuser",
            "test")

    def test_verify_upstream_cert(self):
        p = self.assert_noerr("--verify-upstream-cert")
        assert p.openssl_verification_mode_server == SSL.VERIFY_PEER

    def test_upstream_trusted_cadir(self):
        expected_dir = "/path/to/a/ca/dir"
        p = self.assert_noerr("--upstream-trusted-cadir", expected_dir)
        assert p.openssl_trusted_cadir_server == expected_dir

    def test_upstream_trusted_ca(self):
        expected_file = "/path/to/a/cert/file"
        p = self.assert_noerr("--upstream-trusted-ca", expected_file)
        assert p.openssl_trusted_ca_server == expected_file


class TestProxyServer:
    # binding to 0.0.0.0:1 works without special permissions on Windows

    @tutils.skip_windows
    def test_err(self):
        conf = ProxyConfig(
            port=1
        )
        tutils.raises("error starting proxy server", ProxyServer, conf)

    def test_err_2(self):
        conf = ProxyConfig(
            host="invalidhost"
        )
        tutils.raises("error starting proxy server", ProxyServer, conf)


class TestDummyServer:

    def test_simple(self):
        d = DummyServer(None)
        d.start_slave()
        d.shutdown()


class TestConnectionHandler:

    def test_fatal_error(self):
        config = mock.Mock()
        root_layer = mock.Mock()
        root_layer.side_effect = RuntimeError
        config.mode.return_value = root_layer
        channel = mock.Mock()

        def ask(_, x):
            return x
        channel.ask = ask
        c = ConnectionHandler(
            mock.MagicMock(),
            ("127.0.0.1", 8080),
            config,
            channel
        )
        with tutils.capture_stderr(c.handle) as output:
            assert "mitmproxy has crashed" in output