aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2014-03-08 15:47:27 +0100
committerMaximilian Hils <git@maximilianhils.com>2014-03-08 15:47:27 +0100
commit2b01c4eee7a52dfb5fd30bb9b248cec87d563db9 (patch)
tree72604de5152bdf15147a78e03a669a207baeac95
parent3032672f10c727623ee085d04d623901dac856df (diff)
parent221973aff6553ffb1d40859c5d35b88d959f9718 (diff)
downloadmitmproxy-2b01c4eee7a52dfb5fd30bb9b248cec87d563db9.tar.gz
mitmproxy-2b01c4eee7a52dfb5fd30bb9b248cec87d563db9.tar.bz2
mitmproxy-2b01c4eee7a52dfb5fd30bb9b248cec87d563db9.zip
Merge branch 'master' of github.com:mitmproxy/mitmproxy
-rw-r--r--libmproxy/app.py9
-rw-r--r--libmproxy/cmdline.py2
-rw-r--r--libmproxy/console/__init__.py1
-rw-r--r--libmproxy/console/flowview.py148
-rw-r--r--libmproxy/protocol/__init__.py5
-rw-r--r--libmproxy/protocol/tcp.py13
-rw-r--r--libmproxy/proxy.py103
-rw-r--r--setup.py2
-rw-r--r--test/test_app.py5
-rw-r--r--test/test_console_contentview.py99
-rw-r--r--test/test_console_search.py176
-rw-r--r--test/test_protocol_http.py3
-rw-r--r--test/test_proxy.py11
-rw-r--r--test/test_server.py17
-rw-r--r--test/tservers.py17
15 files changed, 384 insertions, 227 deletions
diff --git a/libmproxy/app.py b/libmproxy/app.py
index b046f712..24187704 100644
--- a/libmproxy/app.py
+++ b/libmproxy/app.py
@@ -1,5 +1,6 @@
import flask
-import os.path
+import os.path, os
+import proxy
mapp = flask.Flask(__name__)
mapp.debug = True
@@ -16,14 +17,12 @@ def index():
@mapp.route("/cert/pem")
def certs_pem():
- capath = master().server.config.cacert
- p = os.path.splitext(capath)[0] + "-cert.pem"
+ p = os.path.join(master().server.config.confdir, proxy.CONF_BASENAME + "-ca-cert.pem")
return flask.Response(open(p, "rb").read(), mimetype='application/x-x509-ca-cert')
@mapp.route("/cert/p12")
def certs_p12():
- capath = master().server.config.cacert
- p = os.path.splitext(capath)[0] + "-cert.p12"
+ p = os.path.join(master().server.config.confdir, proxy.CONF_BASENAME + "-ca-cert.p12")
return flask.Response(open(p, "rb").read(), mimetype='application/x-pkcs12')
diff --git a/libmproxy/cmdline.py b/libmproxy/cmdline.py
index 8e7ab4a1..7950d40b 100644
--- a/libmproxy/cmdline.py
+++ b/libmproxy/cmdline.py
@@ -387,4 +387,4 @@ def common_options(parser):
help="Allow access to users specified in an Apache htpasswd file."
)
- proxy.certificate_option_group(parser)
+ proxy.ssl_option_group(parser)
diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py
index 4f298698..4a58e771 100644
--- a/libmproxy/console/__init__.py
+++ b/libmproxy/console/__init__.py
@@ -77,7 +77,6 @@ class PathEdit(urwid.Edit, _PathCompleter):
class ActionBar(common.WWrap):
def __init__(self):
self.message("")
- self.expire = None
def selectable(self):
return True
diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py
index 6c4a2651..3486f57e 100644
--- a/libmproxy/console/flowview.py
+++ b/libmproxy/console/flowview.py
@@ -68,7 +68,8 @@ def _mkhelp():
("space", "next flow"),
("|", "run script on this flow"),
("/", "search in response body (case sensitive)"),
- ("n", "repeat previous search"),
+ ("n", "repeat search forward"),
+ ("N", "repeat search backwards"),
]
text.extend(common.format_keyvals(keys, key="key", val="text", indent=4))
return text
@@ -255,7 +256,7 @@ class FlowView(common.WWrap):
)
return f
- def search_wrapped_around(self, last_find_line, last_search_index):
+ def search_wrapped_around(self, last_find_line, last_search_index, backwards):
"""
returns true if search wrapped around the bottom.
"""
@@ -265,15 +266,39 @@ class FlowView(common.WWrap):
current_search_index = self.state.get_flow_setting(self.flow,
"last_search_index")
- if current_find_line <= last_find_line:
- return True
- elif current_find_line == last_find_line:
- if current_search_index <= last_search_index:
- return True
+ if not backwards:
+ message = "search hit BOTTOM, continuing at TOP"
+ if current_find_line <= last_find_line:
+ return True, message
+ elif current_find_line == last_find_line:
+ if current_search_index <= last_search_index:
+ return True, message
+ else:
+ message = "search hit TOP, continuing at BOTTOM"
+ if current_find_line >= last_find_line:
+ return True, message
+ elif current_find_line == last_find_line:
+ if current_search_index >= last_search_index:
+ return True, message
- return False
+ return False, ""
- def search(self, search_string):
+ def search_again(self, backwards=False):
+ """
+ runs the previous search again, forwards or backwards.
+ """
+ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
+ if last_search_string:
+ message = self.search(last_search_string, backwards)
+ if message:
+ self.master.statusbar.message(message)
+ else:
+ message = "no previous searches have been made"
+ self.master.statusbar.message(message)
+
+ return message
+
+ def search(self, search_string, backwards=False):
"""
similar to view_response or view_request, but instead of just
displaying the conn, it highlights a word that the user is
@@ -301,7 +326,7 @@ class FlowView(common.WWrap):
# generate the body, highlight the words and get focus
headers, msg, body = self.conn_text_raw(text)
try:
- body, focus_position = self.search_highlight_text(body, search_string)
+ body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards)
except SearchError:
return "Search not supported in this view."
@@ -318,8 +343,10 @@ class FlowView(common.WWrap):
self.last_displayed_body = list_box
- if self.search_wrapped_around(last_find_line, last_search_index):
- return "search hit BOTTOM, continuing at TOP"
+ wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards)
+
+ if wrapped:
+ return wrapped_message
def search_get_start(self, search_string):
start_line = 0
@@ -344,57 +371,94 @@ class FlowView(common.WWrap):
return (start_line, start_index)
- def search_highlight_text(self, text_objects, search_string, looping = False):
+ def search_get_range(self, len_text_objects, start_line, backwards):
+ if not backwards:
+ loop_range = xrange(start_line, len_text_objects)
+ else:
+ loop_range = xrange(start_line, -1, -1)
+
+ return loop_range
+
+ def search_find(self, text, search_string, start_index, backwards):
+ if backwards == False:
+ find_index = text.find(search_string, start_index)
+ else:
+ if start_index != 0:
+ start_index -= len(search_string)
+ else:
+ start_index = None
+
+ find_index = text.rfind(search_string, 0, start_index)
+
+ return find_index
+
+ def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False):
start_line, start_index = self.search_get_start(search_string)
i = start_line
found = False
text_objects = copy.deepcopy(text_objects)
- for text_object in text_objects[start_line:]:
- if i != start_line:
- start_index = 0
+ loop_range = self.search_get_range(len(text_objects), start_line, backwards)
+ for i in loop_range:
+ text_object = text_objects[i]
try:
text, style = text_object.get_text()
except AttributeError:
raise SearchError()
- find_index = text.find(search_string, start_index)
- if find_index != -1:
- before = text[:find_index]
- after = text[find_index+len(search_string):]
- new_text = urwid.Text(
- [
- before,
- (self.highlight_color, search_string),
- after,
- ]
- )
- self.state.add_flow_setting(self.flow, "last_search_index",
- find_index)
- self.state.add_flow_setting(self.flow, "last_find_line", i)
+ if i != start_line:
+ start_index = 0
+ find_index = self.search_find(text, search_string, start_index, backwards)
+
+ if find_index != -1:
+ new_text = self.search_highlight_object(text, find_index, search_string)
text_objects[i] = new_text
found = True
+ self.state.add_flow_setting(self.flow, "last_search_index",
+ find_index)
+ self.state.add_flow_setting(self.flow, "last_find_line", i)
break
- i += 1
-
+ # handle search WRAP
if found:
focus_pos = i
else :
- # loop from the beginning, but not forever.
- if (start_line == 0 and start_index == 0) or looping:
+ if looping:
focus_pos = None
else:
- self.state.add_flow_setting(self.flow, "last_search_index", 0)
- self.state.add_flow_setting(self.flow, "last_find_line", 0)
- text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, True)
+ if not backwards:
+ self.state.add_flow_setting(self.flow, "last_search_index", 0)
+ self.state.add_flow_setting(self.flow, "last_find_line", 0)
+ else:
+ self.state.add_flow_setting(self.flow, "last_search_index", None)
+ self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1)
+
+ text_objects, focus_pos = self.search_highlight_text(text_objects,
+ search_string, looping=True, backwards=backwards)
return text_objects, focus_pos
+ def search_highlight_object(self, text_object, find_index, search_string):
+ """
+ just a little abstraction
+ """
+ before = text_object[:find_index]
+ after = text_object[find_index+len(search_string):]
+
+ new_text = urwid.Text(
+ [
+ before,
+ (self.highlight_color, search_string),
+ after,
+ ]
+ )
+
+ return new_text
+
def view_request(self):
self.state.view_flow_mode = common.VIEW_FLOW_REQUEST
body = self.conn_text(self.flow.request)
@@ -761,13 +825,9 @@ class FlowView(common.WWrap):
None,
self.search)
elif key == "n":
- last_search_string = self.state.get_flow_setting(self.flow, "last_search_string")
- if last_search_string:
- message = self.search(last_search_string)
- if message:
- self.master.statusbar.message(message)
- else:
- self.master.statusbar.message("no previous searches have been made")
+ self.search_again(backwards=False)
+ elif key == "N":
+ self.search_again(backwards=True)
else:
return key
diff --git a/libmproxy/protocol/__init__.py b/libmproxy/protocol/__init__.py
index 580d693c..2c2e7285 100644
--- a/libmproxy/protocol/__init__.py
+++ b/libmproxy/protocol/__init__.py
@@ -35,7 +35,6 @@ class TemporaryServerChangeMixin(object):
This mixin allows safe modification of the target server,
without any need to expose the ConnectionHandler to the Flow.
"""
-
def change_server(self, address, ssl):
if address == self.c.server_conn.address():
return
@@ -98,4 +97,6 @@ def handle_messages(conntype, connection_handler):
def handle_error(conntype, connection_handler, error):
- return _handler(conntype, connection_handler).handle_error(error) \ No newline at end of file
+ return _handler(conntype, connection_handler).handle_error(error)
+
+
diff --git a/libmproxy/protocol/tcp.py b/libmproxy/protocol/tcp.py
index df7e6692..1591eb04 100644
--- a/libmproxy/protocol/tcp.py
+++ b/libmproxy/protocol/tcp.py
@@ -32,12 +32,11 @@ class TCPHandler(ProtocolHandler):
if d == "": # connection closed
break
data.write(d)
-
- """
- OpenSSL Connections have an internal buffer that might contain data altough everything is read
- from the socket. Thankfully, connection.pending() returns the amount of bytes in this buffer,
- so we can read it completely at once.
- """
+ # OpenSSL Connections have an internal buffer that might
+ # contain data altough everything is read from the socket.
+ # Thankfully, connection.pending() returns the amount of
+ # bytes in this buffer, so we can read it completely at
+ # once.
if src.ssl_established:
data.write(rfile.read(src.connection.pending()))
else: # no data left, but not closed yet
@@ -57,4 +56,4 @@ class TCPHandler(ProtocolHandler):
self.c.log("%s %s\r\n%s" % (direction, dst_str,data))
dst.wfile.write(data)
- dst.wfile.flush() \ No newline at end of file
+ dst.wfile.flush()
diff --git a/libmproxy/proxy.py b/libmproxy/proxy.py
index b6480822..6dd37752 100644
--- a/libmproxy/proxy.py
+++ b/libmproxy/proxy.py
@@ -4,6 +4,10 @@ from netlib import tcp, http, certutils, http_auth
import utils, version, platform, controller, stateobject
TRANSPARENT_SSL_PORTS = [443, 8443]
+CONF_BASENAME = "mitmproxy"
+CONF_DIR = "~/.mitmproxy"
+CA_CERT_NAME = "mitmproxy-ca.pem"
+
class AddressPriority(object):
@@ -37,10 +41,12 @@ class Log:
class ProxyConfig:
- def __init__(self, certfile=None, cacert=None, clientcerts=None, no_upstream_cert=False, body_size_limit=None,
- reverse_proxy=None, forward_proxy=None, transparent_proxy=None, authenticator=None):
- self.certfile = certfile
- self.cacert = cacert
+ def __init__(self, confdir=CONF_DIR, clientcerts=None,
+ no_upstream_cert=False, body_size_limit=None, reverse_proxy=None,
+ forward_proxy=None, transparent_proxy=None, authenticator=None,
+ ciphers=None, certs=None
+ ):
+ self.ciphers = ciphers
self.clientcerts = clientcerts
self.no_upstream_cert = no_upstream_cert
self.body_size_limit = body_size_limit
@@ -48,7 +54,9 @@ class ProxyConfig:
self.forward_proxy = forward_proxy
self.transparent_proxy = transparent_proxy
self.authenticator = authenticator
- self.certstore = certutils.CertStore()
+ self.confdir = os.path.expanduser(confdir)
+ self.certstore = certutils.CertStore.from_store(self.confdir, CONF_BASENAME)
+
class ClientConnection(tcp.BaseHandler, stateobject.SimpleStateObject):
@@ -380,9 +388,12 @@ class ConnectionHandler:
if client:
if self.client_conn.ssl_established:
raise ProxyError(502, "SSL to Client already established.")
- dummycert = self.find_cert()
- self.client_conn.convert_to_ssl(dummycert, self.config.certfile or self.config.cacert,
- handle_sni=self.handle_sni)
+ cert, key = self.find_cert()
+ self.client_conn.convert_to_ssl(
+ cert, key,
+ handle_sni = self.handle_sni,
+ cipher_list = self.config.ciphers
+ )
def server_reconnect(self, no_ssl=False):
address = self.server_conn.address
@@ -410,22 +421,18 @@ class ConnectionHandler:
self.channel.tell("log", Log(msg))
def find_cert(self):
- if self.config.certfile:
- with open(self.config.certfile, "rb") as f:
- return certutils.SSLCert.from_pem(f.read())
- else:
- host = self.server_conn.address.host
- sans = []
- if not self.config.no_upstream_cert or not self.server_conn.ssl_established:
- upstream_cert = self.server_conn.cert
- if upstream_cert.cn:
- host = upstream_cert.cn.decode("utf8").encode("idna")
- sans = upstream_cert.altnames
-
- ret = self.config.certstore.get_cert(host, sans, self.config.cacert)
- if not ret:
- raise ProxyError(502, "Unable to generate dummy cert.")
- return ret
+ host = self.server_conn.address.host
+ sans = []
+ if not self.config.no_upstream_cert or not self.server_conn.ssl_established:
+ upstream_cert = self.server_conn.cert
+ if upstream_cert.cn:
+ host = upstream_cert.cn.decode("utf8").encode("idna")
+ sans = upstream_cert.altnames
+
+ ret = self.config.certstore.get_cert(host, sans)
+ if not ret:
+ raise ProxyError(502, "Unable to generate dummy cert.")
+ return ret
def handle_sni(self, connection):
"""
@@ -441,9 +448,9 @@ class ConnectionHandler:
self.server_reconnect() # reconnect to upstream server with SNI
# Now, change client context to reflect changed certificate:
new_context = SSL.Context(SSL.TLSv1_METHOD)
- new_context.use_privatekey_file(self.config.certfile or self.config.cacert)
- dummycert = self.find_cert()
- new_context.use_certificate(dummycert.x509)
+ cert, key = self.find_cert()
+ new_context.use_privatekey_file(key)
+ new_context.use_certificate(cert.X509)
connection.set_context(new_context)
# An unhandled exception in this method will core dump PyOpenSSL, so
# make dang sure it doesn't happen.
@@ -458,7 +465,6 @@ class ProxyServerError(Exception):
class ProxyServer(tcp.TCPServer):
allow_reuse_address = True
bound = True
-
def __init__(self, config, port, host='', server_version=version.NAMEVERSION):
"""
Raises ProxyServerError if there's a startup problem.
@@ -498,30 +504,29 @@ class DummyServer:
# Command-line utils
-def certificate_option_group(parser):
+def ssl_option_group(parser):
group = parser.add_argument_group("SSL")
group.add_argument(
- "--cert", action="store",
- type=str, dest="cert", default=None,
- help="User-created SSL certificate file."
+ "--cert", dest='certs', default=[], type=str,
+ metavar = "SPEC", action="append",
+ help='Add an SSL certificate. SPEC is of the form "[domain=]path". '\
+ 'The domain may include a wildcard, and is equal to "*" if not specified. '\
+ 'The file at path is a certificate in PEM format. If a private key is included in the PEM, '\
+ 'it is used, else the default key in the conf dir is used. Can be passed multiple times.'
)
group.add_argument(
"--client-certs", action="store",
type=str, dest="clientcerts", default=None,
help="Client certificate directory."
)
+ group.add_argument(
+ "--ciphers", action="store",
+ type=str, dest="ciphers", default=None,
+ help="SSL cipher specification."
+ )
def process_proxy_options(parser, options):
- if options.cert:
- options.cert = os.path.expanduser(options.cert)
- if not os.path.exists(options.cert):
- return parser.error("Manually created certificate does not exist: %s" % options.cert)
-
- cacert = os.path.join(options.confdir, "mitmproxy-ca.pem")
- cacert = os.path.expanduser(cacert)
- if not os.path.exists(cacert):
- certutils.dummy_ca(cacert)
body_size_limit = utils.parse_size(options.body_size_limit)
if options.reverse_proxy and options.transparent_proxy:
return parser.error("Can't set both reverse proxy and transparent proxy.")
@@ -574,14 +579,24 @@ def process_proxy_options(parser, options):
else:
authenticator = http_auth.NullProxyAuth(None)
+ certs = []
+ for i in options.certs:
+ parts = i.split("=", 1)
+ if len(parts) == 1:
+ parts = ["*", parts[0]]
+ parts[1] = os.path.expanduser(parts[1])
+ if not os.path.exists(parts[1]):
+ parser.error("Certificate file does not exist: %s"%parts[1])
+ certs.append(parts)
+
return ProxyConfig(
- certfile=options.cert,
- cacert=cacert,
clientcerts=options.clientcerts,
body_size_limit=body_size_limit,
no_upstream_cert=options.no_upstream_cert,
reverse_proxy=rp,
forward_proxy=fp,
transparent_proxy=trans,
- authenticator=authenticator
+ authenticator=authenticator,
+ ciphers=options.ciphers,
+ certs = certs,
)
diff --git a/setup.py b/setup.py
index d8e25660..624a1a60 100644
--- a/setup.py
+++ b/setup.py
@@ -102,7 +102,7 @@ setup(
"netlib>=%s"%version.VERSION,
"urwid>=1.1",
"pyasn1>0.1.2",
- "pyopenssl>=0.13",
+ "pyopenssl>=0.14",
"Pillow>=2.3.0",
"lxml",
"flask"
diff --git a/test/test_app.py b/test/test_app.py
index f0eab7cc..0b6ed14c 100644
--- a/test/test_app.py
+++ b/test/test_app.py
@@ -9,11 +9,8 @@ class TestApp(tservers.HTTPProxTest):
assert self.app("/").status_code == 200
def test_cert(self):
- path = tutils.test_data.path("data/confdir/") + "mitmproxy-ca-cert."
with tutils.tmpdir() as d:
for ext in ["pem", "p12"]:
resp = self.app("/cert/%s" % ext)
assert resp.status_code == 200
- with open(path + ext, "rb") as f:
- assert resp.content == f.read()
-
+ assert resp.content
diff --git a/test/test_console_contentview.py b/test/test_console_contentview.py
index 07ecf1d0..0aabd2c5 100644
--- a/test/test_console_contentview.py
+++ b/test/test_console_contentview.py
@@ -120,7 +120,7 @@ class TestContentView:
def test_view_css(self):
v = cv.ViewCSS()
- with open('./test/data/1.css', 'r') as fp:
+ with open(tutils.test_data.path('data/1.css'), 'r') as fp:
fixture_1 = fp.read()
result = v([], 'a', 100)
@@ -276,100 +276,3 @@ if cv.ViewProtobuf.is_available():
def test_get_by_shortcut():
assert cv.get_by_shortcut("h")
-def test_search_highlights():
- # Default text in requests is content. We will search for nt once, and
- # expect the first bit to be highlighted. We will do it again and expect the
- # second to be.
- f = tutils.tflowview()
-
- f.search("nt")
- text_object = tutils.get_body_line(f.last_displayed_body, 0)
- assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
-
- f.search("nt")
- text_object = tutils.get_body_line(f.last_displayed_body, 1)
- assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
-
-def test_search_returns_useful_messages():
- f = tutils.tflowview()
-
- # original string is content. this string should not be in there.
- response = f.search("oranges and other fruit.")
- assert response == "no matches for 'oranges and other fruit.'"
-
-def test_search_highlights_clears_prev():
- f = tutils.tflowview(request_contents="this is string\nstring is string")
-
- f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 0)
- assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
-
- # search again, it should not be highlighted again.
- f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 0)
- assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
-
-def test_search_highlights_multi_line():
- f = tutils.tflowview(request_contents="this is string\nstring is string")
-
- # should highlight the first line.
- f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 0)
- assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
-
- # should highlight second line, first appearance of string.
- f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 1)
- assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
-
- # should highlight third line, second appearance of string.
- f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 1)
- assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
-
-def test_search_loops():
- f = tutils.tflowview(request_contents="this is string\nstring is string")
-
- # get to the end.
- f.search("string")
- f.search("string")
- f.search("string")
-
- # should highlight the first line.
- message = f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 0)
- assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
- assert message == "search hit BOTTOM, continuing at TOP"
-
-def test_search_focuses():
- f = tutils.tflowview(request_contents="this is string\nstring is string")
-
- # should highlight the first line.
- f.search("string")
-
- # should be focusing on the 2nd text line.
- f.search("string")
- text_object = tutils.get_body_line(f.last_displayed_body, 1)
- assert f.last_displayed_body.focus == text_object
-
-def test_search_does_not_crash_on_bad():
- """
- this used to crash, kept for reference.
- """
-
- f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
- f.search("AFTERCUTOFF")
-
- # pretend F
- f.state.add_flow_setting(
- f.flow,
- (f.state.view_flow_mode, "fullcontents"),
- True
- )
- f.master.refresh_flow(f.flow)
-
- # text changed, now this string will exist. can happen when user presses F
- # for full text view
- f.search("AFTERCUTOFF")
-
-
diff --git a/test/test_console_search.py b/test/test_console_search.py
new file mode 100644
index 00000000..f4e9ff9b
--- /dev/null
+++ b/test/test_console_search.py
@@ -0,0 +1,176 @@
+
+import sys
+import libmproxy.console.contentview as cv
+from libmproxy import utils, flow, encoding
+import tutils
+
+def test_search_highlights():
+ # Default text in requests is content. We will search for nt once, and
+ # expect the first bit to be highlighted. We will do it again and expect the
+ # second to be.
+ f = tutils.tflowview()
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('content', [(None, 2), (f.highlight_color, 2)])
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('content', [(None, 5), (f.highlight_color, 2)])
+
+def test_search_returns_useful_messages():
+ f = tutils.tflowview()
+
+ # original string is content. this string should not be in there.
+ test_string = "oranges and other fruit."
+ response = f.search(test_string)
+ assert response == "no matches for '%s'" % test_string
+
+def test_search_highlights_clears_prev():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # search again, it should not be highlighted again.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() != ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+def test_search_highlights_multi_line(flow=None):
+ f = flow if flow else tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # should highlight the first line.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # should highlight second line, first appearance of string.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
+
+ # should highlight third line, second appearance of string.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 10), (f.highlight_color, 6)])
+
+def test_search_loops():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # get to the end.
+ f.search("string")
+ f.search("string")
+ f.search("string")
+
+ # should highlight the first line.
+ message = f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+ assert message == "search hit BOTTOM, continuing at TOP"
+
+def test_search_focuses():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # should highlight the first line.
+ f.search("string")
+
+ # should be focusing on the 2nd text line.
+ f.search("string")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert f.last_displayed_body.focus == text_object
+
+def test_search_does_not_crash_on_bad():
+ """
+ this used to crash, kept for reference.
+ """
+
+ f = tutils.tflowview(request_contents="this is string\nstring is string\n"+("A" * cv.VIEW_CUTOFF)+"AFTERCUTOFF")
+ f.search("AFTERCUTOFF")
+
+ # pretend F
+ f.state.add_flow_setting(
+ f.flow,
+ (f.state.view_flow_mode, "fullcontents"),
+ True
+ )
+ f.master.refresh_flow(f.flow)
+
+ # text changed, now this string will exist. can happen when user presses F
+ # for full text view
+ f.search("AFTERCUTOFF")
+
+def test_search_backwards():
+ f = tutils.tflowview(request_contents="content, content")
+
+ first_match = ('content, content', [(None, 2), (f.highlight_color, 2)])
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == first_match
+
+ f.search("nt")
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('content, content', [(None, 5), (f.highlight_color, 2)])
+
+ f.search_again(backwards=True)
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == first_match
+
+def test_search_back_multiline():
+ f = tutils.tflowview(request_contents="this is string\nstring is string")
+
+ # shared assertions. highlight and pointers should now be on the third
+ # 'string' appearance
+ test_search_highlights_multi_line(f)
+
+ # should highlight second line, first appearance of string.
+ f.search_again(backwards=True)
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('string is string', [(None, 0), (f.highlight_color, 6)])
+
+ # should highlight the first line again.
+ f.search_again(backwards=True)
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+def test_search_back_multi_multi_line():
+ """
+ same as above for some bugs the above won't catch.
+ """
+ f = tutils.tflowview(request_contents="this is string\nthis is string\nthis is string")
+
+ f.search("string")
+ f.search_again()
+ f.search_again()
+
+ # should be on second line
+ f.search_again(backwards=True)
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # first line now
+ f.search_again(backwards=True)
+ text_object = tutils.get_body_line(f.last_displayed_body, 0)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+def test_search_backwards_wraps():
+ """
+ when searching past line 0, it should loop.
+ """
+ f = tutils.tflowview(request_contents="this is string\nthis is string\nthis is string")
+
+ # should be on second line
+ f.search("string")
+ f.search_again()
+ text_object = tutils.get_body_line(f.last_displayed_body, 1)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+
+ # should be on third now.
+ f.search_again(backwards=True)
+ message = f.search_again(backwards=True)
+ text_object = tutils.get_body_line(f.last_displayed_body, 2)
+ assert text_object.get_text() == ('this is string', [(None, 8), (f.highlight_color, 6)])
+ assert message == "search hit TOP, continuing at BOTTOM"
+
diff --git a/test/test_protocol_http.py b/test/test_protocol_http.py
index 1d8872cc..3f37928c 100644
--- a/test/test_protocol_http.py
+++ b/test/test_protocol_http.py
@@ -88,7 +88,6 @@ class TestHTTPResponse:
class TestInvalidRequests(tservers.HTTPProxTest):
ssl = True
-
def test_double_connect(self):
p = self.pathoc()
r = p.request("connect:'%s:%s'" % ("127.0.0.1", self.server2.port))
@@ -117,9 +116,7 @@ class TestProxyChaining(tservers.HTTPChainProxyTest):
class TestProxyChainingSSL(tservers.HTTPChainProxyTest):
ssl = True
-
def test_simple(self):
-
p = self.pathoc()
req = p.request("get:'/p/418:b\"content\"'")
assert req.content == "content"
diff --git a/test/test_proxy.py b/test/test_proxy.py
index c42d66e7..b15e3f84 100644
--- a/test/test_proxy.py
+++ b/test/test_proxy.py
@@ -70,10 +70,6 @@ class TestProcessProxyOptions:
def test_simple(self):
assert self.p()
- def test_cert(self):
- self.assert_noerr("--cert", tutils.test_data.path("data/testkey.pem"))
- self.assert_err("does not exist", "--cert", "nonexistent")
-
def test_confdir(self):
with tutils.tmpdir() as confdir:
self.assert_noerr("--confdir", confdir)
@@ -90,11 +86,16 @@ class TestProcessProxyOptions:
self.assert_err("invalid reverse proxy", "-P", "reverse")
self.assert_noerr("-P", "http://localhost")
- def test_certs(self):
+ def test_client_certs(self):
with tutils.tmpdir() as confdir:
self.assert_noerr("--client-certs", confdir)
self.assert_err("directory does not exist", "--client-certs", "nonexistent")
+ def test_certs(self):
+ with tutils.tmpdir() as confdir:
+ self.assert_noerr("--cert", tutils.test_data.path("data/testkey.pem"))
+ self.assert_err("does not exist", "--cert", "nonexistent")
+
def test_auth(self):
p = self.assert_noerr("--nonanonymous")
assert p.authenticator
diff --git a/test/test_server.py b/test/test_server.py
index 2f9e6728..ed21e75c 100644
--- a/test/test_server.py
+++ b/test/test_server.py
@@ -206,13 +206,21 @@ class TestHTTPSCertfile(tservers.HTTPProxTest, CommonMixin):
def test_certfile(self):
assert self.pathod("304")
-class TestHTTPSNoCommonName(tservers.HTTPProxTest, CommonMixin):
+
+class TestHTTPSNoCommonName(tservers.HTTPProxTest):
"""
Test what happens if we get a cert without common name back.
"""
ssl = True
- ssloptions=pathod.SSLOptions(certfile=tutils.test_data.path("data/no_common_name.pem"),
- keyfile=tutils.test_data.path("data/no_common_name.pem"))
+ ssloptions=pathod.SSLOptions(
+ certs = [
+ ("*", tutils.test_data.path("data/no_common_name.pem"))
+ ]
+ )
+ def test_http(self):
+ f = self.pathod("202")
+ assert f.sslinfo.certchain[0].get_subject().CN == "127.0.0.1"
+
class TestReverse(tservers.ReverseProxTest, CommonMixin):
reverse = True
@@ -370,7 +378,6 @@ class TestTransparentResolveError(tservers.TransparentProxTest):
assert self.pathod("304").status_code == 502
-
class MasterIncomplete(tservers.TestMaster):
def handle_request(self, m):
resp = tutils.tresp()
@@ -383,5 +390,3 @@ class TestIncompleteResponse(tservers.HTTPProxTest):
def test_incomplete(self):
assert self.pathod("200").status_code == 502
-
-
diff --git a/test/tservers.py b/test/tservers.py
index 812e8921..cf9b3f73 100644
--- a/test/tservers.py
+++ b/test/tservers.py
@@ -1,4 +1,6 @@
+import os.path
import threading, Queue
+import shutil, tempfile
import flask
import libpathod.test, libpathod.pathoc
from libmproxy import proxy, flow, controller
@@ -28,7 +30,6 @@ class TestMaster(flow.FlowMaster):
self.apps.add(testapp, "testapp", 80)
self.apps.add(errapp, "errapp", 80)
self.clear_log()
- self.start_app(APP_HOST, APP_PORT, False)
def handle_request(self, m):
flow.FlowMaster.handle_request(self, m)
@@ -73,25 +74,31 @@ class ProxTestBase(object):
ssl = None
ssloptions = False
clientcerts = False
- certfile = None
no_upstream_cert = False
authenticator = None
masterclass = TestMaster
+ externalapp = False
@classmethod
def setupAll(cls):
cls.server = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
cls.server2 = libpathod.test.Daemon(ssl=cls.ssl, ssloptions=cls.ssloptions)
pconf = cls.get_proxy_config()
+ cls.confdir = os.path.join(tempfile.gettempdir(), "mitmproxy")
config = proxy.ProxyConfig(
no_upstream_cert = cls.no_upstream_cert,
- cacert = tutils.test_data.path("data/confdir/mitmproxy-ca.pem"),
+ confdir = cls.confdir,
authenticator = cls.authenticator,
**pconf
)
tmaster = cls.masterclass(config)
+ tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
cls.proxy = ProxyThread(tmaster)
cls.proxy.start()
+ @classmethod
+ def tearDownAll(cls):
+ shutil.rmtree(cls.confdir)
+
@property
def master(cls):
return cls.proxy.tmaster
@@ -126,8 +133,6 @@ class ProxTestBase(object):
d = dict()
if cls.clientcerts:
d["clientcerts"] = tutils.test_data.path("data/clientcert")
- if cls.certfile:
- d["certfile"] =tutils.test_data.path("data/testkey.pem")
return d
@@ -252,7 +257,6 @@ class ChainProxTest(ProxTestBase):
"""
n = 2
chain_config = [lambda: proxy.ProxyConfig(
- cacert = tutils.test_data.path("data/confdir/mitmproxy-ca.pem"),
)] * n
@classmethod
def setupAll(cls):
@@ -265,6 +269,7 @@ class ChainProxTest(ProxTestBase):
cls.chain[-1].port
)
tmaster = cls.masterclass(config)
+ tmaster.start_app(APP_HOST, APP_PORT, cls.externalapp)
cls.chain.append(ProxyThread(tmaster))
cls.chain[-1].start()