aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_flow.py
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2016-02-15 14:58:46 +0100
committerMaximilian Hils <git@maximilianhils.com>2016-02-15 14:58:46 +0100
commit33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04 (patch)
tree31914a601302579ff817504019296fd7e9e46765 /test/test_flow.py
parent36f34f701991b5d474c005ec45e3b66e20f326a8 (diff)
downloadmitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.gz
mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.tar.bz2
mitmproxy-33fa49277a821b9d38e8c9bf0bcf2adcfa2f6f04.zip
move mitmproxy
Diffstat (limited to 'test/test_flow.py')
-rw-r--r--test/test_flow.py1333
1 files changed, 0 insertions, 1333 deletions
diff --git a/test/test_flow.py b/test/test_flow.py
deleted file mode 100644
index b122489f..00000000
--- a/test/test_flow.py
+++ /dev/null
@@ -1,1333 +0,0 @@
-import Queue
-import time
-import os.path
-from cStringIO import StringIO
-import email.utils
-
-import mock
-
-import netlib.utils
-from netlib import odict
-from netlib.http import CONTENT_MISSING, Headers
-from libmproxy import filt, controller, tnetstring, flow
-from libmproxy.models import Error
-from libmproxy.models import Flow
-from libmproxy.models import HTTPFlow
-from libmproxy.models import HTTPRequest
-from libmproxy.models import HTTPResponse
-from libmproxy.proxy.config import HostMatcher
-from libmproxy.proxy import ProxyConfig
-from libmproxy.proxy.server import DummyServer
-from libmproxy.models.connections import ClientConnection
-from . import tutils
-
-
-def test_app_registry():
- ar = flow.AppRegistry()
- ar.add("foo", "domain", 80)
-
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.host = "domain"
- r.port = 80
- assert ar.get(r)
-
- r.port = 81
- assert not ar.get(r)
-
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.host = "domain2"
- r.port = 80
- assert not ar.get(r)
- r.headers["host"] = "domain"
- assert ar.get(r)
-
-
-class TestStickyCookieState:
-
- def _response(self, cookie, host):
- s = flow.StickyCookieState(filt.parse(".*"))
- f = tutils.tflow(req=netlib.tutils.treq(host=host, port=80), resp=True)
- f.response.headers["Set-Cookie"] = cookie
- s.handle_response(f)
- return s, f
-
- def test_domain_match(self):
- s = flow.StickyCookieState(filt.parse(".*"))
- assert s.domain_match("www.google.com", ".google.com")
- assert s.domain_match("google.com", ".google.com")
-
- def test_handle_response(self):
- c = "SSID=mooo; domain=.google.com, FOO=bar; Domain=.google.com; Path=/; "\
- "Expires=Wed, 13-Jan-2021 22:23:01 GMT; Secure; "
-
- s, f = self._response(c, "host")
- assert not s.jar.keys()
-
- s, f = self._response(c, "www.google.com")
- assert s.jar.keys()
-
- s, f = self._response("SSID=mooo", "www.google.com")
- assert s.jar.keys()[0] == ('www.google.com', 80, '/')
-
- def test_handle_request(self):
- s, f = self._response("SSID=mooo", "www.google.com")
- assert "cookie" not in f.request.headers
- s.handle_request(f)
- assert "cookie" in f.request.headers
-
-
-class TestStickyAuthState:
-
- def test_handle_response(self):
- s = flow.StickyAuthState(filt.parse(".*"))
- f = tutils.tflow(resp=True)
- f.request.headers["authorization"] = "foo"
- s.handle_request(f)
- assert "address" in s.hosts
-
- f = tutils.tflow(resp=True)
- s.handle_request(f)
- assert f.request.headers["authorization"] == "foo"
-
-
-class TestClientPlaybackState:
-
- def test_tick(self):
- first = tutils.tflow()
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- fm.start_client_playback([first, tutils.tflow()], True)
- c = fm.client_playback
- c.testing = True
-
- assert not c.done()
- assert not s.flow_count()
- assert c.count() == 2
- c.tick(fm)
- assert s.flow_count()
- assert c.count() == 1
-
- c.tick(fm)
- assert c.count() == 1
-
- c.clear(c.current)
- c.tick(fm)
- assert c.count() == 0
- c.clear(c.current)
- assert c.done()
-
- q = Queue.Queue()
- fm.state.clear()
- fm.tick(q, timeout=0)
-
- fm.stop_client_playback()
- assert not fm.client_playback
-
-
-class TestServerPlaybackState:
-
- def test_hash(self):
- s = flow.ServerPlaybackState(
- None,
- [],
- False,
- False,
- None,
- False,
- None,
- False)
- r = tutils.tflow()
- r2 = tutils.tflow()
-
- assert s._hash(r)
- assert s._hash(r) == s._hash(r2)
- r.request.headers["foo"] = "bar"
- assert s._hash(r) == s._hash(r2)
- r.request.path = "voing"
- assert s._hash(r) != s._hash(r2)
-
- r.request.path = "path?blank_value"
- r2.request.path = "path?"
- assert s._hash(r) != s._hash(r2)
-
- def test_headers(self):
- s = flow.ServerPlaybackState(
- ["foo"],
- [],
- False,
- False,
- None,
- False,
- None,
- False)
- r = tutils.tflow(resp=True)
- r.request.headers["foo"] = "bar"
- r2 = tutils.tflow(resp=True)
- assert not s._hash(r) == s._hash(r2)
- r2.request.headers["foo"] = "bar"
- assert s._hash(r) == s._hash(r2)
- r2.request.headers["oink"] = "bar"
- assert s._hash(r) == s._hash(r2)
-
- r = tutils.tflow(resp=True)
- r2 = tutils.tflow(resp=True)
- assert s._hash(r) == s._hash(r2)
-
- def test_load(self):
- r = tutils.tflow(resp=True)
- r.request.headers["key"] = "one"
-
- r2 = tutils.tflow(resp=True)
- r2.request.headers["key"] = "two"
-
- s = flow.ServerPlaybackState(
- None, [
- r, r2], False, False, None, False, None, False)
- assert s.count() == 2
- assert len(s.fmap.keys()) == 1
-
- n = s.next_flow(r)
- assert n.request.headers["key"] == "one"
- assert s.count() == 1
-
- n = s.next_flow(r)
- assert n.request.headers["key"] == "two"
- assert s.count() == 0
-
- assert not s.next_flow(r)
-
- def test_load_with_nopop(self):
- r = tutils.tflow(resp=True)
- r.request.headers["key"] = "one"
-
- r2 = tutils.tflow(resp=True)
- r2.request.headers["key"] = "two"
-
- s = flow.ServerPlaybackState(
- None, [
- r, r2], False, True, None, False, None, False)
-
- assert s.count() == 2
- s.next_flow(r)
- assert s.count() == 2
-
- def test_ignore_params(self):
- s = flow.ServerPlaybackState(
- None, [], False, False, [
- "param1", "param2"], False, None, False)
- r = tutils.tflow(resp=True)
- r.request.path = "/test?param1=1"
- r2 = tutils.tflow(resp=True)
- r2.request.path = "/test"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param1=2"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param2=1"
- assert s._hash(r) == s._hash(r2)
- r2.request.path = "/test?param3=2"
- assert not s._hash(r) == s._hash(r2)
-
- def test_ignore_payload_params(self):
- s = flow.ServerPlaybackState(
- None, [], False, False, None, False, [
- "param1", "param2"], False)
- r = tutils.tflow(resp=True)
- r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r.request.content = "paramx=x&param1=1"
- r2 = tutils.tflow(resp=True)
- r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r2.request.content = "paramx=x&param1=1"
- # same parameters
- assert s._hash(r) == s._hash(r2)
- # ignored parameters !=
- r2.request.content = "paramx=x&param1=2"
- assert s._hash(r) == s._hash(r2)
- # missing parameter
- r2.request.content = "paramx=x"
- assert s._hash(r) == s._hash(r2)
- # ignorable parameter added
- r2.request.content = "paramx=x&param1=2"
- assert s._hash(r) == s._hash(r2)
- # not ignorable parameter changed
- r2.request.content = "paramx=y&param1=1"
- assert not s._hash(r) == s._hash(r2)
- # not ignorable parameter missing
- r2.request.content = "param1=1"
- assert not s._hash(r) == s._hash(r2)
-
- def test_ignore_payload_params_other_content_type(self):
- s = flow.ServerPlaybackState(
- None, [], False, False, None, False, [
- "param1", "param2"], False)
- r = tutils.tflow(resp=True)
- r.request.headers["Content-Type"] = "application/json"
- r.request.content = '{"param1":"1"}'
- r2 = tutils.tflow(resp=True)
- r2.request.headers["Content-Type"] = "application/json"
- r2.request.content = '{"param1":"1"}'
- # same content
- assert s._hash(r) == s._hash(r2)
- # distint content (note only x-www-form-urlencoded payload is analysed)
- r2.request.content = '{"param1":"2"}'
- assert not s._hash(r) == s._hash(r2)
-
- def test_ignore_payload_wins_over_params(self):
- # NOTE: parameters are mutually exclusive in options
- s = flow.ServerPlaybackState(
- None, [], False, False, None, True, [
- "param1", "param2"], False)
- r = tutils.tflow(resp=True)
- r.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r.request.content = "paramx=y"
- r2 = tutils.tflow(resp=True)
- r2.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
- r2.request.content = "paramx=x"
- # same parameters
- assert s._hash(r) == s._hash(r2)
-
- def test_ignore_content(self):
- s = flow.ServerPlaybackState(
- None,
- [],
- False,
- False,
- None,
- False,
- None,
- False)
- r = tutils.tflow(resp=True)
- r2 = tutils.tflow(resp=True)
-
- r.request.content = "foo"
- r2.request.content = "foo"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = "bar"
- assert not s._hash(r) == s._hash(r2)
-
- # now ignoring content
- s = flow.ServerPlaybackState(
- None,
- [],
- False,
- False,
- None,
- True,
- None,
- False)
- r = tutils.tflow(resp=True)
- r2 = tutils.tflow(resp=True)
- r.request.content = "foo"
- r2.request.content = "foo"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = "bar"
- assert s._hash(r) == s._hash(r2)
- r2.request.content = ""
- assert s._hash(r) == s._hash(r2)
- r2.request.content = None
- assert s._hash(r) == s._hash(r2)
-
- def test_ignore_host(self):
- s = flow.ServerPlaybackState(
- None,
- [],
- False,
- False,
- None,
- False,
- None,
- True)
- r = tutils.tflow(resp=True)
- r2 = tutils.tflow(resp=True)
-
- r.request.host = "address"
- r2.request.host = "address"
- assert s._hash(r) == s._hash(r2)
- r2.request.host = "wrong_address"
- assert s._hash(r) == s._hash(r2)
-
-
-class TestFlow(object):
-
- def test_copy(self):
- f = tutils.tflow(resp=True)
- f.get_state()
- f2 = f.copy()
- a = f.get_state()
- b = f2.get_state()
- del a["id"]
- del b["id"]
- assert a == b
- assert not f == f2
- assert not f is f2
- assert f.request.get_state() == f2.request.get_state()
- assert not f.request is f2.request
- assert f.request.headers == f2.request.headers
- assert not f.request.headers is f2.request.headers
- assert f.response.get_state() == f2.response.get_state()
- assert not f.response is f2.response
-
- f = tutils.tflow(err=True)
- f2 = f.copy()
- assert not f is f2
- assert not f.request is f2.request
- assert f.request.headers == f2.request.headers
- assert not f.request.headers is f2.request.headers
- assert f.error.get_state() == f2.error.get_state()
- assert not f.error is f2.error
-
- def test_match(self):
- f = tutils.tflow(resp=True)
- assert not f.match("~b test")
- assert f.match(None)
- assert not f.match("~b test")
-
- f = tutils.tflow(err=True)
- assert f.match("~e")
-
- tutils.raises(ValueError, f.match, "~")
-
- def test_backup(self):
- f = tutils.tflow()
- f.response = HTTPResponse.wrap(netlib.tutils.tresp())
- f.request.content = "foo"
- assert not f.modified()
- f.backup()
- f.request.content = "bar"
- assert f.modified()
- f.revert()
- assert f.request.content == "foo"
-
- def test_backup_idempotence(self):
- f = tutils.tflow(resp=True)
- f.backup()
- f.revert()
- f.backup()
- f.revert()
-
- def test_getset_state(self):
- f = tutils.tflow(resp=True)
- state = f.get_state()
- assert f.get_state() == HTTPFlow.from_state(
- state).get_state()
-
- f.response = None
- f.error = Error("error")
- state = f.get_state()
- assert f.get_state() == HTTPFlow.from_state(
- state).get_state()
-
- f2 = f.copy()
- f2.id = f.id # copy creates a different uuid
- assert f.get_state() == f2.get_state()
- assert not f == f2
- f2.error = Error("e2")
- assert not f == f2
- f.set_state(f2.get_state())
- assert f.get_state() == f2.get_state()
-
- def test_kill(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- f = tutils.tflow()
- f.intercept(mock.Mock())
- assert not f.reply.acked
- f.kill(fm)
- assert f.reply.acked
-
- def test_killall(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
-
- f = tutils.tflow()
- fm.handle_request(f)
-
- f = tutils.tflow()
- fm.handle_request(f)
-
- for i in s.view:
- assert not i.reply.acked
- s.killall(fm)
- for i in s.view:
- assert i.reply.acked
-
- def test_accept_intercept(self):
- f = tutils.tflow()
-
- f.intercept(mock.Mock())
- assert not f.reply.acked
- f.accept_intercept(mock.Mock())
- assert f.reply.acked
-
- def test_replace_unicode(self):
- f = tutils.tflow(resp=True)
- f.response.content = "\xc2foo"
- f.replace("foo", u"bar")
-
- def test_replace_no_content(self):
- f = tutils.tflow()
- f.request.content = CONTENT_MISSING
- assert f.replace("foo", "bar") == 0
-
- def test_replace(self):
- f = tutils.tflow(resp=True)
- f.request.headers["foo"] = "foo"
- f.request.content = "afoob"
-
- f.response.headers["foo"] = "foo"
- f.response.content = "afoob"
-
- assert f.replace("foo", "bar") == 6
-
- assert f.request.headers["bar"] == "bar"
- assert f.request.content == "abarb"
- assert f.response.headers["bar"] == "bar"
- assert f.response.content == "abarb"
-
- def test_replace_encoded(self):
- f = tutils.tflow(resp=True)
- f.request.content = "afoob"
- f.request.encode("gzip")
- f.response.content = "afoob"
- f.response.encode("gzip")
-
- f.replace("foo", "bar")
-
- assert f.request.content != "abarb"
- f.request.decode()
- assert f.request.content == "abarb"
-
- assert f.response.content != "abarb"
- f.response.decode()
- assert f.response.content == "abarb"
-
-
-class TestState:
-
- def test_backup(self):
- c = flow.State()
- f = tutils.tflow()
- c.add_flow(f)
- f.backup()
- c.revert(f)
-
- def test_flow(self):
- """
- normal flow:
-
- connect -> request -> response
- """
- c = flow.State()
- f = tutils.tflow()
- c.add_flow(f)
- assert f
- assert c.flow_count() == 1
- assert c.active_flow_count() == 1
-
- newf = tutils.tflow()
- assert c.add_flow(newf)
- assert c.active_flow_count() == 2
-
- f.response = HTTPResponse.wrap(netlib.tutils.tresp())
- assert c.update_flow(f)
- assert c.flow_count() == 2
- assert c.active_flow_count() == 1
-
- assert not c.update_flow(None)
- assert c.active_flow_count() == 1
-
- newf.response = HTTPResponse.wrap(netlib.tutils.tresp())
- assert c.update_flow(newf)
- assert c.active_flow_count() == 0
-
- def test_err(self):
- c = flow.State()
- f = tutils.tflow()
- c.add_flow(f)
- f.error = Error("message")
- assert c.update_flow(f)
-
- c = flow.State()
- f = tutils.tflow()
- c.add_flow(f)
- c.set_limit("~e")
- assert not c.view
- f.error = tutils.terr()
- assert c.update_flow(f)
- assert c.view
-
- def test_set_limit(self):
- c = flow.State()
-
- f = tutils.tflow()
- assert len(c.view) == 0
-
- c.add_flow(f)
- assert len(c.view) == 1
-
- c.set_limit("~s")
- assert c.limit_txt == "~s"
- assert len(c.view) == 0
- f.response = HTTPResponse.wrap(netlib.tutils.tresp())
- c.update_flow(f)
- assert len(c.view) == 1
- c.set_limit(None)
- assert len(c.view) == 1
-
- f = tutils.tflow()
- c.add_flow(f)
- assert len(c.view) == 2
- c.set_limit("~q")
- assert len(c.view) == 1
- c.set_limit("~s")
- assert len(c.view) == 1
-
- assert "Invalid" in c.set_limit("~")
-
- def test_set_intercept(self):
- c = flow.State()
- assert not c.set_intercept("~q")
- assert c.intercept_txt == "~q"
- assert "Invalid" in c.set_intercept("~")
- assert not c.set_intercept(None)
- assert c.intercept_txt is None
-
- def _add_request(self, state):
- f = tutils.tflow()
- state.add_flow(f)
- return f
-
- def _add_response(self, state):
- f = tutils.tflow()
- state.add_flow(f)
- f.response = HTTPResponse.wrap(netlib.tutils.tresp())
- state.update_flow(f)
-
- def _add_error(self, state):
- f = tutils.tflow(err=True)
- state.add_flow(f)
-
- def test_clear(self):
- c = flow.State()
- f = self._add_request(c)
- f.intercepted = True
-
- c.clear()
- assert c.flow_count() == 0
-
- def test_dump_flows(self):
- c = flow.State()
- self._add_request(c)
- self._add_response(c)
- self._add_request(c)
- self._add_response(c)
- self._add_request(c)
- self._add_response(c)
- self._add_error(c)
-
- flows = c.view[:]
- c.clear()
-
- c.load_flows(flows)
- assert isinstance(c.flows[0], Flow)
-
- def test_accept_all(self):
- c = flow.State()
- self._add_request(c)
- self._add_response(c)
- self._add_request(c)
- c.accept_all(mock.Mock())
-
-
-class TestSerialize:
-
- def _treader(self):
- sio = StringIO()
- w = flow.FlowWriter(sio)
- for i in range(3):
- f = tutils.tflow(resp=True)
- w.add(f)
- for i in range(3):
- f = tutils.tflow(err=True)
- w.add(f)
-
- sio.seek(0)
- return flow.FlowReader(sio)
-
- def test_roundtrip(self):
- sio = StringIO()
- f = tutils.tflow()
- f.request.content = "".join(chr(i) for i in range(255))
- w = flow.FlowWriter(sio)
- w.add(f)
-
- sio.seek(0)
- r = flow.FlowReader(sio)
- l = list(r.stream())
- assert len(l) == 1
-
- f2 = l[0]
- assert f2.get_state() == f.get_state()
- assert f2.request == f.request
-
- def test_load_flows(self):
- r = self._treader()
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- fm.load_flows(r)
- assert len(s.flows) == 6
-
- def test_load_flows_reverse(self):
- r = self._treader()
- s = flow.State()
- conf = ProxyConfig(
- mode="reverse",
- upstream_server=("https", ("use-this-domain", 80))
- )
- fm = flow.FlowMaster(DummyServer(conf), s)
- fm.load_flows(r)
- assert s.flows[0].request.host == "use-this-domain"
-
- def test_filter(self):
- sio = StringIO()
- fl = filt.parse("~c 200")
- w = flow.FilteredFlowWriter(sio, fl)
-
- f = tutils.tflow(resp=True)
- f.response.status_code = 200
- w.add(f)
-
- f = tutils.tflow(resp=True)
- f.response.status_code = 201
- w.add(f)
-
- sio.seek(0)
- r = flow.FlowReader(sio)
- assert len(list(r.stream()))
-
- def test_error(self):
- sio = StringIO()
- sio.write("bogus")
- sio.seek(0)
- r = flow.FlowReader(sio)
- tutils.raises(flow.FlowReadError, list, r.stream())
-
- f = flow.FlowReadError("foo")
- assert f.strerror == "foo"
-
- def test_versioncheck(self):
- f = tutils.tflow()
- d = f.get_state()
- d["version"] = (0, 0)
- sio = StringIO()
- tnetstring.dump(d, sio)
- sio.seek(0)
-
- r = flow.FlowReader(sio)
- tutils.raises("version", list, r.stream())
-
-
-class TestFlowMaster:
-
- def test_load_script(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
- assert not fm.load_script(tutils.test_data.path("scripts/a.py"))
- assert not fm.unload_scripts()
- assert fm.load_script("nonexistent")
- assert "ValueError" in fm.load_script(
- tutils.test_data.path("scripts/starterr.py"))
- assert len(fm.scripts) == 0
-
- def test_getset_ignore(self):
- p = mock.Mock()
- p.config.check_ignore = HostMatcher()
- fm = flow.FlowMaster(p, flow.State())
- assert not fm.get_ignore_filter()
- fm.set_ignore_filter(["^apple\.com:", ":443$"])
- assert fm.get_ignore_filter()
-
- def test_replay(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- f = tutils.tflow(resp=True)
- f.request.content = CONTENT_MISSING
- assert "missing" in fm.replay_request(f)
-
- f.intercepted = True
- assert "intercepting" in fm.replay_request(f)
-
- f.live = True
- assert "live" in fm.replay_request(f, run_scripthooks=True)
-
- def test_script_reqerr(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- assert not fm.load_script(tutils.test_data.path("scripts/reqerr.py"))
- f = tutils.tflow()
- fm.handle_clientconnect(f.client_conn)
- assert fm.handle_request(f)
-
- def test_script(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
- f = tutils.tflow(resp=True)
-
- fm.handle_clientconnect(f.client_conn)
- assert fm.scripts[0].ns["log"][-1] == "clientconnect"
- fm.handle_serverconnect(f.server_conn)
- assert fm.scripts[0].ns["log"][-1] == "serverconnect"
- fm.handle_request(f)
- assert fm.scripts[0].ns["log"][-1] == "request"
- fm.handle_response(f)
- assert fm.scripts[0].ns["log"][-1] == "response"
- # load second script
- assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
- assert len(fm.scripts) == 2
- fm.handle_clientdisconnect(f.server_conn)
- assert fm.scripts[0].ns["log"][-1] == "clientdisconnect"
- assert fm.scripts[1].ns["log"][-1] == "clientdisconnect"
-
- # unload first script
- fm.unload_scripts()
- assert len(fm.scripts) == 0
- assert not fm.load_script(tutils.test_data.path("scripts/all.py"))
-
- f.error = tutils.terr()
- fm.handle_error(f)
- assert fm.scripts[0].ns["log"][-1] == "error"
-
- def test_duplicate_flow(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- f = tutils.tflow(resp=True)
- f = fm.load_flow(f)
- assert s.flow_count() == 1
- f2 = fm.duplicate_flow(f)
- assert f2.response
- assert s.flow_count() == 2
- assert s.index(f2) == 1
-
- def test_all(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- fm.anticache = True
- fm.anticomp = True
- f = tutils.tflow(req=None)
- fm.handle_clientconnect(f.client_conn)
- f.request = HTTPRequest.wrap(netlib.tutils.treq())
- fm.handle_request(f)
- assert s.flow_count() == 1
-
- f.response = HTTPResponse.wrap(netlib.tutils.tresp())
- fm.handle_response(f)
- assert not fm.handle_response(None)
- assert s.flow_count() == 1
-
- fm.handle_clientdisconnect(f.client_conn)
-
- f.error = Error("msg")
- f.error.reply = controller.DummyReply()
- fm.handle_error(f)
-
- fm.load_script(tutils.test_data.path("scripts/a.py"))
- fm.shutdown()
-
- def test_client_playback(self):
- s = flow.State()
-
- f = tutils.tflow(resp=True)
- pb = [tutils.tflow(resp=True), f]
- fm = flow.FlowMaster(DummyServer(ProxyConfig()), s)
- assert not fm.start_server_playback(
- pb,
- False,
- [],
- False,
- False,
- None,
- False,
- None,
- False)
- assert not fm.start_client_playback(pb, False)
- fm.client_playback.testing = True
-
- q = Queue.Queue()
- assert not fm.state.flow_count()
- fm.tick(q, 0)
- assert fm.state.flow_count()
-
- f.error = Error("error")
- fm.handle_error(f)
-
- def test_server_playback(self):
- s = flow.State()
-
- f = tutils.tflow()
- f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=f.request))
- pb = [f]
-
- fm = flow.FlowMaster(None, s)
- fm.refresh_server_playback = True
- assert not fm.do_server_playback(tutils.tflow())
-
- fm.start_server_playback(
- pb,
- False,
- [],
- False,
- False,
- None,
- False,
- None,
- False)
- assert fm.do_server_playback(tutils.tflow())
-
- fm.start_server_playback(
- pb,
- False,
- [],
- True,
- False,
- None,
- False,
- None,
- False)
- r = tutils.tflow()
- r.request.content = "gibble"
- assert not fm.do_server_playback(r)
- assert fm.do_server_playback(tutils.tflow())
-
- fm.start_server_playback(
- pb,
- False,
- [],
- True,
- False,
- None,
- False,
- None,
- False)
- q = Queue.Queue()
- fm.tick(q, 0)
- assert fm.should_exit.is_set()
-
- fm.stop_server_playback()
- assert not fm.server_playback
-
- def test_server_playback_kill(self):
- s = flow.State()
- f = tutils.tflow()
- f.response = HTTPResponse.wrap(netlib.tutils.tresp(content=f.request))
- pb = [f]
- fm = flow.FlowMaster(None, s)
- fm.refresh_server_playback = True
- fm.start_server_playback(
- pb,
- True,
- [],
- False,
- False,
- None,
- False,
- None,
- False)
-
- f = tutils.tflow()
- f.request.host = "nonexistent"
- fm.process_new_request(f)
- assert "killed" in f.error.msg
-
- def test_stickycookie(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- assert "Invalid" in fm.set_stickycookie("~h")
- fm.set_stickycookie(".*")
- assert fm.stickycookie_state
- fm.set_stickycookie(None)
- assert not fm.stickycookie_state
-
- fm.set_stickycookie(".*")
- f = tutils.tflow(resp=True)
- f.response.headers["set-cookie"] = "foo=bar"
- fm.handle_request(f)
- fm.handle_response(f)
- assert fm.stickycookie_state.jar
- assert not "cookie" in f.request.headers
- f = f.copy()
- fm.handle_request(f)
- assert f.request.headers["cookie"] == "foo=bar"
-
- def test_stickyauth(self):
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- assert "Invalid" in fm.set_stickyauth("~h")
- fm.set_stickyauth(".*")
- assert fm.stickyauth_state
- fm.set_stickyauth(None)
- assert not fm.stickyauth_state
-
- fm.set_stickyauth(".*")
- f = tutils.tflow(resp=True)
- f.request.headers["authorization"] = "foo"
- fm.handle_request(f)
-
- f = tutils.tflow(resp=True)
- assert fm.stickyauth_state.hosts
- assert not "authorization" in f.request.headers
- fm.handle_request(f)
- assert f.request.headers["authorization"] == "foo"
-
- def test_stream(self):
- with tutils.tmpdir() as tdir:
- p = os.path.join(tdir, "foo")
-
- def r():
- r = flow.FlowReader(open(p, "rb"))
- return list(r.stream())
-
- s = flow.State()
- fm = flow.FlowMaster(None, s)
- f = tutils.tflow(resp=True)
-
- fm.start_stream(file(p, "ab"), None)
- fm.handle_request(f)
- fm.handle_response(f)
- fm.stop_stream()
-
- assert r()[0].response
-
- f = tutils.tflow()
- fm.start_stream(file(p, "ab"), None)
- fm.handle_request(f)
- fm.shutdown()
-
- assert not r()[1].response
-
-
-class TestRequest:
-
- def test_simple(self):
- f = tutils.tflow()
- r = f.request
- u = r.url
- r.url = u
- tutils.raises(ValueError, setattr, r, "url", "")
- assert r.url == u
- r2 = r.copy()
- assert r.get_state() == r2.get_state()
-
- def test_get_url(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
-
- assert r.url == "http://address:22/path"
-
- r.scheme = "https"
- assert r.url == "https://address:22/path"
-
- r.host = "host"
- r.port = 42
- assert r.url == "https://host:42/path"
-
- r.host = "address"
- r.port = 22
- assert r.url == "https://address:22/path"
-
- assert r.pretty_url == "https://address:22/path"
- r.headers["Host"] = "foo.com"
- assert r.url == "https://address:22/path"
- assert r.pretty_url == "https://foo.com:22/path"
-
- def test_path_components(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.path = "/"
- assert r.get_path_components() == []
- r.path = "/foo/bar"
- assert r.get_path_components() == ["foo", "bar"]
- q = odict.ODict()
- q["test"] = ["123"]
- r.set_query(q)
- assert r.get_path_components() == ["foo", "bar"]
-
- r.set_path_components([])
- assert r.get_path_components() == []
- r.set_path_components(["foo"])
- assert r.get_path_components() == ["foo"]
- r.set_path_components(["/oo"])
- assert r.get_path_components() == ["/oo"]
- assert "%2F" in r.path
-
- def test_getset_form_urlencoded(self):
- d = odict.ODict([("one", "two"), ("three", "four")])
- r = HTTPRequest.wrap(netlib.tutils.treq(content=netlib.utils.urlencode(d.lst)))
- r.headers["content-type"] = "application/x-www-form-urlencoded"
- assert r.get_form_urlencoded() == d
-
- d = odict.ODict([("x", "y")])
- r.set_form_urlencoded(d)
- assert r.get_form_urlencoded() == d
-
- r.headers["content-type"] = "foo"
- assert not r.get_form_urlencoded()
-
- def test_getset_query(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.path = "/foo?x=y&a=b"
- q = r.get_query()
- assert q.lst == [("x", "y"), ("a", "b")]
-
- r.path = "/"
- q = r.get_query()
- assert not q
-
- r.path = "/?adsfa"
- q = r.get_query()
- assert q.lst == [("adsfa", "")]
-
- r.path = "/foo?x=y&a=b"
- assert r.get_query()
- r.set_query(odict.ODict([]))
- assert not r.get_query()
- qv = odict.ODict([("a", "b"), ("c", "d")])
- r.set_query(qv)
- assert r.get_query() == qv
-
- def test_anticache(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.headers = Headers()
- r.headers["if-modified-since"] = "test"
- r.headers["if-none-match"] = "test"
- r.anticache()
- assert not "if-modified-since" in r.headers
- assert not "if-none-match" in r.headers
-
- def test_replace(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.path = "path/foo"
- r.headers["Foo"] = "fOo"
- r.content = "afoob"
- assert r.replace("foo(?i)", "boo") == 4
- assert r.path == "path/boo"
- assert not "foo" in r.content
- assert r.headers["boo"] == "boo"
-
- def test_constrain_encoding(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.headers["accept-encoding"] = "gzip, oink"
- r.constrain_encoding()
- assert "oink" not in r.headers["accept-encoding"]
-
- r.headers.set_all("accept-encoding", ["gzip", "oink"])
- r.constrain_encoding()
- assert "oink" not in r.headers["accept-encoding"]
-
- def test_get_decoded_content(self):
- r = HTTPRequest.wrap(netlib.tutils.treq())
- r.content = None
- r.headers["content-encoding"] = "identity"
- assert r.get_decoded_content() is None
-
- r.content = "falafel"
- r.encode("gzip")
- assert r.get_decoded_content() == "falafel"
-
- def test_get_content_type(self):
- resp = HTTPResponse.wrap(netlib.tutils.tresp())
- resp.headers = Headers(content_type="text/plain")
- assert resp.headers["content-type"] == "text/plain"
-
-
-class TestResponse:
-
- def test_simple(self):
- f = tutils.tflow(resp=True)
- resp = f.response
- resp2 = resp.copy()
- assert resp2.get_state() == resp.get_state()
-
- def test_refresh(self):
- r = HTTPResponse.wrap(netlib.tutils.tresp())
- n = time.time()
- r.headers["date"] = email.utils.formatdate(n)
- pre = r.headers["date"]
- r.refresh(n)
- assert pre == r.headers["date"]
- r.refresh(n + 60)
-
- d = email.utils.parsedate_tz(r.headers["date"])
- d = email.utils.mktime_tz(d)
- # Weird that this is not exact...
- assert abs(60 - (d - n)) <= 1
-
- r.headers["set-cookie"] = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
- r.refresh()
-
- def test_refresh_cookie(self):
- r = HTTPResponse.wrap(netlib.tutils.tresp())
-
- # Invalid expires format, sent to us by Reddit.
- c = "rfoo=bar; Domain=reddit.com; expires=Thu, 31 Dec 2037 23:59:59 GMT; Path=/"
- assert r._refresh_cookie(c, 60)
-
- c = "MOO=BAR; Expires=Tue, 08-Mar-2011 00:20:38 GMT; Path=foo.com; Secure"
- assert "00:21:38" in r._refresh_cookie(c, 60)
-
- # https://github.com/mitmproxy/mitmproxy/issues/773
- c = ">=A"
- with tutils.raises(ValueError):
- r._refresh_cookie(c, 60)
-
- def test_replace(self):
- r = HTTPResponse.wrap(netlib.tutils.tresp())
- r.headers["Foo"] = "fOo"
- r.content = "afoob"
- assert r.replace("foo(?i)", "boo") == 3
- assert not "foo" in r.content
- assert r.headers["boo"] == "boo"
-
- def test_get_content_type(self):
- resp = HTTPResponse.wrap(netlib.tutils.tresp())
- resp.headers = Headers(content_type="text/plain")
- assert resp.headers["content-type"] == "text/plain"
-
-
-class TestError:
-
- def test_getset_state(self):
- e = Error("Error")
- state = e.get_state()
- assert Error.from_state(state).get_state() == e.get_state()
-
- assert e.copy()
-
- e2 = Error("bar")
- assert not e == e2
- e.set_state(e2.get_state())
- assert e.get_state() == e2.get_state()
-
- e3 = e.copy()
- assert e3.get_state() == e.get_state()
-
-
-class TestClientConnection:
-
- def test_state(self):
-
- c = tutils.tclient_conn()
- assert ClientConnection.from_state(c.get_state()).get_state() ==\
- c.get_state()
-
- c2 = tutils.tclient_conn()
- c2.address.address = (c2.address.host, 4242)
- assert not c == c2
-
- c2.timestamp_start = 42
- c.set_state(c2.get_state())
- assert c.timestamp_start == 42
-
- c3 = c.copy()
- assert c3.get_state() == c.get_state()
-
- assert str(c)
-
-
-def test_replacehooks():
- h = flow.ReplaceHooks()
- h.add("~q", "foo", "bar")
- assert h.lst
-
- h.set(
- [
- (".*", "one", "two"),
- (".*", "three", "four"),
- ]
- )
- assert h.count() == 2
-
- h.clear()
- assert not h.lst
-
- h.add("~q", "foo", "bar")
- h.add("~s", "foo", "bar")
-
- v = h.get_specs()
- assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
- assert h.count() == 2
- h.clear()
- assert h.count() == 0
-
- f = tutils.tflow()
- f.request.content = "foo"
- h.add("~s", "foo", "bar")
- h.run(f)
- assert f.request.content == "foo"
-
- f = tutils.tflow(resp=True)
- f.request.content = "foo"
- f.response.content = "foo"
- h.run(f)
- assert f.response.content == "bar"
- assert f.request.content == "foo"
-
- f = tutils.tflow()
- h.clear()
- h.add("~q", "foo", "bar")
- f.request.content = "foo"
- h.run(f)
- assert f.request.content == "bar"
-
- assert not h.add("~", "foo", "bar")
- assert not h.add("foo", "*", "bar")
-
-
-def test_setheaders():
- h = flow.SetHeaders()
- h.add("~q", "foo", "bar")
- assert h.lst
-
- h.set(
- [
- (".*", "one", "two"),
- (".*", "three", "four"),
- ]
- )
- assert h.count() == 2
-
- h.clear()
- assert not h.lst
-
- h.add("~q", "foo", "bar")
- h.add("~s", "foo", "bar")
-
- v = h.get_specs()
- assert v == [('~q', 'foo', 'bar'), ('~s', 'foo', 'bar')]
- assert h.count() == 2
- h.clear()
- assert h.count() == 0
-
- f = tutils.tflow()
- f.request.content = "foo"
- h.add("~s", "foo", "bar")
- h.run(f)
- assert f.request.content == "foo"
-
- h.clear()
- h.add("~s", "one", "two")
- h.add("~s", "one", "three")
- f = tutils.tflow(resp=True)
- f.request.headers["one"] = "xxx"
- f.response.headers["one"] = "xxx"
- h.run(f)
- assert f.request.headers["one"] == "xxx"
- assert f.response.headers.get_all("one") == ["two", "three"]
-
- h.clear()
- h.add("~q", "one", "two")
- h.add("~q", "one", "three")
- f = tutils.tflow()
- f.request.headers["one"] = "xxx"
- h.run(f)
- assert f.request.headers.get_all("one") == ["two", "three"]
-
- assert not h.add("~", "foo", "bar")