diff options
Diffstat (limited to 'examples/complex/tls_passthrough.py')
-rw-r--r-- | examples/complex/tls_passthrough.py | 140 |
1 files changed, 140 insertions, 0 deletions
diff --git a/examples/complex/tls_passthrough.py b/examples/complex/tls_passthrough.py new file mode 100644 index 00000000..40c1051d --- /dev/null +++ b/examples/complex/tls_passthrough.py @@ -0,0 +1,140 @@ +""" +This inline script allows conditional TLS Interception based +on a user-defined strategy. + +Example: + + > mitmdump -s tls_passthrough.py + + 1. curl --proxy http://localhost:8080 https://example.com --insecure + // works - we'll also see the contents in mitmproxy + + 2. curl --proxy http://localhost:8080 https://example.com --insecure + // still works - we'll also see the contents in mitmproxy + + 3. curl --proxy http://localhost:8080 https://example.com + // fails with a certificate error, which we will also see in mitmproxy + + 4. curl --proxy http://localhost:8080 https://example.com + // works again, but mitmproxy does not intercept and we do *not* see the contents + +Authors: Maximilian Hils, Matthew Tuusberg +""" +import collections +import random + +import sys +from enum import Enum + +import mitmproxy +from mitmproxy.exceptions import TlsProtocolException +from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer + + +class InterceptionResult(Enum): + success = True + failure = False + skipped = None + + +class _TlsStrategy: + """ + Abstract base class for interception strategies. + """ + + def __init__(self): + # A server_address -> interception results mapping + self.history = collections.defaultdict(lambda: collections.deque(maxlen=200)) + + def should_intercept(self, server_address): + """ + Returns: + True, if we should attempt to intercept the connection. + False, if we want to employ pass-through instead. + """ + raise NotImplementedError() + + def record_success(self, server_address): + self.history[server_address].append(InterceptionResult.success) + + def record_failure(self, server_address): + self.history[server_address].append(InterceptionResult.failure) + + def record_skipped(self, server_address): + self.history[server_address].append(InterceptionResult.skipped) + + +class ConservativeStrategy(_TlsStrategy): + """ + Conservative Interception Strategy - only intercept if there haven't been any failed attempts + in the history. + """ + + def should_intercept(self, server_address): + if InterceptionResult.failure in self.history[server_address]: + return False + return True + + +class ProbabilisticStrategy(_TlsStrategy): + """ + Fixed probability that we intercept a given connection. + """ + + def __init__(self, p): + self.p = p + super(ProbabilisticStrategy, self).__init__() + + def should_intercept(self, server_address): + return random.uniform(0, 1) < self.p + + +class TlsFeedback(TlsLayer): + """ + Monkey-patch _establish_tls_with_client to get feedback if TLS could be established + successfully on the client connection (which may fail due to cert pinning). + """ + + def _establish_tls_with_client(self): + server_address = self.server_conn.address + + try: + super(TlsFeedback, self)._establish_tls_with_client() + except TlsProtocolException as e: + tls_strategy.record_failure(server_address) + raise e + else: + tls_strategy.record_success(server_address) + + +# inline script hooks below. + +tls_strategy = None + + +def start(): + global tls_strategy + if len(sys.argv) == 2: + tls_strategy = ProbabilisticStrategy(float(sys.argv[1])) + else: + tls_strategy = ConservativeStrategy() + + +def next_layer(next_layer): + """ + This hook does the actual magic - if the next layer is planned to be a TLS layer, + we check if we want to enter pass-through mode instead. + """ + if isinstance(next_layer, TlsLayer) and next_layer._client_tls: + server_address = next_layer.server_conn.address + + if tls_strategy.should_intercept(server_address): + # We try to intercept. + # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked. + next_layer.__class__ = TlsFeedback + else: + # We don't intercept - reply with a pass-through layer and add a "skipped" entry. + mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info") + next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True) + next_layer.reply.send(next_layer_replacement) + tls_strategy.record_skipped(server_address) |