aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/README4
-rw-r--r--examples/tls_passthrough.py136
2 files changed, 140 insertions, 0 deletions
diff --git a/examples/README b/examples/README
index adfcd0f2..b4dec8e5 100644
--- a/examples/README
+++ b/examples/README
@@ -1,3 +1,7 @@
+Some inline scripts may require additional dependencies, which can be installed using
+`pip install mitmproxy[examples]`.
+
+
# inline script examples
add_header.py Simple script that just adds a header to every request.
change_upstream_proxy.py Dynamically change the upstream proxy
diff --git a/examples/tls_passthrough.py b/examples/tls_passthrough.py
new file mode 100644
index 00000000..7b4dec62
--- /dev/null
+++ b/examples/tls_passthrough.py
@@ -0,0 +1,136 @@
+"""
+This inline script allows conditional TLS Interception based
+on a user-defined strategy.
+
+Example:
+
+ > mitmdump -s tls_passthrough.py
+
+ 1. curl --proxy http://localhost:8080 https://example.com --insecure
+ // works - we'll also see the contents in mitmproxy
+
+ 2. curl --proxy http://localhost:8080 https://example.com --insecure
+ // still works - we'll also see the contents in mitmproxy
+
+ 3. curl --proxy http://localhost:8080 https://example.com
+ // fails with a certificate error, which we will also see in mitmproxy
+
+ 4. curl --proxy http://localhost:8080 https://example.com
+ // works again, but mitmproxy does not intercept and we do *not* see the contents
+
+Authors: Maximilian Hils, Matthew Tuusberg
+"""
+from __future__ import (absolute_import, print_function, division)
+import collections
+import random
+
+from enum import Enum
+
+from libmproxy.exceptions import TlsException
+from libmproxy.protocol import TlsLayer, RawTCPLayer
+
+
+class InterceptionResult(Enum):
+ success = True
+ failure = False
+ skipped = None
+
+
+class _TlsStrategy(object):
+ """
+ Abstract base class for interception strategies.
+ """
+ def __init__(self):
+ # A server_address -> interception results mapping
+ self.history = collections.defaultdict(lambda: collections.deque(maxlen=200))
+
+ def should_intercept(self, server_address):
+ """
+ Returns:
+ True, if we should attempt to intercept the connection.
+ False, if we want to employ pass-through instead.
+ """
+ raise NotImplementedError()
+
+ def record_success(self, server_address):
+ self.history[server_address].append(InterceptionResult.success)
+
+ def record_failure(self, server_address):
+ self.history[server_address].append(InterceptionResult.failure)
+
+ def record_skipped(self, server_address):
+ self.history[server_address].append(InterceptionResult.skipped)
+
+
+class ConservativeStrategy(_TlsStrategy):
+ """
+ Conservative Interception Strategy - only intercept if there haven't been any failed attempts
+ in the history.
+ """
+
+ def should_intercept(self, server_address):
+ if InterceptionResult.failure in self.history[server_address]:
+ return False
+ return True
+
+
+class ProbabilisticStrategy(_TlsStrategy):
+ """
+ Fixed probability that we intercept a given connection.
+ """
+ def __init__(self, p):
+ self.p = p
+ super(ProbabilisticStrategy, self).__init__()
+
+ def should_intercept(self, server_address):
+ return random.uniform(0, 1) < self.p
+
+
+class TlsFeedback(TlsLayer):
+ """
+ Monkey-patch _establish_tls_with_client to get feedback if TLS could be established
+ successfully on the client connection (which may fail due to cert pinning).
+ """
+
+ def _establish_tls_with_client(self):
+ server_address = self.server_conn.address
+ tls_strategy = self.script_context.tls_strategy
+
+ try:
+ super(TlsFeedback, self)._establish_tls_with_client()
+ except TlsException as e:
+ tls_strategy.record_failure(server_address)
+ raise e
+ else:
+ tls_strategy.record_success(server_address)
+
+
+# inline script hooks below.
+
+
+def start(context, argv):
+ if len(argv) == 2:
+ context.tls_strategy = ProbabilisticStrategy(float(argv[1]))
+ else:
+ context.tls_strategy = ConservativeStrategy()
+
+
+def next_layer(context, next_layer):
+ """
+ This hook does the actual magic - if the next layer is planned to be a TLS layer,
+ we check if we want to enter pass-through mode instead.
+ """
+ if isinstance(next_layer, TlsLayer) and next_layer._client_tls:
+ server_address = next_layer.server_conn.address
+
+ if context.tls_strategy.should_intercept(server_address):
+ # We try to intercept.
+ # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked.
+ next_layer.__class__ = TlsFeedback
+ next_layer.script_context = context
+ else:
+ # We don't intercept - reply with a pass-through layer and add a "skipped" entry.
+ context.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info")
+ next_layer_replacement = RawTCPLayer(next_layer.ctx, logging=False)
+ next_layer.reply(next_layer_replacement)
+ context.tls_strategy.record_skipped(server_address)