aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_protocol_http.py
blob: 290d82a10d7b84aadea61f2292678e77a38d5648 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
from libmproxy.protocol.http import *
from libmproxy.protocol import KILL
from cStringIO import StringIO
import tutils, tservers


def test_HttpAuthenticationError():
    x = HttpAuthenticationError({"foo": "bar"})
    assert str(x)
    assert "foo" in x.auth_headers


def test_stripped_chunked_encoding_no_content():
    """
    https://github.com/mitmproxy/mitmproxy/issues/186
    """
    r = tutils.tresp(content="")
    r.headers["Transfer-Encoding"] = ["chunked"]
    assert "Content-Length" in r._assemble_headers()

    r = tutils.treq(content="")
    r.headers["Transfer-Encoding"] = ["chunked"]
    assert "Content-Length" in r._assemble_headers()


class TestHTTPRequest:
    def test_asterisk_form(self):
        s = StringIO("OPTIONS * HTTP/1.1")
        f = tutils.tflow_noreq()
        f.request = HTTPRequest.from_stream(s)
        assert f.request.form_in == "relative"
        x = f.request._assemble()
        assert f.request._assemble() == "OPTIONS * HTTP/1.1\r\nHost: address:22\r\n\r\n"

    def test_origin_form(self):
        s = StringIO("GET /foo\xff HTTP/1.1")
        tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)

    def test_authority_form(self):
        s = StringIO("CONNECT oops-no-port.com HTTP/1.1")
        tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)
        s = StringIO("CONNECT address:22 HTTP/1.1")
        r = HTTPRequest.from_stream(s)
        assert r._assemble() == "CONNECT address:22 HTTP/1.1\r\nHost: address:22\r\n\r\n"

    def test_absolute_form(self):
        s = StringIO("GET oops-no-protocol.com HTTP/1.1")
        tutils.raises("Bad HTTP request line", HTTPRequest.from_stream, s)
        s = StringIO("GET http://address:22/ HTTP/1.1")
        r = HTTPRequest.from_stream(s)
        assert r._assemble() == "GET http://address:22/ HTTP/1.1\r\nHost: address:22\r\n\r\n"

    def test_assemble_unknown_form(self):
        r = tutils.treq()
        tutils.raises("Invalid request form", r._assemble, "antiauthority")


    def test_set_url(self):
        r = tutils.treq_absolute()
        r.set_url("https://otheraddress:42/ORLY")
        assert r.scheme == "https"
        assert r.host == "otheraddress"
        assert r.port == 42
        assert r.path == "/ORLY"


class TestHTTPResponse:
    def test_read_from_stringio(self):
        _s = "HTTP/1.1 200 OK\r\n" \
             "Content-Length: 7\r\n" \
             "\r\n"\
             "content\r\n" \
             "HTTP/1.1 204 OK\r\n" \
             "\r\n"
        s = StringIO(_s)
        r = HTTPResponse.from_stream(s, "GET")
        assert r.code == 200
        assert r.content == "content"
        assert HTTPResponse.from_stream(s, "GET").code == 204

        s = StringIO(_s)
        r = HTTPResponse.from_stream(s, "HEAD")  # HEAD must not have content by spec. We should leave it on the pipe.
        assert r.code == 200
        assert r.content == ""
        tutils.raises("Invalid server response: 'content", HTTPResponse.from_stream, s, "GET")


class TestInvalidRequests(tservers.HTTPProxTest):
    ssl = True
    def test_double_connect(self):
        p = self.pathoc()
        r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
        assert r.status_code == 400
        assert "Must not CONNECT on already encrypted connection" in r.content

    def test_relative_request(self):
        p = self.pathoc_raw()
        p.connect()
        r = p.request("get:/p/200")
        assert r.status_code == 400
        assert "Invalid HTTP request form" in r.content


class TestProxyChaining(tservers.HTTPChainProxyTest):
    def test_all(self):
        self.chain[1].tmaster.replacehooks.add("~q", "foo", "bar") # replace in request
        self.chain[0].tmaster.replacehooks.add("~q", "foo", "oh noes!")
        self.proxy.tmaster.replacehooks.add("~q", "bar", "baz")
        self.chain[0].tmaster.replacehooks.add("~s", "baz", "ORLY")  # replace in response

        p = self.pathoc()
        req = p.request("get:'%s/p/418:b\"foo\"'" % self.server.urlbase)
        assert req.content == "ORLY"
        assert req.status_code == 418

class TestProxyChainingSSL(tservers.HTTPChainProxyTest):
    ssl = True
    def test_simple(self):
        p = self.pathoc()
        req = p.request("get:'/p/418:b\"content\"'")
        assert req.content == "content"
        assert req.status_code == 418

        assert self.chain[1].tmaster.state.flow_count() == 2  # CONNECT from pathoc to chain[0],
                                                              # request from pathoc to chain[0]
        assert self.chain[0].tmaster.state.flow_count() == 2  # CONNECT from chain[1] to proxy,
                                                              # request from chain[1] to proxy
        assert self.proxy.tmaster.state.flow_count() == 1  # request from chain[0] (regular proxy doesn't store CONNECTs)

class TestProxyChainingSSLReconnect(tservers.HTTPChainProxyTest):
    ssl = True

    def test_reconnect(self):
        """
        Tests proper functionality of ConnectionHandler.server_reconnect mock.
        If we have a disconnect on a secure connection that's transparently proxified to
        an upstream http proxy, we need to send the CONNECT request again.
        """
        def kill_requests(master, attr, exclude):
            k = [0]  # variable scope workaround: put into array
            _func = getattr(master, attr)
            def handler(r):
                k[0] += 1
                if not (k[0] in exclude):
                    r.flow.client_conn.finish()
                    r.flow.error = Error("terminated")
                    r.reply(KILL)
                return _func(r)
            setattr(master, attr, handler)

        kill_requests(self.proxy.tmaster, "handle_request",
                      exclude=[
                              # fail first request
                          2,  # allow second request
                      ])

        kill_requests(self.chain[0].tmaster, "handle_request",
                      exclude=[
                          1,  # CONNECT
                              # fail first request
                          3,  # reCONNECT
                          4,  # request
                      ])

        p = self.pathoc()
        req = p.request("get:'/p/418:b\"content\"'")
        assert self.chain[1].tmaster.state.flow_count() == 2  # CONNECT and request
        assert self.chain[0].tmaster.state.flow_count() == 4  # CONNECT, failing request,
                                                              # reCONNECT, request
        assert self.proxy.tmaster.state.flow_count() == 2  # failing request, request
                                                           # (doesn't store (repeated) CONNECTs from chain[0]
                                                           #  as it is a regular proxy)
        assert req.content == "content"
        assert req.status_code == 418

        assert not self.proxy.tmaster.state._flow_list[0].response  # killed
        assert self.proxy.tmaster.state._flow_list[1].response

        assert self.chain[1].tmaster.state._flow_list[0].request.form_in == "authority"
        assert self.chain[1].tmaster.state._flow_list[1].request.form_in == "relative"

        assert self.chain[0].tmaster.state._flow_list[0].request.form_in == "authority"
        assert self.chain[0].tmaster.state._flow_list[1].request.form_in == "relative"
        assert self.chain[0].tmaster.state._flow_list[2].request.form_in == "authority"
        assert self.chain[0].tmaster.state._flow_list[3].request.form_in == "relative"

        assert self.proxy.tmaster.state._flow_list[0].request.form_in == "relative"
        assert self.proxy.tmaster.state._flow_list[1].request.form_in == "relative"

        req = p.request("get:'/p/418:b\"content2\"'")

        assert req.status_code == 502
        assert self.chain[1].tmaster.state.flow_count() == 3  # + new request
        assert self.chain[0].tmaster.state.flow_count() == 6  # + new request, repeated CONNECT from chain[1]
                                                              # (both terminated)
        assert self.proxy.tmaster.state.flow_count() == 2  # nothing happened here