From 1e07d9e6e7962922707fb0f384e30fd4d9461e2a Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Sat, 4 Jan 2014 14:35:11 +1300 Subject: Move app mechanism to flow.py Disable apps while message passing is improved. --- libmproxy/flow.py | 61 +++++++++++++++----- libmproxy/proxy.py | 162 ++++++++++++++++++++++------------------------------- test/test_dump.py | 2 +- test/test_flow.py | 21 +++++++ test/test_proxy.py | 20 ------- test/tservers.py | 4 +- 6 files changed, 137 insertions(+), 133 deletions(-) diff --git a/libmproxy/flow.py b/libmproxy/flow.py index 44dc57ae..59d8bde4 100644 --- a/libmproxy/flow.py +++ b/libmproxy/flow.py @@ -6,7 +6,7 @@ import hashlib, Cookie, cookielib, copy, re, urlparse, os, threading import time, urllib import tnetstring, filt, script, utils, encoding, proxy from email.utils import parsedate_tz, formatdate, mktime_tz -from netlib import odict, http, certutils +from netlib import odict, http, certutils, wsgi import controller, version import app @@ -17,6 +17,28 @@ ODict = odict.ODict ODictCaseless = odict.ODictCaseless +class AppRegistry: + def __init__(self): + self.apps = {} + + def add(self, app, domain, port): + """ + Add a WSGI app to the registry, to be served for requests to the + specified domain, on the specified port. + """ + self.apps[(domain, port)] = wsgi.WSGIAdaptor(app, domain, port, version.NAMEVERSION) + + def get(self, request): + """ + Returns an WSGIAdaptor instance if request matches an app, or None. + """ + if (request.host, request.port) in self.apps: + return self.apps[(request.host, request.port)] + if "host" in request.headers: + host = request.headers["host"][0] + return self.apps.get((host, request.port), None) + + class ReplaceHooks: def __init__(self): self.lst = [] @@ -289,8 +311,10 @@ class Request(HTTPMsg): """ def __init__( - self, client_conn, httpversion, host, port, scheme, method, path, headers, content, timestamp_start=None, - timestamp_end=None, tcp_setup_timestamp=None, ssl_setup_timestamp=None, ip=None): + self, client_conn, httpversion, host, port, + scheme, method, path, headers, content, timestamp_start=None, + timestamp_end=None, tcp_setup_timestamp=None, + ssl_setup_timestamp=None, ip=None): assert isinstance(headers, ODictCaseless) self.client_conn = client_conn self.httpversion = httpversion @@ -1374,16 +1398,16 @@ class FlowMaster(controller.Master): self.stream = None app.mapp.config["PMASTER"] = self + self.apps = AppRegistry() def start_app(self, host, port, external): if not external: - self.server.apps.add( + self.apps.add( app.mapp, host, port ) else: - print host threading.Thread(target=app.mapp.run,kwargs={ "use_reloader": False, "host": host, @@ -1590,9 +1614,11 @@ class FlowMaster(controller.Master): r.reply() def handle_serverconnection(self, sc): - # To unify the mitmproxy script API, we call the script hook "serverconnect" rather than "serverconnection". - # As things are handled differently in libmproxy (ClientConnect + ClientDisconnect vs ServerConnection class), - # there is no "serverdisonnect" event at the moment. + # To unify the mitmproxy script API, we call the script hook + # "serverconnect" rather than "serverconnection". As things are handled + # differently in libmproxy (ClientConnect + ClientDisconnect vs + # ServerConnection class), there is no "serverdisonnect" event at the + # moment. self.run_script_hook("serverconnect", sc) sc.reply() @@ -1606,12 +1632,19 @@ class FlowMaster(controller.Master): return f def handle_request(self, r): - f = self.state.add_request(r) - self.replacehooks.run(f) - self.setheaders.run(f) - self.run_script_hook("request", f) - self.process_new_request(f) - return f + app = self.apps.get(r) + if app: + r.reply() + #err = app.serve(r, self.wfile) + #if err: + # self.add_event("Error in wsgi app. %s"%err, "error") + else: + f = self.state.add_request(r) + self.replacehooks.run(f) + self.setheaders.run(f) + self.run_script_hook("request", f) + self.process_new_request(f) + return f def handle_response(self, r): f = self.state.add_response(r) diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py index 609ffb62..9b300aa1 100644 --- a/libmproxy/proxy.py +++ b/libmproxy/proxy.py @@ -2,7 +2,7 @@ import sys, os, string, socket, time import shutil, tempfile, threading import SocketServer from OpenSSL import SSL -from netlib import odict, tcp, http, wsgi, certutils, http_status, http_auth +from netlib import odict, tcp, http, certutils, http_status, http_auth import utils, flow, version, platform, controller @@ -209,84 +209,77 @@ class ProxyHandler(tcp.BaseHandler): return cc.requestcount += 1 - app = self.server.apps.get(request) - if app: - err = app.serve(request, self.wfile) - if err: - self.log(cc, "Error in wsgi app.", err.split("\n")) - return + request_reply = self.channel.ask(request) + if request_reply is None or request_reply == KILL: + return + elif isinstance(request_reply, flow.Response): + request = False + response = request_reply + response_reply = self.channel.ask(response) else: - request_reply = self.channel.ask(request) - if request_reply is None or request_reply == KILL: - return - elif isinstance(request_reply, flow.Response): - request = False - response = request_reply - response_reply = self.channel.ask(response) + request = request_reply + if self.config.reverse_proxy: + scheme, host, port = self.config.reverse_proxy + elif self.config.forward_proxy: + scheme, host, port = self.config.forward_proxy else: - request = request_reply - if self.config.reverse_proxy: - scheme, host, port = self.config.reverse_proxy - elif self.config.forward_proxy: - scheme, host, port = self.config.forward_proxy - else: - scheme, host, port = request.scheme, request.host, request.port - - # If we've already pumped a request over this connection, - # it's possible that the server has timed out. If this is - # the case, we want to reconnect without sending an error - # to the client. - while 1: - sc = self.get_server_connection(cc, scheme, host, port, self.sni, request=request) - sc.send(request) - if sc.requestcount == 1: # add timestamps only for first request (others are not directly affected) - request.tcp_setup_timestamp = sc.tcp_setup_timestamp - request.ssl_setup_timestamp = sc.ssl_setup_timestamp - sc.rfile.reset_timestamps() - try: - tsstart = utils.timestamp() - peername = sc.connection.getpeername() - if peername: - request.ip = peername[0] - httpversion, code, msg, headers, content = http.read_response( - sc.rfile, - request.method, - self.config.body_size_limit - ) - except http.HttpErrorConnClosed, v: - self.del_server_connection() - if sc.requestcount > 1: - continue - else: - raise - except http.HttpError, v: - raise ProxyError(502, "Invalid server response.") + scheme, host, port = request.scheme, request.host, request.port + + # If we've already pumped a request over this connection, + # it's possible that the server has timed out. If this is + # the case, we want to reconnect without sending an error + # to the client. + while 1: + sc = self.get_server_connection(cc, scheme, host, port, self.sni, request=request) + sc.send(request) + if sc.requestcount == 1: # add timestamps only for first request (others are not directly affected) + request.tcp_setup_timestamp = sc.tcp_setup_timestamp + request.ssl_setup_timestamp = sc.ssl_setup_timestamp + sc.rfile.reset_timestamps() + try: + tsstart = utils.timestamp() + peername = sc.connection.getpeername() + if peername: + request.ip = peername[0] + httpversion, code, msg, headers, content = http.read_response( + sc.rfile, + request.method, + self.config.body_size_limit + ) + except http.HttpErrorConnClosed, v: + self.del_server_connection() + if sc.requestcount > 1: + continue else: - break - - response = flow.Response( - request, httpversion, code, msg, headers, content, sc.cert, - sc.rfile.first_byte_timestamp - ) - response_reply = self.channel.ask(response) - # Not replying to the server invalidates the server - # connection, so we terminate. - if response_reply == KILL: - sc.terminate() + raise + except http.HttpError, v: + raise ProxyError(502, "Invalid server response.") + else: + break + response = flow.Response( + request, httpversion, code, msg, headers, content, sc.cert, + sc.rfile.first_byte_timestamp + ) + response_reply = self.channel.ask(response) + # Not replying to the server invalidates the server + # connection, so we terminate. if response_reply == KILL: + sc.terminate() + + if response_reply == KILL: + return + else: + response = response_reply + self.send_response(response) + if request and http.connection_close(request.httpversion, request.headers): + return + # We could keep the client connection when the server + # connection needs to go away. However, we want to mimic + # behaviour as closely as possible to the client, so we + # disconnect. + if http.connection_close(response.httpversion, response.headers): return - else: - response = response_reply - self.send_response(response) - if request and http.connection_close(request.httpversion, request.headers): - return - # We could keep the client connection when the server - # connection needs to go away. However, we want to mimic - # behaviour as closely as possible to the client, so we - # disconnect. - if http.connection_close(response.httpversion, response.headers): - return except (IOError, ProxyError, http.HttpError, tcp.NetLibError), e: if hasattr(e, "code"): cc.error = "%s: %s"%(e.code, e.msg) @@ -526,7 +519,6 @@ class ProxyServer(tcp.TCPServer): except socket.error, v: raise ProxyServerError('Error starting proxy server: ' + v.strerror) self.channel = None - self.apps = AppRegistry() def start_slave(self, klass, channel): slave = klass(channel, self) @@ -541,28 +533,6 @@ class ProxyServer(tcp.TCPServer): h.finish() -class AppRegistry: - def __init__(self): - self.apps = {} - - def add(self, app, domain, port): - """ - Add a WSGI app to the registry, to be served for requests to the - specified domain, on the specified port. - """ - self.apps[(domain, port)] = wsgi.WSGIAdaptor(app, domain, port, version.NAMEVERSION) - - def get(self, request): - """ - Returns an WSGIAdaptor instance if request matches an app, or None. - """ - if (request.host, request.port) in self.apps: - return self.apps[(request.host, request.port)] - if "host" in request.headers: - host = request.headers["host"][0] - return self.apps.get((host, request.port), None) - - class DummyServer: bound = False def __init__(self, config): diff --git a/test/test_dump.py b/test/test_dump.py index 3d375f16..9874b650 100644 --- a/test/test_dump.py +++ b/test/test_dump.py @@ -114,7 +114,7 @@ class TestDumpMaster: o = dump.Options(app=True) s = mock.MagicMock() m = dump.DumpMaster(s, o, None) - assert s.apps.add.call_count == 1 + assert len(m.apps.apps) == 1 def test_replacements(self): o = dump.Options(replacements=[(".*", "content", "foo")]) diff --git a/test/test_flow.py b/test/test_flow.py index c614960b..bf6a7a42 100644 --- a/test/test_flow.py +++ b/test/test_flow.py @@ -5,6 +5,27 @@ from libmproxy import filt, flow, controller, utils, tnetstring, proxy import tutils +def test_app_registry(): + ar = flow.AppRegistry() + ar.add("foo", "domain", 80) + + r = tutils.treq() + r.host = "domain" + r.port = 80 + assert ar.get(r) + + r.port = 81 + assert not ar.get(r) + + r = tutils.treq() + r.host = "domain2" + r.port = 80 + assert not ar.get(r) + r.headers["host"] = ["domain"] + assert ar.get(r) + + + class TestStickyCookieState: def _response(self, cookie, host): s = flow.StickyCookieState(filt.parse(".*")) diff --git a/test/test_proxy.py b/test/test_proxy.py index dfccaaf7..371e5ef7 100644 --- a/test/test_proxy.py +++ b/test/test_proxy.py @@ -11,26 +11,6 @@ def test_proxy_error(): assert str(p) -def test_app_registry(): - ar = proxy.AppRegistry() - ar.add("foo", "domain", 80) - - r = tutils.treq() - r.host = "domain" - r.port = 80 - assert ar.get(r) - - r.port = 81 - assert not ar.get(r) - - r = tutils.treq() - r.host = "domain2" - r.port = 80 - assert not ar.get(r) - r.headers["host"] = ["domain"] - assert ar.get(r) - - class TestServerConnection: def setUp(self): self.d = test.Daemon() diff --git a/test/tservers.py b/test/tservers.py index f886d42d..ac95b168 100644 --- a/test/tservers.py +++ b/test/tservers.py @@ -23,10 +23,10 @@ def errapp(environ, start_response): class TestMaster(flow.FlowMaster): def __init__(self, testq, config): s = proxy.ProxyServer(config, 0) - s.apps.add(testapp, "testapp", 80) - s.apps.add(errapp, "errapp", 80) state = flow.State() flow.FlowMaster.__init__(self, s, state) + self.apps.add(testapp, "testapp", 80) + self.apps.add(errapp, "errapp", 80) self.testq = testq self.clear_log() self.start_app(APP_HOST, APP_PORT, False) -- cgit v1.2.3