aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/net/http/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/net/http/message.py')
-rw-r--r--mitmproxy/net/http/message.py300
1 files changed, 300 insertions, 0 deletions
diff --git a/mitmproxy/net/http/message.py b/mitmproxy/net/http/message.py
new file mode 100644
index 00000000..af1d16be
--- /dev/null
+++ b/mitmproxy/net/http/message.py
@@ -0,0 +1,300 @@
+import re
+import warnings
+from typing import Optional
+
+from mitmproxy.utils import strutils
+from mitmproxy.net.http import encoding
+from mitmproxy.types import serializable
+from mitmproxy.net.http import headers
+
+
+# While headers _should_ be ASCII, it's not uncommon for certain headers to be utf-8 encoded.
+def _native(x):
+ return x.decode("utf-8", "surrogateescape")
+
+
+def _always_bytes(x):
+ return strutils.always_bytes(x, "utf-8", "surrogateescape")
+
+
+class MessageData(serializable.Serializable):
+ def __eq__(self, other):
+ if isinstance(other, MessageData):
+ return self.__dict__ == other.__dict__
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def set_state(self, state):
+ for k, v in state.items():
+ if k == "headers":
+ v = headers.Headers.from_state(v)
+ setattr(self, k, v)
+
+ def get_state(self):
+ state = vars(self).copy()
+ state["headers"] = state["headers"].get_state()
+ return state
+
+ @classmethod
+ def from_state(cls, state):
+ state["headers"] = headers.Headers.from_state(state["headers"])
+ return cls(**state)
+
+
+class Message(serializable.Serializable):
+ def __eq__(self, other):
+ if isinstance(other, Message):
+ return self.data == other.data
+ return False
+
+ def __ne__(self, other):
+ return not self.__eq__(other)
+
+ def get_state(self):
+ return self.data.get_state()
+
+ def set_state(self, state):
+ self.data.set_state(state)
+
+ @classmethod
+ def from_state(cls, state):
+ state["headers"] = headers.Headers.from_state(state["headers"])
+ return cls(**state)
+
+ @property
+ def headers(self):
+ """
+ Message headers object
+
+ Returns:
+ mitmproxy.net.http.Headers
+ """
+ return self.data.headers
+
+ @headers.setter
+ def headers(self, h):
+ self.data.headers = h
+
+ @property
+ def raw_content(self) -> bytes:
+ """
+ The raw (encoded) HTTP message body
+
+ See also: :py:attr:`content`, :py:class:`text`
+ """
+ return self.data.content
+
+ @raw_content.setter
+ def raw_content(self, content):
+ self.data.content = content
+
+ def get_content(self, strict: bool=True) -> bytes:
+ """
+ The HTTP message body decoded with the content-encoding header (e.g. gzip)
+
+ Raises:
+ ValueError, when the content-encoding is invalid and strict is True.
+
+ See also: :py:class:`raw_content`, :py:attr:`text`
+ """
+ if self.raw_content is None:
+ return None
+ ce = self.headers.get("content-encoding")
+ if ce:
+ try:
+ return encoding.decode(self.raw_content, ce)
+ except ValueError:
+ if strict:
+ raise
+ return self.raw_content
+ else:
+ return self.raw_content
+
+ def set_content(self, value):
+ if value is None:
+ self.raw_content = None
+ return
+ if not isinstance(value, bytes):
+ raise TypeError(
+ "Message content must be bytes, not {}. "
+ "Please use .text if you want to assign a str."
+ .format(type(value).__name__)
+ )
+ ce = self.headers.get("content-encoding")
+ try:
+ self.raw_content = encoding.encode(value, ce or "identity")
+ except ValueError:
+ # So we have an invalid content-encoding?
+ # Let's remove it!
+ del self.headers["content-encoding"]
+ self.raw_content = value
+ self.headers["content-length"] = str(len(self.raw_content))
+
+ content = property(get_content, set_content)
+
+ @property
+ def http_version(self):
+ """
+ Version string, e.g. "HTTP/1.1"
+ """
+ return _native(self.data.http_version)
+
+ @http_version.setter
+ def http_version(self, http_version):
+ self.data.http_version = _always_bytes(http_version)
+
+ @property
+ def timestamp_start(self):
+ """
+ First byte timestamp
+ """
+ return self.data.timestamp_start
+
+ @timestamp_start.setter
+ def timestamp_start(self, timestamp_start):
+ self.data.timestamp_start = timestamp_start
+
+ @property
+ def timestamp_end(self):
+ """
+ Last byte timestamp
+ """
+ return self.data.timestamp_end
+
+ @timestamp_end.setter
+ def timestamp_end(self, timestamp_end):
+ self.data.timestamp_end = timestamp_end
+
+ def _get_content_type_charset(self) -> Optional[str]:
+ ct = headers.parse_content_type(self.headers.get("content-type", ""))
+ if ct:
+ return ct[2].get("charset")
+
+ def _guess_encoding(self) -> str:
+ enc = self._get_content_type_charset()
+ if enc:
+ return enc
+
+ if "json" in self.headers.get("content-type", ""):
+ return "utf8"
+ else:
+ # We may also want to check for HTML meta tags here at some point.
+ return "latin-1"
+
+ def get_text(self, strict: bool=True) -> str:
+ """
+ The HTTP message body decoded with both content-encoding header (e.g. gzip)
+ and content-type header charset.
+
+ Raises:
+ ValueError, when either content-encoding or charset is invalid and strict is True.
+
+ See also: :py:attr:`content`, :py:class:`raw_content`
+ """
+ if self.raw_content is None:
+ return None
+ enc = self._guess_encoding()
+
+ content = self.get_content(strict)
+ try:
+ return encoding.decode(content, enc)
+ except ValueError:
+ if strict:
+ raise
+ return content.decode("utf8", "surrogateescape")
+
+ def set_text(self, text):
+ if text is None:
+ self.content = None
+ return
+ enc = self._guess_encoding()
+
+ try:
+ self.content = encoding.encode(text, enc)
+ except ValueError:
+ # Fall back to UTF-8 and update the content-type header.
+ ct = headers.parse_content_type(self.headers.get("content-type", "")) or ("text", "plain", {})
+ ct[2]["charset"] = "utf-8"
+ self.headers["content-type"] = headers.assemble_content_type(*ct)
+ enc = "utf8"
+ self.content = text.encode(enc, "surrogateescape")
+
+ text = property(get_text, set_text)
+
+ def decode(self, strict=True):
+ """
+ Decodes body based on the current Content-Encoding header, then
+ removes the header. If there is no Content-Encoding header, no
+ action is taken.
+
+ Raises:
+ ValueError, when the content-encoding is invalid and strict is True.
+ """
+ self.raw_content = self.get_content(strict)
+ self.headers.pop("content-encoding", None)
+
+ def encode(self, e):
+ """
+ Encodes body with the encoding e, where e is "gzip", "deflate", "identity", or "br".
+ Any existing content-encodings are overwritten,
+ the content is not decoded beforehand.
+
+ Raises:
+ ValueError, when the specified content-encoding is invalid.
+ """
+ self.headers["content-encoding"] = e
+ self.content = self.raw_content
+ if "content-encoding" not in self.headers:
+ raise ValueError("Invalid content encoding {}".format(repr(e)))
+
+ def replace(self, pattern, repl, flags=0, count=0):
+ """
+ Replaces a regular expression pattern with repl in both the headers
+ and the body of the message. Encoded body will be decoded
+ before replacement, and re-encoded afterwards.
+
+ Returns:
+ The number of replacements made.
+ """
+ if isinstance(pattern, str):
+ pattern = strutils.escaped_str_to_bytes(pattern)
+ if isinstance(repl, str):
+ repl = strutils.escaped_str_to_bytes(repl)
+ replacements = 0
+ if self.content:
+ self.content, replacements = re.subn(
+ pattern, repl, self.content, flags=flags, count=count
+ )
+ replacements += self.headers.replace(pattern, repl, flags=flags, count=count)
+ return replacements
+
+ # Legacy
+
+ @property
+ def body(self): # pragma: no cover
+ warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning)
+ return self.content
+
+ @body.setter
+ def body(self, body): # pragma: no cover
+ warnings.warn(".body is deprecated, use .content instead.", DeprecationWarning)
+ self.content = body
+
+
+class decoded:
+ """
+ Deprecated: You can now directly use :py:attr:`content`.
+ :py:attr:`raw_content` has the encoded content.
+ """
+
+ def __init__(self, message): # pragma no cover
+ warnings.warn("decoded() is deprecated, you can now directly use .content instead. "
+ ".raw_content has the encoded content.", DeprecationWarning)
+
+ def __enter__(self): # pragma no cover
+ pass
+
+ def __exit__(self, type, value, tb): # pragma no cover
+ pass