import os.path import threading import tempfile import sys import time from unittest import mock import asyncio import mitmproxy.platform from mitmproxy.addons import core from mitmproxy.proxy.config import ProxyConfig from mitmproxy.proxy.server import ProxyServer from mitmproxy import controller from mitmproxy import options from mitmproxy import exceptions from mitmproxy import io from mitmproxy.utils import human import pathod.test import pathod.pathoc from mitmproxy import eventsequence from mitmproxy.test import tflow from mitmproxy.test import tutils from mitmproxy.test import taddons class MasterTest: async def cycle(self, master, content): f = tflow.tflow(req=tutils.treq(content=content)) layer = mock.Mock("mitmproxy.proxy.protocol.base.Layer") layer.client_conn = f.client_conn layer.reply = controller.DummyReply() await master.addons.handle_lifecycle("clientconnect", layer) for i in eventsequence.iterate(f): await master.addons.handle_lifecycle(*i) await master.addons.handle_lifecycle("clientdisconnect", layer) return f async def dummy_cycle(self, master, n, content): for i in range(n): await self.cycle(master, content) await master._shutdown() def flowfile(self, path): with open(path, "wb") as f: fw = io.FlowWriter(f) t = tflow.tflow(resp=True) fw.add(t) class TestState: def __init__(self): self.flows = [] def request(self, f): if f not in self.flows: self.flows.append(f) def response(self, f): if f not in self.flows: self.flows.append(f) def websocket_start(self, f): if f not in self.flows: self.flows.append(f) class TestMaster(taddons.RecordingMaster): def __init__(self, opts): super().__init__(opts) config = ProxyConfig(opts) self.server = ProxyServer(config) def clear_addons(self, addons): self.addons.clear() self.state = TestState() self.addons.add(self.state) self.addons.add(*addons) self.addons.trigger("configure", self.options.keys()) self.addons.trigger("running") def reset(self, addons): self.clear_addons(addons) self.clear() class ProxyThread(threading.Thread): def __init__(self, masterclass, options): threading.Thread.__init__(self) self.masterclass = masterclass self.options = options self.tmaster = None self.event_loop = None controller.should_exit = False @property def port(self): return self.tmaster.server.address[1] @property def tlog(self): return self.tmaster.logs def shutdown(self): self.tmaster.shutdown() def run(self): self.event_loop = asyncio.new_event_loop() asyncio.set_event_loop(self.event_loop) self.tmaster = self.masterclass(self.options) self.tmaster.addons.add(core.Core()) self.name = "ProxyThread (%s)" % human.format_address(self.tmaster.server.address) self.tmaster.run() def set_addons(self, *addons): self.tmaster.reset(addons) def start(self): super().start() while True: if self.tmaster: break time.sleep(0.01) class ProxyTestBase: # Test Configuration ssl = None ssloptions = False masterclass = TestMaster add_upstream_certs_to_client_chain = False @classmethod def setup_class(cls): cls.server = pathod.test.Daemon( ssl=cls.ssl, ssloptions=cls.ssloptions) cls.server2 = pathod.test.Daemon( ssl=cls.ssl, ssloptions=cls.ssloptions) cls.options = cls.get_options() cls.proxy = ProxyThread(cls.masterclass, cls.options) cls.proxy.start() @classmethod def teardown_class(cls): # perf: we want to run tests in parallel # should this ever cause an error, travis should catch it. # shutil.rmtree(cls.confdir) cls.proxy.shutdown() cls.server.shutdown() cls.server2.shutdown() def teardown(self): try: self.server.wait_for_silence() except exceptions.Timeout: # FIXME: Track down the Windows sync issues if sys.platform != "win32": raise def setup(self): self.master.reset(self.addons()) self.server.clear_log() self.server2.clear_log() @property def master(self): return self.proxy.tmaster @classmethod def get_options(cls): cls.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy") return options.Options( listen_port=0, confdir=cls.confdir, add_upstream_certs_to_client_chain=cls.add_upstream_certs_to_client_chain, ssl_insecure=True, ) def set_addons(self, *addons): self.proxy.set_addons(*addons) def addons(self): """ Can be over-ridden to add a standard set of addons to tests. """ return [] class LazyPathoc(pathod.pathoc.Pathoc): def __init__(self, lazy_connect, *args, **kwargs): self.lazy_connect = lazy_connect pathod.pathoc.Pathoc.__init__(self, *args, **kwargs) def connect(self): return pathod.pathoc.Pathoc.connect(self, self.lazy_connect) class HTTPProxyTest(ProxyTestBase): def pathoc_raw(self): return pathod.pathoc.Pathoc(("127.0.0.1", self.proxy.port), fp=None) def pathoc(self, sni=None): """ Returns a connected Pathoc instance. """ if self.ssl: conn = ("127.0.0.1", self.server.port) else: conn = None return LazyPathoc( conn, ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None ) def pathod(self, spec, sni=None): """ Constructs a pathod GET request, with the appropriate base and proxy. """ p = self.pathoc(sni=sni) if self.ssl: q = "get:'/p/%s'" % spec else: q = "get:'%s/p/%s'" % (self.server.urlbase, spec) with p.connect(): return p.request(q) def app(self, page): if self.ssl: p = pathod.pathoc.Pathoc( ("127.0.0.1", self.proxy.port), True, fp=None ) with p.connect((self.master.options.onboarding_host, self.master.options.onbarding_port)): return p.request("get:'%s'" % page) else: p = self.pathoc() with p.connect(): return p.request("get:'http://%s%s'" % (self.master.options.onboarding_host, page)) class TransparentProxyTest(ProxyTestBase): ssl = None @classmethod def setup_class(cls): cls._init_transparent_mode = mitmproxy.platform.init_transparent_mode cls._original_addr = mitmproxy.platform.original_addr mitmproxy.platform.init_transparent_mode = lambda: True mitmproxy.platform.original_addr = lambda sock: ("127.0.0.1", cls.server.port) super().setup_class() @classmethod def teardown_class(cls): super().teardown_class() mitmproxy.platform.init_transparent_mode = cls._init_transparent_mode mitmproxy.platform.original_addr = cls._original_addr @classmethod def get_options(cls): opts = ProxyTestBase.get_options() opts.mode = "transparent" return opts def pathod(self, spec, sni=None): """ Constructs a pathod GET request, with the appropriate base and proxy. """ if self.ssl: p = self.pathoc(sni=sni) q = "get:'/p/%s'" % spec else: p = self.pathoc() q = "get:'/p/%s'" % spec with p.connect(): return p.request(q) def pathoc(self, sni=None): """ Returns a connected Pathoc instance. """ p = pathod.pathoc.Pathoc( ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None ) return p class ReverseProxyTest(ProxyTestBase): ssl = None @classmethod def get_options(cls): opts = ProxyTestBase.get_options() s = "".join( [ "https" if cls.ssl else "http", "://", "127.0.0.1:", str(cls.server.port) ] ) opts.mode = "reverse:" + s return opts def pathoc(self, sni=None): """ Returns a connected Pathoc instance. """ p = pathod.pathoc.Pathoc( ("localhost", self.proxy.port), ssl=self.ssl, sni=sni, fp=None ) return p def pathod(self, spec, sni=None): """ Constructs a pathod GET request, with the appropriate base and proxy. """ if self.ssl: p = self.pathoc(sni=sni) q = "get:'/p/%s'" % spec else: p = self.pathoc() q = "get:'/p/%s'" % spec with p.connect(): return p.request(q) class SocksModeTest(HTTPProxyTest): @classmethod def get_options(cls): opts = ProxyTestBase.get_options() opts.mode = "socks5" return opts class HTTPUpstreamProxyTest(HTTPProxyTest): """ Chain three instances of mitmproxy in a row to test upstream mode. Proxy order is cls.proxy -> cls.chain[0] -> cls.chain[1] cls.proxy and cls.chain[0] are in upstream mode, cls.chain[1] is in regular mode. """ chain = None n = 2 @classmethod def setup_class(cls): # We need to initialize the chain first so that the normal server gets a correct config. cls.chain = [] for _ in range(cls.n): opts = cls.get_options() proxy = ProxyThread(cls.masterclass, opts) proxy.start() cls.chain.insert(0, proxy) while True: if proxy.event_loop and proxy.event_loop.is_running(): break super().setup_class() @classmethod def teardown_class(cls): super().teardown_class() for proxy in cls.chain: proxy.shutdown() def setup(self): super().setup() for proxy in self.chain: proxy.tmaster.reset(self.addons()) @classmethod def get_options(cls): opts = super().get_options() if cls.chain: # First proxy is in normal mode. s = "http://127.0.0.1:%s" % cls.chain[0].port opts.update( mode="upstream:" + s, ) return opts