diff options
Diffstat (limited to 'mitmproxy/proxy/protocol')
-rw-r--r-- | mitmproxy/proxy/protocol/__init__.py | 52 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/base.py | 185 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/http.py | 458 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/http1.py | 72 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/http2.py | 614 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/http_replay.py | 120 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/rawtcp.py | 65 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/tls.py | 581 | ||||
-rw-r--r-- | mitmproxy/proxy/protocol/websockets.py | 112 |
9 files changed, 2259 insertions, 0 deletions
diff --git a/mitmproxy/proxy/protocol/__init__.py b/mitmproxy/proxy/protocol/__init__.py new file mode 100644 index 00000000..89b60386 --- /dev/null +++ b/mitmproxy/proxy/protocol/__init__.py @@ -0,0 +1,52 @@ +""" +In mitmproxy, protocols are implemented as a set of layers, which are composed +on top each other. The first layer is usually the proxy mode, e.g. transparent +proxy or normal HTTP proxy. Next, various protocol layers are stacked on top of +each other - imagine WebSockets on top of an HTTP Upgrade request. An actual +mitmproxy connection may look as follows (outermost layer first): + + Transparent HTTP proxy, no TLS: + - TransparentProxy + - Http1Layer + - HttpLayer + + Regular proxy, CONNECT request with WebSockets over SSL: + - ReverseProxy + - Http1Layer + - HttpLayer + - TLSLayer + - WebsocketLayer (or TCPLayer) + +Every layer acts as a read-only context for its inner layers (see +:py:class:`Layer`). To communicate with an outer layer, a layer can use +functions provided in the context. The next layer is always determined by a +call to :py:meth:`.next_layer() <mitmproxy.proxy.RootContext.next_layer>`, +which is provided by the root context. + +Another subtle design goal of this architecture is that upstream connections +should be established as late as possible; this makes server replay without any +outgoing connections possible. +""" + + +from .base import Layer, ServerConnectionMixin +from .http import UpstreamConnectLayer +from .http import HttpLayer +from .http1 import Http1Layer +from .http2 import Http2Layer +from .websockets import WebSocketsLayer +from .rawtcp import RawTCPLayer +from .tls import TlsClientHello +from .tls import TlsLayer +from .tls import is_tls_record_magic + +__all__ = [ + "Layer", "ServerConnectionMixin", + "TlsLayer", "is_tls_record_magic", "TlsClientHello", + "UpstreamConnectLayer", + "HttpLayer", + "Http1Layer", + "Http2Layer", + "WebSocketsLayer", + "RawTCPLayer", +] diff --git a/mitmproxy/proxy/protocol/base.py b/mitmproxy/proxy/protocol/base.py new file mode 100644 index 00000000..00d50721 --- /dev/null +++ b/mitmproxy/proxy/protocol/base.py @@ -0,0 +1,185 @@ +import netlib.exceptions +from mitmproxy import exceptions +from mitmproxy import connections + + +class _LayerCodeCompletion: + + """ + Dummy class that provides type hinting in PyCharm, which simplifies development a lot. + """ + + def __init__(self, **mixin_args): # pragma: no cover + super().__init__(**mixin_args) + if True: + return + self.config = None + """@type: mitmproxy.proxy.ProxyConfig""" + self.client_conn = None + """@type: mitmproxy.connections.ClientConnection""" + self.server_conn = None + """@type: mitmproxy.connections.ServerConnection""" + self.channel = None + """@type: mitmproxy.controller.Channel""" + self.ctx = None + """@type: mitmproxy.proxy.protocol.Layer""" + + +class Layer(_LayerCodeCompletion): + + """ + Base class for all layers. All other protocol layers should inherit from this class. + """ + + def __init__(self, ctx, **mixin_args): + """ + Each layer usually passes itself to its child layers as a context. Properties of the + context are transparently mapped to the layer, so that the following works: + + .. code-block:: python + + root_layer = Layer(None) + root_layer.client_conn = 42 + sub_layer = Layer(root_layer) + print(sub_layer.client_conn) # 42 + + The root layer is passed a :py:class:`mitmproxy.proxy.RootContext` object, + which provides access to :py:attr:`.client_conn <mitmproxy.proxy.RootContext.client_conn>`, + :py:attr:`.next_layer <mitmproxy.proxy.RootContext.next_layer>` and other basic attributes. + + Args: + ctx: The (read-only) parent layer / context. + """ + self.ctx = ctx + """ + The parent layer. + + :type: :py:class:`Layer` + """ + super().__init__(**mixin_args) + + def __call__(self): + """Logic of the layer. + + Returns: + Once the protocol has finished without exceptions. + + Raises: + ~mitmproxy.exceptions.ProtocolException: if an exception occurs. No other exceptions must be raised. + """ + raise NotImplementedError() + + def __getattr__(self, name): + """ + Attributes not present on the current layer are looked up on the context. + """ + return getattr(self.ctx, name) + + @property + def layers(self): + """ + List of all layers, including the current layer (``[self, self.ctx, self.ctx.ctx, ...]``) + """ + return [self] + self.ctx.layers + + def __repr__(self): + return type(self).__name__ + + +class ServerConnectionMixin: + + """ + Mixin that provides a layer with the capabilities to manage a server connection. + The server address can be passed in the constructor or set by calling :py:meth:`set_server`. + Subclasses are responsible for calling :py:meth:`disconnect` before returning. + + Recommended Usage: + + .. code-block:: python + + class MyLayer(Layer, ServerConnectionMixin): + def __call__(self): + try: + # Do something. + finally: + if self.server_conn: + self.disconnect() + """ + + def __init__(self, server_address=None): + super().__init__() + + self.server_conn = None + if self.config.options.spoof_source_address: + self.server_conn = connections.ServerConnection( + server_address, (self.ctx.client_conn.address.host, 0), True) + else: + self.server_conn = connections.ServerConnection( + server_address, (self.config.options.listen_host, 0)) + + self.__check_self_connect() + + def __check_self_connect(self): + """ + We try to protect the proxy from _accidentally_ connecting to itself, + e.g. because of a failed transparent lookup or an invalid configuration. + """ + address = self.server_conn.address + if address: + self_connect = ( + address.port == self.config.options.listen_port and + address.host in ("localhost", "127.0.0.1", "::1") + ) + if self_connect: + raise exceptions.ProtocolException( + "Invalid server address: {}\r\n" + "The proxy shall not connect to itself.".format(repr(address)) + ) + + def set_server(self, address): + """ + Sets a new server address. If there is an existing connection, it will be closed. + """ + if self.server_conn: + self.disconnect() + self.log("Set new server address: " + repr(address), "debug") + self.server_conn.address = address + self.__check_self_connect() + + def disconnect(self): + """ + Deletes (and closes) an existing server connection. + Must not be called if there is no existing connection. + """ + self.log("serverdisconnect", "debug", [repr(self.server_conn.address)]) + address = self.server_conn.address + self.server_conn.finish() + self.server_conn.close() + self.channel.tell("serverdisconnect", self.server_conn) + + self.server_conn = connections.ServerConnection( + address, + (self.server_conn.source_address.host, 0), + self.config.options.spoof_source_address + ) + + def connect(self): + """ + Establishes a server connection. + Must not be called if there is an existing connection. + + Raises: + ~mitmproxy.exceptions.ProtocolException: if the connection could not be established. + """ + if not self.server_conn.address: + raise exceptions.ProtocolException("Cannot connect to server, no server address given.") + self.log("serverconnect", "debug", [repr(self.server_conn.address)]) + self.channel.ask("serverconnect", self.server_conn) + try: + self.server_conn.connect() + except netlib.exceptions.TcpException as e: + raise exceptions.ProtocolException( + "Server connection to {} failed: {}".format( + repr(self.server_conn.address), str(e) + ) + ) diff --git a/mitmproxy/proxy/protocol/http.py b/mitmproxy/proxy/protocol/http.py new file mode 100644 index 00000000..ec018f89 --- /dev/null +++ b/mitmproxy/proxy/protocol/http.py @@ -0,0 +1,458 @@ +import h2.exceptions +import netlib.exceptions +import time +import traceback +from mitmproxy import exceptions +from mitmproxy import http +from mitmproxy import flow +from mitmproxy.proxy.protocol import base +from mitmproxy.proxy.protocol import websockets as pwebsockets +import netlib.http +from netlib import tcp +from netlib import websockets + + +class _HttpTransmissionLayer(base.Layer): + def read_request_headers(self): + raise NotImplementedError() + + def read_request_body(self, request): + raise NotImplementedError() + + def read_request(self): + request = self.read_request_headers() + request.data.content = b"".join( + self.read_request_body(request) + ) + request.timestamp_end = time.time() + return request + + def send_request(self, request): + raise NotImplementedError() + + def read_response_headers(self): + raise NotImplementedError() + + def read_response_body(self, request, response): + raise NotImplementedError() + yield "this is a generator" # pragma: no cover + + def read_response(self, request): + response = self.read_response_headers() + response.data.content = b"".join( + self.read_response_body(request, response) + ) + return response + + def send_response(self, response): + if response.data.content is None: + raise netlib.exceptions.HttpException("Cannot assemble flow with missing content") + self.send_response_headers(response) + self.send_response_body(response, [response.data.content]) + + def send_response_headers(self, response): + raise NotImplementedError() + + def send_response_body(self, response, chunks): + raise NotImplementedError() + + def check_close_connection(self, f): + raise NotImplementedError() + + +class ConnectServerConnection: + + """ + "Fake" ServerConnection to represent state after a CONNECT request to an upstream proxy. + """ + + def __init__(self, address, ctx): + self.address = tcp.Address.wrap(address) + self._ctx = ctx + + @property + def via(self): + return self._ctx.server_conn + + def __getattr__(self, item): + return getattr(self.via, item) + + def __bool__(self): + return bool(self.via) + + +class UpstreamConnectLayer(base.Layer): + + def __init__(self, ctx, connect_request): + super().__init__(ctx) + self.connect_request = connect_request + self.server_conn = ConnectServerConnection( + (connect_request.host, connect_request.port), + self.ctx + ) + + def __call__(self): + layer = self.ctx.next_layer(self) + layer() + + def _send_connect_request(self): + self.send_request(self.connect_request) + resp = self.read_response(self.connect_request) + if resp.status_code != 200: + raise exceptions.ProtocolException("Reconnect: Upstream server refuses CONNECT request") + + def connect(self): + if not self.server_conn: + self.ctx.connect() + self._send_connect_request() + else: + pass # swallow the message + + def change_upstream_proxy_server(self, address): + if address != self.server_conn.via.address: + self.ctx.set_server(address) + + def set_server(self, address): + if self.ctx.server_conn: + self.ctx.disconnect() + address = tcp.Address.wrap(address) + self.connect_request.host = address.host + self.connect_request.port = address.port + self.server_conn.address = address + + +class HttpLayer(base.Layer): + + def __init__(self, ctx, mode): + super().__init__(ctx) + self.mode = mode + + self.__initial_server_conn = None + "Contains the original destination in transparent mode, which needs to be restored" + "if an inline script modified the target server for a single http request" + # We cannot rely on server_conn.tls_established, + # see https://github.com/mitmproxy/mitmproxy/issues/925 + self.__initial_server_tls = None + # Requests happening after CONNECT do not need Proxy-Authorization headers. + self.http_authenticated = False + + def __call__(self): + if self.mode == "transparent": + self.__initial_server_tls = self.server_tls + self.__initial_server_conn = self.server_conn + while True: + f = http.HTTPFlow(self.client_conn, self.server_conn, live=self) + try: + request = self.get_request_from_client(f) + # Make sure that the incoming request matches our expectations + self.validate_request(request) + except netlib.exceptions.HttpReadDisconnect: + # don't throw an error for disconnects that happen before/between requests. + return + except netlib.exceptions.HttpException as e: + # We optimistically guess there might be an HTTP client on the + # other end + self.send_error_response(400, repr(e)) + raise exceptions.ProtocolException( + "HTTP protocol error in client request: {}".format(e) + ) + + self.log("request", "debug", [repr(request)]) + + # Handle Proxy Authentication + # Proxy Authentication conceptually does not work in transparent mode. + # We catch this misconfiguration on startup. Here, we sort out requests + # after a successful CONNECT request (which do not need to be validated anymore) + if not (self.http_authenticated or self.authenticate(request)): + return + + f.request = request + + try: + # Regular Proxy Mode: Handle CONNECT + if self.mode == "regular" and request.first_line_format == "authority": + self.handle_regular_mode_connect(request) + return + except (exceptions.ProtocolException, netlib.exceptions.NetlibException) as e: + # HTTPS tasting means that ordinary errors like resolution and + # connection errors can happen here. + self.send_error_response(502, repr(e)) + f.error = flow.Error(str(e)) + self.channel.ask("error", f) + return + + # update host header in reverse proxy mode + if self.config.options.mode == "reverse": + f.request.headers["Host"] = self.config.upstream_server.address.host + + # set upstream auth + if self.mode == "upstream" and self.config.upstream_auth is not None: + f.request.headers["Proxy-Authorization"] = self.config.upstream_auth + self.process_request_hook(f) + + try: + if websockets.check_handshake(request.headers) and websockets.check_client_version(request.headers): + # We only support RFC6455 with WebSockets version 13 + # allow inline scripts to manipulate the client handshake + self.channel.ask("websocket_handshake", f) + + if not f.response: + self.establish_server_connection( + f.request.host, + f.request.port, + f.request.scheme + ) + self.get_response_from_server(f) + else: + # response was set by an inline script. + # we now need to emulate the responseheaders hook. + self.channel.ask("responseheaders", f) + + self.log("response", "debug", [repr(f.response)]) + self.channel.ask("response", f) + self.send_response_to_client(f) + + if self.check_close_connection(f): + return + + # Handle 101 Switching Protocols + if f.response.status_code == 101: + return self.handle_101_switching_protocols(f) + + # Upstream Proxy Mode: Handle CONNECT + if f.request.first_line_format == "authority" and f.response.status_code == 200: + self.handle_upstream_mode_connect(f.request.copy()) + return + + except (exceptions.ProtocolException, netlib.exceptions.NetlibException) as e: + self.send_error_response(502, repr(e)) + if not f.response: + f.error = flow.Error(str(e)) + self.channel.ask("error", f) + return + else: + raise exceptions.ProtocolException( + "Error in HTTP connection: %s" % repr(e) + ) + finally: + if f: + f.live = False + + def get_request_from_client(self, f): + request = self.read_request() + f.request = request + self.channel.ask("requestheaders", f) + if request.headers.get("expect", "").lower() == "100-continue": + # TODO: We may have to use send_response_headers for HTTP2 here. + self.send_response(http.expect_continue_response) + request.headers.pop("expect") + request.body = b"".join(self.read_request_body(request)) + request.timestamp_end = time.time() + return request + + def send_error_response(self, code, message, headers=None): + try: + response = http.make_error_response(code, message, headers) + self.send_response(response) + except (netlib.exceptions.NetlibException, h2.exceptions.H2Error, exceptions.Http2ProtocolException): + self.log(traceback.format_exc(), "debug") + + def change_upstream_proxy_server(self, address): + # Make set_upstream_proxy_server always available, + # even if there's no UpstreamConnectLayer + if address != self.server_conn.address: + return self.set_server(address) + + def handle_regular_mode_connect(self, request): + self.http_authenticated = True + self.set_server((request.host, request.port)) + self.send_response(http.make_connect_response(request.data.http_version)) + layer = self.ctx.next_layer(self) + layer() + + def handle_upstream_mode_connect(self, connect_request): + layer = UpstreamConnectLayer(self, connect_request) + layer() + + def send_response_to_client(self, f): + if not f.response.stream: + # no streaming: + # we already received the full response from the server and can + # send it to the client straight away. + self.send_response(f.response) + else: + # streaming: + # First send the headers and then transfer the response incrementally + self.send_response_headers(f.response) + chunks = self.read_response_body( + f.request, + f.response + ) + if callable(f.response.stream): + chunks = f.response.stream(chunks) + self.send_response_body(f.response, chunks) + f.response.timestamp_end = time.time() + + def get_response_from_server(self, f): + def get_response(): + self.send_request(f.request) + f.response = self.read_response_headers() + + try: + get_response() + except netlib.exceptions.NetlibException as e: + self.log( + "server communication error: %s" % repr(e), + level="debug" + ) + # In any case, we try to reconnect at least once. This is + # necessary because it might be possible that we already + # initiated an upstream connection after clientconnect that + # has already been expired, e.g consider the following event + # log: + # > clientconnect (transparent mode destination known) + # > serverconnect (required for client tls handshake) + # > read n% of large request + # > server detects timeout, disconnects + # > read (100-n)% of large request + # > send large request upstream + + if isinstance(e, exceptions.Http2ProtocolException): + # do not try to reconnect for HTTP2 + raise exceptions.ProtocolException("First and only attempt to get response via HTTP2 failed.") + + self.disconnect() + self.connect() + get_response() + + # call the appropriate script hook - this is an opportunity for an + # inline script to set f.stream = True + self.channel.ask("responseheaders", f) + + if f.response.stream: + f.response.data.content = None + else: + f.response.data.content = b"".join(self.read_response_body( + f.request, + f.response + )) + f.response.timestamp_end = time.time() + + # no further manipulation of self.server_conn beyond this point + # we can safely set it as the final attribute value here. + f.server_conn = self.server_conn + + def process_request_hook(self, f): + # Determine .scheme, .host and .port attributes for inline scripts. + # For absolute-form requests, they are directly given in the request. + # For authority-form requests, we only need to determine the request scheme. + # For relative-form requests, we need to determine host and port as + # well. + if self.mode == "regular": + pass # only absolute-form at this point, nothing to do here. + elif self.mode == "upstream": + pass + else: + # Setting request.host also updates the host header, which we want to preserve + host_header = f.request.headers.get("host", None) + f.request.host = self.__initial_server_conn.address.host + f.request.port = self.__initial_server_conn.address.port + if host_header: + f.request.headers["host"] = host_header + f.request.scheme = "https" if self.__initial_server_tls else "http" + self.channel.ask("request", f) + + def establish_server_connection(self, host, port, scheme): + address = tcp.Address((host, port)) + tls = (scheme == "https") + + if self.mode == "regular" or self.mode == "transparent": + # If there's an existing connection that doesn't match our expectations, kill it. + if address != self.server_conn.address or tls != self.server_tls: + self.set_server(address) + self.set_server_tls(tls, address.host) + # Establish connection is neccessary. + if not self.server_conn: + self.connect() + else: + if not self.server_conn: + self.connect() + if tls: + raise exceptions.HttpProtocolException("Cannot change scheme in upstream proxy mode.") + """ + # This is a very ugly (untested) workaround to solve a very ugly problem. + if self.server_conn and self.server_conn.tls_established and not ssl: + self.disconnect() + self.connect() + elif ssl and not hasattr(self, "connected_to") or self.connected_to != address: + if self.server_conn.tls_established: + self.disconnect() + self.connect() + + self.send_request(make_connect_request(address)) + tls_layer = TlsLayer(self, False, True) + tls_layer._establish_tls_with_server() + """ + + def validate_request(self, request): + if request.first_line_format == "absolute" and request.scheme != "http": + raise netlib.exceptions.HttpException("Invalid request scheme: %s" % request.scheme) + + expected_request_forms = { + "regular": ("authority", "absolute",), + "upstream": ("authority", "absolute"), + "transparent": ("relative",) + } + + allowed_request_forms = expected_request_forms[self.mode] + if request.first_line_format not in allowed_request_forms: + err_message = "Invalid HTTP request form (expected: %s, got: %s)" % ( + " or ".join(allowed_request_forms), request.first_line_format + ) + raise netlib.exceptions.HttpException(err_message) + + if self.mode == "regular" and request.first_line_format == "absolute": + request.first_line_format = "relative" + + def authenticate(self, request): + if self.config.authenticator: + if self.config.authenticator.authenticate(request.headers): + self.config.authenticator.clean(request.headers) + else: + if self.mode == "transparent": + self.send_response(http.make_error_response( + 401, + "Authentication Required", + netlib.http.Headers(**self.config.authenticator.auth_challenge_headers()) + )) + else: + self.send_response(http.make_error_response( + 407, + "Proxy Authentication Required", + netlib.http.Headers(**self.config.authenticator.auth_challenge_headers()) + )) + return False + return True + + def handle_101_switching_protocols(self, f): + """ + Handle a successful HTTP 101 Switching Protocols Response, received after e.g. a WebSocket upgrade request. + """ + # Check for WebSockets handshake + is_websockets = ( + f and + websockets.check_handshake(f.request.headers) and + websockets.check_handshake(f.response.headers) + ) + if is_websockets and not self.config.options.websockets: + self.log( + "Client requested WebSocket connection, but the protocol is currently disabled in mitmproxy.", + "info" + ) + + if is_websockets and self.config.options.websockets: + layer = pwebsockets.WebSocketsLayer(self, f) + else: + layer = self.ctx.next_layer(self) + + layer() diff --git a/mitmproxy/proxy/protocol/http1.py b/mitmproxy/proxy/protocol/http1.py new file mode 100644 index 00000000..968d3249 --- /dev/null +++ b/mitmproxy/proxy/protocol/http1.py @@ -0,0 +1,72 @@ +from mitmproxy import http +from mitmproxy.proxy.protocol import http as httpbase +from netlib.http import http1 + + +class Http1Layer(httpbase._HttpTransmissionLayer): + + def __init__(self, ctx, mode): + super().__init__(ctx) + self.mode = mode + + def read_request_headers(self): + return http.HTTPRequest.wrap( + http1.read_request_head(self.client_conn.rfile) + ) + + def read_request_body(self, request): + expected_size = http1.expected_http_body_size(request) + return http1.read_body( + self.client_conn.rfile, + expected_size, + self.config.options.body_size_limit + ) + + def send_request(self, request): + self.server_conn.wfile.write(http1.assemble_request(request)) + self.server_conn.wfile.flush() + + def read_response_headers(self): + resp = http1.read_response_head(self.server_conn.rfile) + return http.HTTPResponse.wrap(resp) + + def read_response_body(self, request, response): + expected_size = http1.expected_http_body_size(request, response) + return http1.read_body( + self.server_conn.rfile, + expected_size, + self.config.options.body_size_limit + ) + + def send_response_headers(self, response): + raw = http1.assemble_response_head(response) + self.client_conn.wfile.write(raw) + self.client_conn.wfile.flush() + + def send_response_body(self, response, chunks): + for chunk in http1.assemble_body(response.headers, chunks): + self.client_conn.wfile.write(chunk) + self.client_conn.wfile.flush() + + def check_close_connection(self, flow): + request_close = http1.connection_close( + flow.request.http_version, + flow.request.headers + ) + response_close = http1.connection_close( + flow.response.http_version, + flow.response.headers + ) + read_until_eof = http1.expected_http_body_size(flow.request, flow.response) == -1 + close_connection = request_close or response_close or read_until_eof + if flow.request.first_line_format == "authority" and flow.response.status_code == 200: + # Workaround for https://github.com/mitmproxy/mitmproxy/issues/313: + # Charles Proxy sends a CONNECT response with HTTP/1.0 + # and no Content-Length header + + return False + return close_connection + + def __call__(self): + layer = httpbase.HttpLayer(self, self.mode) + layer() diff --git a/mitmproxy/proxy/protocol/http2.py b/mitmproxy/proxy/protocol/http2.py new file mode 100644 index 00000000..cbd8b34c --- /dev/null +++ b/mitmproxy/proxy/protocol/http2.py @@ -0,0 +1,614 @@ +import threading +import time +import traceback +import functools + +import h2.exceptions +from h2 import connection +from h2 import events +import queue + +import netlib.exceptions +from mitmproxy import exceptions +from mitmproxy import http +from mitmproxy.proxy.protocol import base +from mitmproxy.proxy.protocol import http as httpbase +import netlib.http +from netlib import tcp +from netlib import basethread +from netlib.http import http2 + + +class SafeH2Connection(connection.H2Connection): + + def __init__(self, conn, *args, **kwargs): + super().__init__(*args, **kwargs) + self.conn = conn + self.lock = threading.RLock() + + def safe_increment_flow_control(self, stream_id, length): + if length == 0: + return + + with self.lock: + self.increment_flow_control_window(length) + self.conn.send(self.data_to_send()) + with self.lock: + if stream_id in self.streams and not self.streams[stream_id].closed: + self.increment_flow_control_window(length, stream_id=stream_id) + self.conn.send(self.data_to_send()) + + def safe_reset_stream(self, stream_id, error_code): + with self.lock: + try: + self.reset_stream(stream_id, error_code) + except h2.exceptions.StreamClosedError: # pragma: no cover + # stream is already closed - good + pass + self.conn.send(self.data_to_send()) + + def safe_update_settings(self, new_settings): + with self.lock: + self.update_settings(new_settings) + self.conn.send(self.data_to_send()) + + def safe_send_headers(self, raise_zombie, stream_id, headers, **kwargs): + with self.lock: + raise_zombie() + self.send_headers(stream_id, headers.fields, **kwargs) + self.conn.send(self.data_to_send()) + + def safe_send_body(self, raise_zombie, stream_id, chunks): + for chunk in chunks: + position = 0 + while position < len(chunk): + self.lock.acquire() + raise_zombie(self.lock.release) + max_outbound_frame_size = self.max_outbound_frame_size + frame_chunk = chunk[position:position + max_outbound_frame_size] + if self.local_flow_control_window(stream_id) < len(frame_chunk): + self.lock.release() + time.sleep(0.1) + continue + self.send_data(stream_id, frame_chunk) + try: + self.conn.send(self.data_to_send()) + except Exception as e: # pragma: no cover + raise e + finally: + self.lock.release() + position += max_outbound_frame_size + with self.lock: + raise_zombie() + self.end_stream(stream_id) + self.conn.send(self.data_to_send()) + + +class Http2Layer(base.Layer): + + def __init__(self, ctx, mode): + super().__init__(ctx) + self.mode = mode + self.streams = dict() + self.server_to_client_stream_ids = dict([(0, 0)]) + self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False, header_encoding=False) + + def _initiate_server_conn(self): + if self.server_conn: + self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True, header_encoding=False) + self.server_conn.h2.initiate_connection() + self.server_conn.send(self.server_conn.h2.data_to_send()) + + def _complete_handshake(self): + preamble = self.client_conn.rfile.read(24) + self.client_conn.h2.initiate_connection() + self.client_conn.h2.receive_data(preamble) + self.client_conn.send(self.client_conn.h2.data_to_send()) + + def next_layer(self): # pragma: no cover + # WebSockets over HTTP/2? + # CONNECT for proxying? + raise NotImplementedError() + + def _handle_event(self, event, source_conn, other_conn, is_server): + self.log( + "HTTP2 Event from {}".format("server" if is_server else "client"), + "debug", + [repr(event)] + ) + + eid = None + if hasattr(event, 'stream_id'): + if is_server and event.stream_id % 2 == 1: + eid = self.server_to_client_stream_ids[event.stream_id] + else: + eid = event.stream_id + + if isinstance(event, events.RequestReceived): + return self._handle_request_received(eid, event, source_conn.h2) + elif isinstance(event, events.ResponseReceived): + return self._handle_response_received(eid, event) + elif isinstance(event, events.DataReceived): + return self._handle_data_received(eid, event, source_conn) + elif isinstance(event, events.StreamEnded): + return self._handle_stream_ended(eid) + elif isinstance(event, events.StreamReset): + return self._handle_stream_reset(eid, event, is_server, other_conn) + elif isinstance(event, events.RemoteSettingsChanged): + return self._handle_remote_settings_changed(event, other_conn) + elif isinstance(event, events.ConnectionTerminated): + return self._handle_connection_terminated(event, is_server) + elif isinstance(event, events.PushedStreamReceived): + return self._handle_pushed_stream_received(event, source_conn.h2) + elif isinstance(event, events.PriorityUpdated): + return self._handle_priority_updated(eid, event) + elif isinstance(event, events.TrailersReceived): + raise NotImplementedError('TrailersReceived not implemented') + + # fail-safe for unhandled events + return True + + def _handle_request_received(self, eid, event, h2_connection): + headers = netlib.http.Headers([[k, v] for k, v in event.headers]) + self.streams[eid] = Http2SingleStreamLayer(self, h2_connection, eid, headers) + self.streams[eid].timestamp_start = time.time() + self.streams[eid].no_body = (event.stream_ended is not None) + if event.priority_updated is not None: + self.streams[eid].priority_exclusive = event.priority_updated.exclusive + self.streams[eid].priority_depends_on = event.priority_updated.depends_on + self.streams[eid].priority_weight = event.priority_updated.weight + self.streams[eid].handled_priority_event = event.priority_updated + self.streams[eid].start() + self.streams[eid].request_arrived.set() + return True + + def _handle_response_received(self, eid, event): + headers = netlib.http.Headers([[k, v] for k, v in event.headers]) + self.streams[eid].queued_data_length = 0 + self.streams[eid].timestamp_start = time.time() + self.streams[eid].response_headers = headers + self.streams[eid].response_arrived.set() + return True + + def _handle_data_received(self, eid, event, source_conn): + bsl = self.config.options.body_size_limit + if bsl and self.streams[eid].queued_data_length > bsl: + self.streams[eid].kill() + source_conn.h2.safe_reset_stream( + event.stream_id, + h2.errors.REFUSED_STREAM + ) + self.log("HTTP body too large. Limit is {}.".format(bsl), "info") + else: + self.streams[eid].data_queue.put(event.data) + self.streams[eid].queued_data_length += len(event.data) + source_conn.h2.safe_increment_flow_control( + event.stream_id, + event.flow_controlled_length + ) + return True + + def _handle_stream_ended(self, eid): + self.streams[eid].timestamp_end = time.time() + self.streams[eid].data_finished.set() + return True + + def _handle_stream_reset(self, eid, event, is_server, other_conn): + self.streams[eid].kill() + if eid in self.streams and event.error_code == h2.errors.CANCEL: + if is_server: + other_stream_id = self.streams[eid].client_stream_id + else: + other_stream_id = self.streams[eid].server_stream_id + if other_stream_id is not None: + other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) + return True + + def _handle_remote_settings_changed(self, event, other_conn): + new_settings = dict([(key, cs.new_value) for (key, cs) in event.changed_settings.items()]) + other_conn.h2.safe_update_settings(new_settings) + return True + + def _handle_connection_terminated(self, event, is_server): + self.log("HTTP/2 connection terminated by {}: error code: {}, last stream id: {}, additional data: {}".format( + "server" if is_server else "client", + event.error_code, + event.last_stream_id, + event.additional_data), "info") + + if event.error_code != h2.errors.NO_ERROR: + # Something terrible has happened - kill everything! + self.client_conn.h2.close_connection( + error_code=event.error_code, + last_stream_id=event.last_stream_id, + additional_data=event.additional_data + ) + self.client_conn.send(self.client_conn.h2.data_to_send()) + self._kill_all_streams() + else: + """ + Do not immediately terminate the other connection. + Some streams might be still sending data to the client. + """ + return False + + def _handle_pushed_stream_received(self, event, h2_connection): + # pushed stream ids should be unique and not dependent on race conditions + # only the parent stream id must be looked up first + parent_eid = self.server_to_client_stream_ids[event.parent_stream_id] + with self.client_conn.h2.lock: + self.client_conn.h2.push_stream(parent_eid, event.pushed_stream_id, event.headers) + self.client_conn.send(self.client_conn.h2.data_to_send()) + + headers = netlib.http.Headers([[k, v] for k, v in event.headers]) + self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, h2_connection, event.pushed_stream_id, headers) + self.streams[event.pushed_stream_id].timestamp_start = time.time() + self.streams[event.pushed_stream_id].pushed = True + self.streams[event.pushed_stream_id].parent_stream_id = parent_eid + self.streams[event.pushed_stream_id].timestamp_end = time.time() + self.streams[event.pushed_stream_id].request_arrived.set() + self.streams[event.pushed_stream_id].request_data_finished.set() + self.streams[event.pushed_stream_id].start() + return True + + def _handle_priority_updated(self, eid, event): + if eid in self.streams and self.streams[eid].handled_priority_event is event: + # this event was already handled during stream creation + # HeadersFrame + Priority information as RequestReceived + return True + + with self.server_conn.h2.lock: + mapped_stream_id = event.stream_id + if mapped_stream_id in self.streams and self.streams[mapped_stream_id].server_stream_id: + # if the stream is already up and running and was sent to the server, + # use the mapped server stream id to update priority information + mapped_stream_id = self.streams[mapped_stream_id].server_stream_id + + if eid in self.streams: + self.streams[eid].priority_exclusive = event.exclusive + self.streams[eid].priority_depends_on = event.depends_on + self.streams[eid].priority_weight = event.weight + + self.server_conn.h2.prioritize( + mapped_stream_id, + weight=event.weight, + depends_on=self._map_depends_on_stream_id(mapped_stream_id, event.depends_on), + exclusive=event.exclusive + ) + self.server_conn.send(self.server_conn.h2.data_to_send()) + return True + + def _map_depends_on_stream_id(self, stream_id, depends_on): + mapped_depends_on = depends_on + if mapped_depends_on in self.streams and self.streams[mapped_depends_on].server_stream_id: + # if the depends-on-stream is already up and running and was sent to the server + # use the mapped server stream id to update priority information + mapped_depends_on = self.streams[mapped_depends_on].server_stream_id + if stream_id == mapped_depends_on: + # looks like one of the streams wasn't opened yet + # prevent self-dependent streams which result in ProtocolError + mapped_depends_on += 2 + return mapped_depends_on + + def _cleanup_streams(self): + death_time = time.time() - 10 + + zombie_streams = [(stream_id, stream) for stream_id, stream in list(self.streams.items()) if stream.zombie] + outdated_streams = [stream_id for stream_id, stream in zombie_streams if stream.zombie <= death_time] + + for stream_id in outdated_streams: # pragma: no cover + self.streams.pop(stream_id, None) + + def _kill_all_streams(self): + for stream in self.streams.values(): + stream.kill() + + def __call__(self): + self._initiate_server_conn() + self._complete_handshake() + + client = self.client_conn.connection + server = self.server_conn.connection + conns = [client, server] + + try: + while True: + r = tcp.ssl_read_select(conns, 1) + for conn in r: + source_conn = self.client_conn if conn == client else self.server_conn + other_conn = self.server_conn if conn == client else self.client_conn + is_server = (conn == self.server_conn.connection) + + with source_conn.h2.lock: + try: + raw_frame = b''.join(http2.read_raw_frame(source_conn.rfile)) + except: + # read frame failed: connection closed + self._kill_all_streams() + return + + if source_conn.h2.state_machine.state == h2.connection.ConnectionState.CLOSED: + self.log("HTTP/2 connection entered closed state already", "debug") + return + + incoming_events = source_conn.h2.receive_data(raw_frame) + source_conn.send(source_conn.h2.data_to_send()) + + for event in incoming_events: + if not self._handle_event(event, source_conn, other_conn, is_server): + # connection terminated: GoAway + self._kill_all_streams() + return + + self._cleanup_streams() + except Exception as e: # pragma: no cover + self.log(repr(e), "info") + self.log(traceback.format_exc(), "debug") + self._kill_all_streams() + + +def detect_zombie_stream(func): + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + self.raise_zombie() + result = func(self, *args, **kwargs) + self.raise_zombie() + return result + + return wrapper + + +class Http2SingleStreamLayer(httpbase._HttpTransmissionLayer, basethread.BaseThread): + + def __init__(self, ctx, h2_connection, stream_id, request_headers): + super().__init__( + ctx, name="Http2SingleStreamLayer-{}".format(stream_id) + ) + self.h2_connection = h2_connection + self.zombie = None + self.client_stream_id = stream_id + self.server_stream_id = None + self.request_headers = request_headers + self.response_headers = None + self.pushed = False + + self.timestamp_start = None + self.timestamp_end = None + + self.request_arrived = threading.Event() + self.request_data_queue = queue.Queue() + self.request_queued_data_length = 0 + self.request_data_finished = threading.Event() + + self.response_arrived = threading.Event() + self.response_data_queue = queue.Queue() + self.response_queued_data_length = 0 + self.response_data_finished = threading.Event() + + self.no_body = False + + self.priority_exclusive = None + self.priority_depends_on = None + self.priority_weight = None + self.handled_priority_event = None + + def kill(self): + if not self.zombie: + self.zombie = time.time() + self.request_data_finished.set() + self.request_arrived.set() + self.response_arrived.set() + self.response_data_finished.set() + + def connect(self): # pragma: no cover + raise exceptions.Http2ProtocolException("HTTP2 layer should already have a connection.") + + def disconnect(self): # pragma: no cover + raise exceptions.Http2ProtocolException("Cannot dis- or reconnect in HTTP2 connections.") + + def set_server(self, address): # pragma: no cover + raise exceptions.SetServerNotAllowedException(repr(address)) + + def check_close_connection(self, flow): + # This layer only handles a single stream. + # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream. + return True + + @property + def data_queue(self): + if self.response_arrived.is_set(): + return self.response_data_queue + else: + return self.request_data_queue + + @property + def queued_data_length(self): + if self.response_arrived.is_set(): + return self.response_queued_data_length + else: + return self.request_queued_data_length + + @property + def data_finished(self): + if self.response_arrived.is_set(): + return self.response_data_finished + else: + return self.request_data_finished + + @queued_data_length.setter + def queued_data_length(self, v): + self.request_queued_data_length = v + + def raise_zombie(self, pre_command=None): + connection_closed = self.h2_connection.state_machine.state == h2.connection.ConnectionState.CLOSED + if self.zombie is not None or not hasattr(self.server_conn, 'h2') or connection_closed: + if pre_command is not None: + pre_command() + raise exceptions.Http2ZombieException("Connection already dead") + + @detect_zombie_stream + def read_request_headers(self): + self.request_arrived.wait() + self.raise_zombie() + first_line_format, method, scheme, host, port, path = http2.parse_headers(self.request_headers) + return http.HTTPRequest( + first_line_format, + method, + scheme, + host, + port, + path, + b"HTTP/2.0", + self.request_headers, + None, + timestamp_start=self.timestamp_start, + timestamp_end=self.timestamp_end, + ) + + @detect_zombie_stream + def read_request_body(self, request): + self.request_data_finished.wait() + data = [] + while self.request_data_queue.qsize() > 0: + data.append(self.request_data_queue.get()) + return data + + @detect_zombie_stream + def send_request(self, message): + if self.pushed: + # nothing to do here + return + + while True: + self.raise_zombie() + self.server_conn.h2.lock.acquire() + + max_streams = self.server_conn.h2.remote_settings.max_concurrent_streams + if self.server_conn.h2.open_outbound_streams + 1 >= max_streams: + # wait until we get a free slot for a new outgoing stream + self.server_conn.h2.lock.release() + time.sleep(0.1) + continue + + # keep the lock + break + + # We must not assign a stream id if we are already a zombie. + self.raise_zombie() + + self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() + self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id + + headers = message.headers.copy() + headers.insert(0, ":path", message.path) + headers.insert(0, ":method", message.method) + headers.insert(0, ":scheme", message.scheme) + + priority_exclusive = None + priority_depends_on = None + priority_weight = None + if self.handled_priority_event: + # only send priority information if they actually came with the original HeadersFrame + # and not if they got updated before/after with a PriorityFrame + priority_exclusive = self.priority_exclusive + priority_depends_on = self._map_depends_on_stream_id(self.server_stream_id, self.priority_depends_on) + priority_weight = self.priority_weight + + try: + self.server_conn.h2.safe_send_headers( + self.raise_zombie, + self.server_stream_id, + headers, + end_stream=self.no_body, + priority_exclusive=priority_exclusive, + priority_depends_on=priority_depends_on, + priority_weight=priority_weight, + ) + except Exception as e: # pragma: no cover + raise e + finally: + self.raise_zombie() + self.server_conn.h2.lock.release() + + if not self.no_body: + self.server_conn.h2.safe_send_body( + self.raise_zombie, + self.server_stream_id, + [message.body] + ) + + @detect_zombie_stream + def read_response_headers(self): + self.response_arrived.wait() + + self.raise_zombie() + + status_code = int(self.response_headers.get(':status', 502)) + headers = self.response_headers.copy() + headers.pop(":status", None) + + return http.HTTPResponse( + http_version=b"HTTP/2.0", + status_code=status_code, + reason=b'', + headers=headers, + content=None, + timestamp_start=self.timestamp_start, + timestamp_end=self.timestamp_end, + ) + + @detect_zombie_stream + def read_response_body(self, request, response): + while True: + try: + yield self.response_data_queue.get(timeout=1) + except queue.Empty: # pragma: no cover + pass + if self.response_data_finished.is_set(): + self.raise_zombie() + while self.response_data_queue.qsize() > 0: + yield self.response_data_queue.get() + break + self.raise_zombie() + + @detect_zombie_stream + def send_response_headers(self, response): + headers = response.headers.copy() + headers.insert(0, ":status", str(response.status_code)) + for forbidden_header in h2.utilities.CONNECTION_HEADERS: + if forbidden_header in headers: + del headers[forbidden_header] + with self.client_conn.h2.lock: + self.client_conn.h2.safe_send_headers( + self.raise_zombie, + self.client_stream_id, + headers + ) + + @detect_zombie_stream + def send_response_body(self, _response, chunks): + self.client_conn.h2.safe_send_body( + self.raise_zombie, + self.client_stream_id, + chunks + ) + + def __call__(self): + raise EnvironmentError('Http2SingleStreamLayer must be run as thread') + + def run(self): + layer = httpbase.HttpLayer(self, self.mode) + + try: + layer() + except exceptions.Http2ZombieException as e: # pragma: no cover + pass + except exceptions.ProtocolException as e: # pragma: no cover + self.log(repr(e), "info") + self.log(traceback.format_exc(), "debug") + except exceptions.SetServerNotAllowedException as e: # pragma: no cover + self.log("Changing the Host server for HTTP/2 connections not allowed: {}".format(e), "info") + except exceptions.Kill: + self.log("Connection killed", "info") + + self.kill() diff --git a/mitmproxy/proxy/protocol/http_replay.py b/mitmproxy/proxy/protocol/http_replay.py new file mode 100644 index 00000000..bf0697be --- /dev/null +++ b/mitmproxy/proxy/protocol/http_replay.py @@ -0,0 +1,120 @@ +import traceback + +import netlib.exceptions +from mitmproxy import log +from mitmproxy import controller +from mitmproxy import exceptions +from mitmproxy import http +from mitmproxy import flow +from mitmproxy import connections +from netlib.http import http1 +from netlib import basethread + + +# TODO: Doesn't really belong into mitmproxy.proxy.protocol... + + +class RequestReplayThread(basethread.BaseThread): + name = "RequestReplayThread" + + def __init__(self, config, f, event_queue, should_exit): + """ + event_queue can be a queue or None, if no scripthooks should be + processed. + """ + self.config, self.f = config, f + f.live = True + if event_queue: + self.channel = controller.Channel(event_queue, should_exit) + else: + self.channel = None + super().__init__( + "RequestReplay (%s)" % f.request.url + ) + + def run(self): + r = self.f.request + first_line_format_backup = r.first_line_format + server = None + try: + self.f.response = None + + # If we have a channel, run script hooks. + if self.channel: + request_reply = self.channel.ask("request", self.f) + if isinstance(request_reply, http.HTTPResponse): + self.f.response = request_reply + + if not self.f.response: + # In all modes, we directly connect to the server displayed + if self.config.options.mode == "upstream": + server_address = self.config.upstream_server.address + server = connections.ServerConnection(server_address, (self.config.options.listen_host, 0)) + server.connect() + if r.scheme == "https": + connect_request = http.make_connect_request((r.data.host, r.port)) + server.wfile.write(http1.assemble_request(connect_request)) + server.wfile.flush() + resp = http1.read_response( + server.rfile, + connect_request, + body_size_limit=self.config.options.body_size_limit + ) + if resp.status_code != 200: + raise exceptions.ReplayException("Upstream server refuses CONNECT request") + server.establish_ssl( + self.config.clientcerts, + sni=self.f.server_conn.sni + ) + r.first_line_format = "relative" + else: + r.first_line_format = "absolute" + else: + server_address = (r.host, r.port) + server = connections.ServerConnection( + server_address, + (self.config.options.listen_host, 0) + ) + server.connect() + if r.scheme == "https": + server.establish_ssl( + self.config.clientcerts, + sni=self.f.server_conn.sni + ) + r.first_line_format = "relative" + + server.wfile.write(http1.assemble_request(r)) + server.wfile.flush() + self.f.server_conn = server + self.f.response = http.HTTPResponse.wrap( + http1.read_response( + server.rfile, + r, + body_size_limit=self.config.options.body_size_limit + ) + ) + if self.channel: + response_reply = self.channel.ask("response", self.f) + if response_reply == exceptions.Kill: + raise exceptions.Kill() + except (exceptions.ReplayException, netlib.exceptions.NetlibException) as e: + self.f.error = flow.Error(str(e)) + if self.channel: + self.channel.ask("error", self.f) + except exceptions.Kill: + # Kill should only be raised if there's a channel in the + # first place. + self.channel.tell( + "log", + log.LogEntry("Connection killed", "info") + ) + except Exception: + self.channel.tell( + "log", + log.LogEntry(traceback.format_exc(), "error") + ) + finally: + r.first_line_format = first_line_format_backup + self.f.live = False + if server: + server.finish() diff --git a/mitmproxy/proxy/protocol/rawtcp.py b/mitmproxy/proxy/protocol/rawtcp.py new file mode 100644 index 00000000..513b90b3 --- /dev/null +++ b/mitmproxy/proxy/protocol/rawtcp.py @@ -0,0 +1,65 @@ +import socket + +from OpenSSL import SSL + +import netlib.exceptions +import netlib.tcp +from mitmproxy import tcp +from mitmproxy import flow +from mitmproxy.proxy.protocol import base + + +class RawTCPLayer(base.Layer): + chunk_size = 4096 + + def __init__(self, ctx, ignore=False): + self.ignore = ignore + super().__init__(ctx) + + def __call__(self): + self.connect() + + if not self.ignore: + f = tcp.TCPFlow(self.client_conn, self.server_conn, self) + self.channel.ask("tcp_start", f) + + buf = memoryview(bytearray(self.chunk_size)) + + client = self.client_conn.connection + server = self.server_conn.connection + conns = [client, server] + + try: + while not self.channel.should_exit.is_set(): + r = netlib.tcp.ssl_read_select(conns, 10) + for conn in r: + dst = server if conn == client else client + + size = conn.recv_into(buf, self.chunk_size) + if not size: + conns.remove(conn) + # Shutdown connection to the other peer + if isinstance(conn, SSL.Connection): + # We can't half-close a connection, so we just close everything here. + # Sockets will be cleaned up on a higher level. + return + else: + dst.shutdown(socket.SHUT_WR) + + if len(conns) == 0: + return + continue + + tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes()) + if not self.ignore: + f.messages.append(tcp_message) + self.channel.ask("tcp_message", f) + dst.sendall(tcp_message.content) + + except (socket.error, netlib.exceptions.TcpException, SSL.Error) as e: + if not self.ignore: + f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e))) + self.channel.tell("tcp_error", f) + finally: + if not self.ignore: + self.channel.tell("tcp_end", f) diff --git a/mitmproxy/proxy/protocol/tls.py b/mitmproxy/proxy/protocol/tls.py new file mode 100644 index 00000000..940ab9ea --- /dev/null +++ b/mitmproxy/proxy/protocol/tls.py @@ -0,0 +1,581 @@ +import struct +from typing import Optional # noqa +from typing import Union + +import construct +import netlib.exceptions +from mitmproxy import exceptions +from mitmproxy.contrib.tls import _constructs +from mitmproxy.proxy.protocol import base +from netlib import utils + + +# taken from https://testssl.sh/openssl-rfc.mappping.html +CIPHER_ID_NAME_MAP = { + 0x00: 'NULL-MD5', + 0x01: 'NULL-MD5', + 0x02: 'NULL-SHA', + 0x03: 'EXP-RC4-MD5', + 0x04: 'RC4-MD5', + 0x05: 'RC4-SHA', + 0x06: 'EXP-RC2-CBC-MD5', + 0x07: 'IDEA-CBC-SHA', + 0x08: 'EXP-DES-CBC-SHA', + 0x09: 'DES-CBC-SHA', + 0x0a: 'DES-CBC3-SHA', + 0x0b: 'EXP-DH-DSS-DES-CBC-SHA', + 0x0c: 'DH-DSS-DES-CBC-SHA', + 0x0d: 'DH-DSS-DES-CBC3-SHA', + 0x0e: 'EXP-DH-RSA-DES-CBC-SHA', + 0x0f: 'DH-RSA-DES-CBC-SHA', + 0x10: 'DH-RSA-DES-CBC3-SHA', + 0x11: 'EXP-EDH-DSS-DES-CBC-SHA', + 0x12: 'EDH-DSS-DES-CBC-SHA', + 0x13: 'EDH-DSS-DES-CBC3-SHA', + 0x14: 'EXP-EDH-RSA-DES-CBC-SHA', + 0x15: 'EDH-RSA-DES-CBC-SHA', + 0x16: 'EDH-RSA-DES-CBC3-SHA', + 0x17: 'EXP-ADH-RC4-MD5', + 0x18: 'ADH-RC4-MD5', + 0x19: 'EXP-ADH-DES-CBC-SHA', + 0x1a: 'ADH-DES-CBC-SHA', + 0x1b: 'ADH-DES-CBC3-SHA', + # 0x1c: , + # 0x1d: , + 0x1e: 'KRB5-DES-CBC-SHA', + 0x1f: 'KRB5-DES-CBC3-SHA', + 0x20: 'KRB5-RC4-SHA', + 0x21: 'KRB5-IDEA-CBC-SHA', + 0x22: 'KRB5-DES-CBC-MD5', + 0x23: 'KRB5-DES-CBC3-MD5', + 0x24: 'KRB5-RC4-MD5', + 0x25: 'KRB5-IDEA-CBC-MD5', + 0x26: 'EXP-KRB5-DES-CBC-SHA', + 0x27: 'EXP-KRB5-RC2-CBC-SHA', + 0x28: 'EXP-KRB5-RC4-SHA', + 0x29: 'EXP-KRB5-DES-CBC-MD5', + 0x2a: 'EXP-KRB5-RC2-CBC-MD5', + 0x2b: 'EXP-KRB5-RC4-MD5', + 0x2f: 'AES128-SHA', + 0x30: 'DH-DSS-AES128-SHA', + 0x31: 'DH-RSA-AES128-SHA', + 0x32: 'DHE-DSS-AES128-SHA', + 0x33: 'DHE-RSA-AES128-SHA', + 0x34: 'ADH-AES128-SHA', + 0x35: 'AES256-SHA', + 0x36: 'DH-DSS-AES256-SHA', + 0x37: 'DH-RSA-AES256-SHA', + 0x38: 'DHE-DSS-AES256-SHA', + 0x39: 'DHE-RSA-AES256-SHA', + 0x3a: 'ADH-AES256-SHA', + 0x3b: 'NULL-SHA256', + 0x3c: 'AES128-SHA256', + 0x3d: 'AES256-SHA256', + 0x3e: 'DH-DSS-AES128-SHA256', + 0x3f: 'DH-RSA-AES128-SHA256', + 0x40: 'DHE-DSS-AES128-SHA256', + 0x41: 'CAMELLIA128-SHA', + 0x42: 'DH-DSS-CAMELLIA128-SHA', + 0x43: 'DH-RSA-CAMELLIA128-SHA', + 0x44: 'DHE-DSS-CAMELLIA128-SHA', + 0x45: 'DHE-RSA-CAMELLIA128-SHA', + 0x46: 'ADH-CAMELLIA128-SHA', + 0x62: 'EXP1024-DES-CBC-SHA', + 0x63: 'EXP1024-DHE-DSS-DES-CBC-SHA', + 0x64: 'EXP1024-RC4-SHA', + 0x65: 'EXP1024-DHE-DSS-RC4-SHA', + 0x66: 'DHE-DSS-RC4-SHA', + 0x67: 'DHE-RSA-AES128-SHA256', + 0x68: 'DH-DSS-AES256-SHA256', + 0x69: 'DH-RSA-AES256-SHA256', + 0x6a: 'DHE-DSS-AES256-SHA256', + 0x6b: 'DHE-RSA-AES256-SHA256', + 0x6c: 'ADH-AES128-SHA256', + 0x6d: 'ADH-AES256-SHA256', + 0x80: 'GOST94-GOST89-GOST89', + 0x81: 'GOST2001-GOST89-GOST89', + 0x82: 'GOST94-NULL-GOST94', + 0x83: 'GOST2001-GOST89-GOST89', + 0x84: 'CAMELLIA256-SHA', + 0x85: 'DH-DSS-CAMELLIA256-SHA', + 0x86: 'DH-RSA-CAMELLIA256-SHA', + 0x87: 'DHE-DSS-CAMELLIA256-SHA', + 0x88: 'DHE-RSA-CAMELLIA256-SHA', + 0x89: 'ADH-CAMELLIA256-SHA', + 0x8a: 'PSK-RC4-SHA', + 0x8b: 'PSK-3DES-EDE-CBC-SHA', + 0x8c: 'PSK-AES128-CBC-SHA', + 0x8d: 'PSK-AES256-CBC-SHA', + # 0x8e: , + # 0x8f: , + # 0x90: , + # 0x91: , + # 0x92: , + # 0x93: , + # 0x94: , + # 0x95: , + 0x96: 'SEED-SHA', + 0x97: 'DH-DSS-SEED-SHA', + 0x98: 'DH-RSA-SEED-SHA', + 0x99: 'DHE-DSS-SEED-SHA', + 0x9a: 'DHE-RSA-SEED-SHA', + 0x9b: 'ADH-SEED-SHA', + 0x9c: 'AES128-GCM-SHA256', + 0x9d: 'AES256-GCM-SHA384', + 0x9e: 'DHE-RSA-AES128-GCM-SHA256', + 0x9f: 'DHE-RSA-AES256-GCM-SHA384', + 0xa0: 'DH-RSA-AES128-GCM-SHA256', + 0xa1: 'DH-RSA-AES256-GCM-SHA384', + 0xa2: 'DHE-DSS-AES128-GCM-SHA256', + 0xa3: 'DHE-DSS-AES256-GCM-SHA384', + 0xa4: 'DH-DSS-AES128-GCM-SHA256', + 0xa5: 'DH-DSS-AES256-GCM-SHA384', + 0xa6: 'ADH-AES128-GCM-SHA256', + 0xa7: 'ADH-AES256-GCM-SHA384', + 0x5600: 'TLS_FALLBACK_SCSV', + 0xc001: 'ECDH-ECDSA-NULL-SHA', + 0xc002: 'ECDH-ECDSA-RC4-SHA', + 0xc003: 'ECDH-ECDSA-DES-CBC3-SHA', + 0xc004: 'ECDH-ECDSA-AES128-SHA', + 0xc005: 'ECDH-ECDSA-AES256-SHA', + 0xc006: 'ECDHE-ECDSA-NULL-SHA', + 0xc007: 'ECDHE-ECDSA-RC4-SHA', + 0xc008: 'ECDHE-ECDSA-DES-CBC3-SHA', + 0xc009: 'ECDHE-ECDSA-AES128-SHA', + 0xc00a: 'ECDHE-ECDSA-AES256-SHA', + 0xc00b: 'ECDH-RSA-NULL-SHA', + 0xc00c: 'ECDH-RSA-RC4-SHA', + 0xc00d: 'ECDH-RSA-DES-CBC3-SHA', + 0xc00e: 'ECDH-RSA-AES128-SHA', + 0xc00f: 'ECDH-RSA-AES256-SHA', + 0xc010: 'ECDHE-RSA-NULL-SHA', + 0xc011: 'ECDHE-RSA-RC4-SHA', + 0xc012: 'ECDHE-RSA-DES-CBC3-SHA', + 0xc013: 'ECDHE-RSA-AES128-SHA', + 0xc014: 'ECDHE-RSA-AES256-SHA', + 0xc015: 'AECDH-NULL-SHA', + 0xc016: 'AECDH-RC4-SHA', + 0xc017: 'AECDH-DES-CBC3-SHA', + 0xc018: 'AECDH-AES128-SHA', + 0xc019: 'AECDH-AES256-SHA', + 0xc01a: 'SRP-3DES-EDE-CBC-SHA', + 0xc01b: 'SRP-RSA-3DES-EDE-CBC-SHA', + 0xc01c: 'SRP-DSS-3DES-EDE-CBC-SHA', + 0xc01d: 'SRP-AES-128-CBC-SHA', + 0xc01e: 'SRP-RSA-AES-128-CBC-SHA', + 0xc01f: 'SRP-DSS-AES-128-CBC-SHA', + 0xc020: 'SRP-AES-256-CBC-SHA', + 0xc021: 'SRP-RSA-AES-256-CBC-SHA', + 0xc022: 'SRP-DSS-AES-256-CBC-SHA', + 0xc023: 'ECDHE-ECDSA-AES128-SHA256', + 0xc024: 'ECDHE-ECDSA-AES256-SHA384', + 0xc025: 'ECDH-ECDSA-AES128-SHA256', + 0xc026: 'ECDH-ECDSA-AES256-SHA384', + 0xc027: 'ECDHE-RSA-AES128-SHA256', + 0xc028: 'ECDHE-RSA-AES256-SHA384', + 0xc029: 'ECDH-RSA-AES128-SHA256', + 0xc02a: 'ECDH-RSA-AES256-SHA384', + 0xc02b: 'ECDHE-ECDSA-AES128-GCM-SHA256', + 0xc02c: 'ECDHE-ECDSA-AES256-GCM-SHA384', + 0xc02d: 'ECDH-ECDSA-AES128-GCM-SHA256', + 0xc02e: 'ECDH-ECDSA-AES256-GCM-SHA384', + 0xc02f: 'ECDHE-RSA-AES128-GCM-SHA256', + 0xc030: 'ECDHE-RSA-AES256-GCM-SHA384', + 0xc031: 'ECDH-RSA-AES128-GCM-SHA256', + 0xc032: 'ECDH-RSA-AES256-GCM-SHA384', + 0xcc13: 'ECDHE-RSA-CHACHA20-POLY1305', + 0xcc14: 'ECDHE-ECDSA-CHACHA20-POLY1305', + 0xcc15: 'DHE-RSA-CHACHA20-POLY1305', + 0xff00: 'GOST-MD5', + 0xff01: 'GOST-GOST94', + 0xff02: 'GOST-GOST89MAC', + 0xff03: 'GOST-GOST89STREAM', + 0x010080: 'RC4-MD5', + 0x020080: 'EXP-RC4-MD5', + 0x030080: 'RC2-CBC-MD5', + 0x040080: 'EXP-RC2-CBC-MD5', + 0x050080: 'IDEA-CBC-MD5', + 0x060040: 'DES-CBC-MD5', + 0x0700c0: 'DES-CBC3-MD5', + 0x080080: 'RC4-64-MD5', +} + + +def is_tls_record_magic(d): + """ + Returns: + True, if the passed bytes start with the TLS record magic bytes. + False, otherwise. + """ + d = d[:3] + + # TLS ClientHello magic, works for SSLv3, TLSv1.0, TLSv1.1, TLSv1.2 + # http://www.moserware.com/2009/06/first-few-milliseconds-of-https.html#client-hello + return ( + len(d) == 3 and + d[0] == 0x16 and + d[1] == 0x03 and + 0x0 <= d[2] <= 0x03 + ) + + +def get_client_hello(client_conn): + """ + Peek into the socket and read all records that contain the initial client hello message. + + client_conn: + The :py:class:`client connection <mitmproxy.connections.ClientConnection>`. + + Returns: + The raw handshake packet bytes, without TLS record header(s). + """ + client_hello = b"" + client_hello_size = 1 + offset = 0 + while len(client_hello) < client_hello_size: + record_header = client_conn.rfile.peek(offset + 5)[offset:] + if not is_tls_record_magic(record_header) or len(record_header) != 5: + raise exceptions.TlsProtocolException('Expected TLS record, got "%s" instead.' % record_header) + record_size = struct.unpack("!H", record_header[3:])[0] + 5 + record_body = client_conn.rfile.peek(offset + record_size)[offset + 5:] + if len(record_body) != record_size - 5: + raise exceptions.TlsProtocolException("Unexpected EOF in TLS handshake: %s" % record_body) + client_hello += record_body + offset += record_size + client_hello_size = struct.unpack("!I", b'\x00' + client_hello[1:4])[0] + 4 + return client_hello + + +class TlsClientHello: + + def __init__(self, raw_client_hello): + self._client_hello = _constructs.ClientHello.parse(raw_client_hello) + + def raw(self): + return self._client_hello + + @property + def cipher_suites(self): + return self._client_hello.cipher_suites.cipher_suites + + @property + def sni(self): + for extension in self._client_hello.extensions: + is_valid_sni_extension = ( + extension.type == 0x00 and + len(extension.server_names) == 1 and + extension.server_names[0].type == 0 and + utils.is_valid_host(extension.server_names[0].name) + ) + if is_valid_sni_extension: + return extension.server_names[0].name.decode("idna") + + @property + def alpn_protocols(self): + for extension in self._client_hello.extensions: + if extension.type == 0x10: + return list(extension.alpn_protocols) + + @classmethod + def from_client_conn(cls, client_conn): + """ + Peek into the connection, read the initial client hello and parse it to obtain ALPN values. + client_conn: + The :py:class:`client connection <mitmproxy.connections.ClientConnection>`. + Returns: + :py:class:`client hello <mitmproxy.proxy.protocol.tls.TlsClientHello>`. + """ + try: + raw_client_hello = get_client_hello(client_conn)[4:] # exclude handshake header. + except exceptions.ProtocolException as e: + raise exceptions.TlsProtocolException('Cannot read raw Client Hello: %s' % repr(e)) + + try: + return cls(raw_client_hello) + except construct.ConstructError as e: + raise exceptions.TlsProtocolException( + 'Cannot parse Client Hello: %s, Raw Client Hello: %s' % + (repr(e), raw_client_hello.encode("hex")) + ) + + def __repr__(self): + return "TlsClientHello( sni: %s alpn_protocols: %s, cipher_suites: %s)" % \ + (self.sni, self.alpn_protocols, self.cipher_suites) + + +class TlsLayer(base.Layer): + + """ + The TLS layer implements transparent TLS connections. + + It exposes the following API to child layers: + + - :py:meth:`set_server_tls` to modify TLS settings for the server connection. + - :py:attr:`server_tls`, :py:attr:`server_sni` as read-only attributes describing the current TLS settings for + the server connection. + """ + + def __init__(self, ctx, client_tls, server_tls, custom_server_sni = None): + super().__init__(ctx) + self._client_tls = client_tls + self._server_tls = server_tls + + self._custom_server_sni = custom_server_sni + self._client_hello = None # type: Optional[TlsClientHello] + + def __call__(self): + """ + The strategy for establishing TLS is as follows: + First, we determine whether we need the server cert to establish ssl with the client. + If so, we first connect to the server and then to the client. + If not, we only connect to the client and do the server handshake lazily. + + An additional complexity is that we need to mirror SNI and ALPN from the client when connecting to the server. + We manually peek into the connection and parse the ClientHello message to obtain these values. + """ + if self._client_tls: + # Peek into the connection, read the initial client hello and parse it to obtain SNI and ALPN values. + try: + self._client_hello = TlsClientHello.from_client_conn(self.client_conn) + except exceptions.TlsProtocolException as e: + self.log("Cannot parse Client Hello: %s" % repr(e), "error") + + # Do we need to do a server handshake now? + # There are two reasons why we would want to establish TLS with the server now: + # 1. If we already have an existing server connection and server_tls is True, + # we need to establish TLS now because .connect() will not be called anymore. + # 2. We may need information from the server connection for the client handshake. + # + # A couple of factors influence (2): + # 2.1 There actually is (or will be) a TLS-enabled upstream connection + # 2.2 An upstream connection is not wanted by the user if --no-upstream-cert is passed. + # 2.3 An upstream connection is implied by add_upstream_certs_to_client_chain + # 2.4 The client wants to negotiate an alternative protocol in its handshake, we need to find out + # what is supported by the server + # 2.5 The client did not sent a SNI value, we don't know the certificate subject. + client_tls_requires_server_connection = ( + self._server_tls and + not self.config.options.no_upstream_cert and + ( + self.config.options.add_upstream_certs_to_client_chain or + self._client_tls and ( + self._client_hello.alpn_protocols or + not self._client_hello.sni + ) + ) + ) + establish_server_tls_now = ( + (self.server_conn and self._server_tls) or + client_tls_requires_server_connection + ) + + if self._client_tls and establish_server_tls_now: + self._establish_tls_with_client_and_server() + elif self._client_tls: + self._establish_tls_with_client() + elif establish_server_tls_now: + self._establish_tls_with_server() + + layer = self.ctx.next_layer(self) + layer() + + def __repr__(self): # pragma: no cover + if self._client_tls and self._server_tls: + return "TlsLayer(client and server)" + elif self._client_tls: + return "TlsLayer(client)" + elif self._server_tls: + return "TlsLayer(server)" + else: + return "TlsLayer(inactive)" + + def connect(self): + if not self.server_conn: + self.ctx.connect() + if self._server_tls and not self.server_conn.tls_established: + self._establish_tls_with_server() + + def set_server_tls(self, server_tls: bool, sni: Union[str, None, bool]=None) -> None: + """ + Set the TLS settings for the next server connection that will be established. + This function will not alter an existing connection. + + Args: + server_tls: Shall we establish TLS with the server? + sni: ``str`` for a custom SNI value, + ``None`` for the client SNI value, + ``False`` if no SNI value should be sent. + """ + self._server_tls = server_tls + self._custom_server_sni = sni + + @property + def server_tls(self): + """ + ``True``, if the next server connection that will be established should be upgraded to TLS. + """ + return self._server_tls + + @property + def server_sni(self): + """ + The Server Name Indication we want to send with the next server TLS handshake. + """ + if self._custom_server_sni is False: + return None + else: + return self._custom_server_sni or self._client_hello and self._client_hello.sni + + @property + def alpn_for_client_connection(self): + return self.server_conn.get_alpn_proto_negotiated() + + def __alpn_select_callback(self, conn_, options): + # This gets triggered if we haven't established an upstream connection yet. + default_alpn = b'http/1.1' + # alpn_preference = b'h2' + + if self.alpn_for_client_connection in options: + choice = bytes(self.alpn_for_client_connection) + elif default_alpn in options: + choice = bytes(default_alpn) + else: + choice = options[0] + self.log("ALPN for client: %s" % choice, "debug") + return choice + + def _establish_tls_with_client_and_server(self): + try: + self.ctx.connect() + self._establish_tls_with_server() + except Exception: + # If establishing TLS with the server fails, we try to establish TLS with the client nonetheless + # to send an error message over TLS. + try: + self._establish_tls_with_client() + except: + pass + raise + + self._establish_tls_with_client() + + def _establish_tls_with_client(self): + self.log("Establish TLS with client", "debug") + cert, key, chain_file = self._find_cert() + + if self.config.options.add_upstream_certs_to_client_chain: + extra_certs = self.server_conn.server_certs + else: + extra_certs = None + + try: + self.client_conn.convert_to_ssl( + cert, key, + method=self.config.openssl_method_client, + options=self.config.openssl_options_client, + cipher_list=self.config.options.ciphers_client, + dhparams=self.config.certstore.dhparams, + chain_file=chain_file, + alpn_select_callback=self.__alpn_select_callback, + extra_chain_certs=extra_certs, + ) + # Some TLS clients will not fail the handshake, + # but will immediately throw an "unexpected eof" error on the first read. + # The reason for this might be difficult to find, so we try to peek here to see if it + # raises ann error. + self.client_conn.rfile.peek(1) + except netlib.exceptions.TlsException as e: + raise exceptions.ClientHandshakeException( + "Cannot establish TLS with client (sni: {sni}): {e}".format( + sni=self._client_hello.sni, e=repr(e) + ), + self._client_hello.sni or repr(self.server_conn.address) + ) + + def _establish_tls_with_server(self): + self.log("Establish TLS with server", "debug") + try: + alpn = None + if self._client_tls: + if self._client_hello.alpn_protocols: + # We only support http/1.1 and h2. + # If the server only supports spdy (next to http/1.1), it may select that + # and mitmproxy would enter TCP passthrough mode, which we want to avoid. + alpn = [x for x in self._client_hello.alpn_protocols if not (x.startswith(b"h2-") or x.startswith(b"spdy"))] + if alpn and b"h2" in alpn and not self.config.options.http2: + alpn.remove(b"h2") + + ciphers_server = self.config.options.ciphers_server + if not ciphers_server and self._client_tls: + ciphers_server = [] + for id in self._client_hello.cipher_suites: + if id in CIPHER_ID_NAME_MAP.keys(): + ciphers_server.append(CIPHER_ID_NAME_MAP[id]) + ciphers_server = ':'.join(ciphers_server) + + self.server_conn.establish_ssl( + self.config.clientcerts, + self.server_sni, + method=self.config.openssl_method_server, + options=self.config.openssl_options_server, + verify_options=self.config.openssl_verification_mode_server, + ca_path=self.config.options.ssl_verify_upstream_trusted_cadir, + ca_pemfile=self.config.options.ssl_verify_upstream_trusted_ca, + cipher_list=ciphers_server, + alpn_protos=alpn, + ) + tls_cert_err = self.server_conn.ssl_verification_error + if tls_cert_err is not None: + self.log(str(tls_cert_err), "warn") + self.log("Ignoring server verification error, continuing with connection", "warn") + except netlib.exceptions.InvalidCertificateException as e: + raise exceptions.InvalidServerCertificate(str(e)) + except netlib.exceptions.TlsException as e: + raise exceptions.TlsProtocolException( + "Cannot establish TLS with {address} (sni: {sni}): {e}".format( + address=repr(self.server_conn.address), + sni=self.server_sni, + e=repr(e) + ) + ) + + proto = self.alpn_for_client_connection.decode() if self.alpn_for_client_connection else '-' + self.log("ALPN selected by server: {}".format(proto), "debug") + + def _find_cert(self): + """ + This function determines the Common Name (CN) and Subject Alternative Names (SANs) + our certificate should have and then fetches a matching cert from the certstore. + """ + host = None + sans = set() + + # In normal operation, the server address should always be known at this point. + # However, we may just want to establish TLS so that we can send an error message to the client, + # in which case the address can be None. + if self.server_conn.address: + host = self.server_conn.address.host.encode("idna") + + # Should we incorporate information from the server certificate? + use_upstream_cert = ( + self.server_conn and + self.server_conn.tls_established and + (not self.config.options.no_upstream_cert) + ) + if use_upstream_cert: + upstream_cert = self.server_conn.cert + sans.update(upstream_cert.altnames) + if upstream_cert.cn: + sans.add(host) + host = upstream_cert.cn.decode("utf8").encode("idna") + # Also add SNI values. + if self._client_hello.sni: + sans.add(self._client_hello.sni.encode("idna")) + if self._custom_server_sni: + sans.add(self._custom_server_sni.encode("idna")) + + # RFC 2818: If a subjectAltName extension of type dNSName is present, that MUST be used as the identity. + # In other words, the Common Name is irrelevant then. + if host: + sans.add(host) + return self.config.certstore.get_cert(host, list(sans)) diff --git a/mitmproxy/proxy/protocol/websockets.py b/mitmproxy/proxy/protocol/websockets.py new file mode 100644 index 00000000..816ec92d --- /dev/null +++ b/mitmproxy/proxy/protocol/websockets.py @@ -0,0 +1,112 @@ +import netlib.exceptions +import socket +import struct +from OpenSSL import SSL +from mitmproxy import exceptions +from mitmproxy.proxy.protocol import base +from netlib import strutils +from netlib import tcp +from netlib import websockets + + +class WebSocketsLayer(base.Layer): + """ + WebSockets layer to intercept, modify, and forward WebSockets connections + + Only version 13 is supported (as specified in RFC6455) + Only HTTP/1.1-initiated connections are supported. + + The client starts by sending an Upgrade-request. + In order to determine the handshake and negotiate the correct protocol + and extensions, the Upgrade-request is forwarded to the server. + The response from the server is then parsed and negotiated settings are extracted. + Finally the handshake is completed by forwarding the server-response to the client. + After that, only WebSockets frames are exchanged. + + PING/PONG frames pass through and must be answered by the other endpoint. + + CLOSE frames are forwarded before this WebSocketsLayer terminates. + + This layer is transparent to any negotiated extensions. + This layer is transparent to any negotiated subprotocols. + Only raw frames are forwarded to the other endpoint. + """ + + def __init__(self, ctx, flow): + super().__init__(ctx) + self._flow = flow + + self.client_key = websockets.get_client_key(self._flow.request.headers) + self.client_protocol = websockets.get_protocol(self._flow.request.headers) + self.client_extensions = websockets.get_extensions(self._flow.request.headers) + + self.server_accept = websockets.get_server_accept(self._flow.response.headers) + self.server_protocol = websockets.get_protocol(self._flow.response.headers) + self.server_extensions = websockets.get_extensions(self._flow.response.headers) + + def _handle_frame(self, frame, source_conn, other_conn, is_server): + sender = "server" if is_server else "client" + self.log( + "WebSockets Frame received from {}".format(sender), + "debug", + [repr(frame)] + ) + + if frame.header.opcode & 0x8 == 0: + self.log( + "{direction} websocket {direction} {server}".format( + server=repr(self.server_conn.address), + direction="<-" if is_server else "->", + ), + "info", + strutils.bytes_to_escaped_str(frame.payload, keep_spacing=True).splitlines() + ) + # forward the data frame to the other side + other_conn.send(bytes(frame)) + elif frame.header.opcode in (websockets.OPCODE.PING, websockets.OPCODE.PONG): + # just forward the ping/pong to the other side + other_conn.send(bytes(frame)) + elif frame.header.opcode == websockets.OPCODE.CLOSE: + code = '(status code missing)' + msg = None + reason = '(message missing)' + if len(frame.payload) >= 2: + code, = struct.unpack('!H', frame.payload[:2]) + msg = websockets.CLOSE_REASON.get_name(code, default='unknown status code') + if len(frame.payload) > 2: + reason = frame.payload[2:] + self.log("WebSockets connection closed by {}: {} {}, {}".format(sender, code, msg, reason), "info") + + other_conn.send(bytes(frame)) + # close the connection + return False + else: + self.log("Unknown WebSockets frame received from {}".format(sender), "info", [repr(frame)]) + # unknown frame - just forward it + other_conn.send(bytes(frame)) + + # continue the connection + return True + + def __call__(self): + client = self.client_conn.connection + server = self.server_conn.connection + conns = [client, server] + + try: + while not self.channel.should_exit.is_set(): + r = tcp.ssl_read_select(conns, 1) + for conn in r: + source_conn = self.client_conn if conn == client else self.server_conn + other_conn = self.server_conn if conn == client else self.client_conn + is_server = (conn == self.server_conn.connection) + + frame = websockets.Frame.from_file(source_conn.rfile) + + if not self._handle_frame(frame, source_conn, other_conn, is_server): + return + except (socket.error, netlib.exceptions.TcpException, SSL.Error) as e: + self.log("WebSockets connection closed unexpectedly by {}: {}".format( + "server" if is_server else "client", repr(e)), "info") + except Exception as e: # pragma: no cover + raise exceptions.ProtocolException("Error in WebSockets connection: {}".format(repr(e))) |