aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/net/http/response.py
blob: 2e864405804e3f0ccf0833fc77163cb10d56aa49 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
import time
from email.utils import parsedate_tz, formatdate, mktime_tz
from mitmproxy.utils import human
from mitmproxy.coretypes import multidict
from mitmproxy.net.http import cookies
from mitmproxy.net.http import headers as nheaders
from mitmproxy.net.http import message
from mitmproxy.net.http import status_codes
from mitmproxy.utils import strutils
from typing import AnyStr
from typing import Dict
from typing import Iterable
from typing import Tuple
from typing import Union


class ResponseData(message.MessageData):
    """
    Plain attribute container holding the raw state of an HTTP response.

    The constructor normalises its arguments before storing them:

    - ``http_version`` and ``reason`` given as ``str`` are encoded to bytes
      with strict ASCII encoding; any other value is stored unchanged.
    - ``headers`` that are not already a ``nheaders.Headers`` instance are
      passed to the ``nheaders.Headers`` constructor.
    - ``content`` given as ``str`` is rejected with :py:class:`ValueError`.

    All remaining arguments are stored as-is.
    """

    def __init__(
        self,
        http_version,
        status_code,
        reason=None,
        headers=(),
        content=None,
        timestamp_start=None,
        timestamp_end=None
    ):
        # Normalise first, in this exact order, so that the first invalid
        # argument is the one that raises.
        version_value = (
            http_version.encode("ascii", "strict")
            if isinstance(http_version, str)
            else http_version
        )
        reason_value = (
            reason.encode("ascii", "strict")
            if isinstance(reason, str)
            else reason
        )
        header_value = (
            headers
            if isinstance(headers, nheaders.Headers)
            else nheaders.Headers(headers)
        )
        if isinstance(content, str):
            raise ValueError("Content must be bytes, not {}".format(type(content).__name__))

        # Nothing is stored until every argument has been accepted.
        self.http_version = version_value
        self.status_code = status_code
        self.reason = reason_value
        self.headers = header_value
        self.content = content
        self.timestamp_start = timestamp_start
        self.timestamp_end = timestamp_end


class Response(message.Message):
    """
    An HTTP response.
    """
    data: ResponseData

    def __init__(self, *args, **kwargs):
        """
        Create a response. All positional and keyword arguments are forwarded
        unchanged to :py:class:`ResponseData`.
        """
        super().__init__()
        self.data = ResponseData(*args, **kwargs)

    def __repr__(self):
        """
        Short description containing the status code, the reason phrase and,
        if there is a body, its content type and human-readable size.
        """
        if self.raw_content:
            details = "{}, {}".format(
                self.headers.get("content-type", "unknown content type"),
                human.pretty_size(len(self.raw_content))
            )
        else:
            details = "no content"
        return "Response({status_code} {reason}, {details})".format(
            status_code=self.status_code,
            reason=self.reason,
            details=details
        )

    @classmethod
    def make(
            cls,
            status_code: int=200,
            content: Union[bytes, str]=b"",
            headers: Union[Dict[str, AnyStr], Iterable[Tuple[bytes, bytes]]]=()
    ):
        """
        Simplified API for creating response objects.

        The response is created as ``HTTP/1.1`` with the reason phrase looked
        up in ``status_codes.RESPONSES`` (empty if the code is unknown).

        Args:
            status_code: The HTTP status code.
            content: The body. ``bytes`` are assigned to ``content``, ``str``
                is assigned to ``text``.
            headers: Either a dict, which is expanded into keyword arguments
                for ``nheaders.Headers``, or an iterable that is passed to
                ``nheaders.Headers`` as-is.

        Raises:
            TypeError: If ``headers`` is neither a dict nor an iterable, or if
                ``content`` is neither ``bytes`` nor ``str``.
        """
        resp = cls(
            b"HTTP/1.1",
            status_code,
            status_codes.RESPONSES.get(status_code, "").encode(),
            (),
            None
        )

        # Headers can be list or dict, we differentiate here.
        if isinstance(headers, dict):
            resp.headers = nheaders.Headers(**headers)
        elif isinstance(headers, Iterable):
            resp.headers = nheaders.Headers(headers)
        else:
            raise TypeError("Expected headers to be an iterable or dict, but is {}.".format(
                type(headers).__name__
            ))

        # Assign this manually to update the content-length header.
        if isinstance(content, bytes):
            resp.content = content
        elif isinstance(content, str):
            resp.text = content
        else:
            raise TypeError("Expected content to be str or bytes, but is {}.".format(
                type(content).__name__
            ))

        return resp

    @property
    def status_code(self):
        """
        HTTP Status Code, e.g. ``200``.
        """
        return self.data.status_code

    @status_code.setter
    def status_code(self, status_code):
        self.data.status_code = status_code

    @property
    def reason(self):
        """
        HTTP Reason Phrase, e.g. "Not Found".
        This is :py:obj:`None` if no reason phrase is stored, e.g. for HTTP2
        responses, which do not contain a reason phrase.
        """
        raw_reason = self.data.reason
        if raw_reason is None:
            # ResponseData defaults reason to None; decoding it would raise
            # AttributeError (and break __repr__), so pass None through.
            return None
        # Encoding: http://stackoverflow.com/a/16674906/934719
        return raw_reason.decode("ISO-8859-1", "surrogateescape")

    @reason.setter
    def reason(self, reason):
        self.data.reason = strutils.always_bytes(reason, "ISO-8859-1", "surrogateescape")

    def _get_cookies(self):
        """
        Parse all ``set-cookie`` headers and return a tuple of
        ``(name, (value, attrs))`` pairs.
        """
        h = self.headers.get_all("set-cookie")
        all_cookies = cookies.parse_set_cookie_headers(h)
        return tuple(
            (name, (value, attrs))
            for name, value, attrs in all_cookies
        )

    def _set_cookies(self, value):
        """
        Replace all ``set-cookie`` headers with one header per
        ``(name, (value, attrs))`` pair in ``value``.
        """
        cookie_headers = [
            cookies.format_set_cookie_header([(name, pair[0], pair[1])])
            for name, pair in value
        ]
        self.headers.set_all("set-cookie", cookie_headers)

    @property
    def cookies(self) -> multidict.MultiDictView:
        """
        The response cookies. A possibly empty
        :py:class:`~mitmproxy.coretypes.multidict.MultiDictView`, where the keys are cookie
        name strings, and values are (value, attr) tuples. Value is a string,
        and attr is an MultiDictView containing cookie attributes. Within
        attrs, unary attributes (e.g. HTTPOnly) are indicated by a Null value.

        Caveats:
            The view is built by re-parsing the ``set-cookie`` headers, so
            updating the attr object of a cookie in place presumably does not
            update the headers (TODO confirm against MultiDictView). Assign
            the modified cookie back instead.
        """
        return multidict.MultiDictView(
            self._get_cookies,
            self._set_cookies
        )

    @cookies.setter
    def cookies(self, value):
        self._set_cookies(value)

    def refresh(self, now=None):
        """
        This fairly complex and heuristic function refreshes a server
        response for replay.

            - It adjusts date, expires and last-modified headers.
            - It adjusts cookie expiration.

        All adjustments shift the original value by the time elapsed between
        ``self.timestamp_start`` and ``now``.

        Args:
            now: Reference time in seconds since the epoch. Defaults to
                :py:func:`time.time` when :py:obj:`None`.
        """
        # Compare against None explicitly so that an explicit timestamp of 0
        # is honoured rather than replaced by the current time.
        if now is None:
            now = time.time()
        delta = now - self.timestamp_start
        refresh_headers = (
            "date",
            "expires",
            "last-modified",
        )
        for name in refresh_headers:
            if name in self.headers:
                parsed = parsedate_tz(self.headers[name])
                # Unparseable dates are left untouched.
                if parsed:
                    shifted = mktime_tz(parsed) + delta
                    self.headers[name] = formatdate(shifted, usegmt=True)
        refreshed_cookies = []
        for set_cookie_header in self.headers.get_all("set-cookie"):
            try:
                refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
            except ValueError:
                # Best effort: keep a header that cannot be refreshed as-is.
                refreshed = set_cookie_header
            refreshed_cookies.append(refreshed)
        if refreshed_cookies:
            self.headers.set_all("set-cookie", refreshed_cookies)