1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
import pytest
from unittest import mock
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
from mitmproxy.addons import clientplayback
from mitmproxy.test import taddons
def tdump(path, flows):
with open(path, "wb") as f:
w = io.FlowWriter(f)
for i in flows:
w.add(i)
class MockThread():
def is_alive(self):
return False
class TestClientPlayback:
# @staticmethod
# def wait_until_not_live(flow):
# """
# Race condition: We don't want to replay the flow while it is still live.
# """
# s = time.time()
# while flow.live:
# time.sleep(0.001)
# if time.time() - s > 5:
# raise RuntimeError("Flow is live for too long.")
# def test_replay(self):
# assert self.pathod("304").status_code == 304
# assert len(self.master.state.flows) == 1
# l = self.master.state.flows[-1]
# assert l.response.status_code == 304
# l.request.path = "/p/305"
# self.wait_until_not_live(l)
# rt = self.master.replay_request(l, block=True)
# assert l.response.status_code == 305
# # Disconnect error
# l.request.path = "/p/305:d0"
# rt = self.master.replay_request(l, block=True)
# assert rt
# if isinstance(self, tservers.HTTPUpstreamProxyTest):
# assert l.response.status_code == 502
# else:
# assert l.error
# # Port error
# l.request.port = 1
# # In upstream mode, we get a 502 response from the upstream proxy server.
# # In upstream mode with ssl, the replay will fail as we cannot establish
# # SSL with the upstream proxy.
# rt = self.master.replay_request(l, block=True)
# assert rt
# if isinstance(self, tservers.HTTPUpstreamProxyTest):
# assert l.response.status_code == 502
# else:
# assert l.error
# def test_replay(self):
# opts = options.Options()
# fm = master.Master(opts)
# f = tflow.tflow(resp=True)
# f.request.content = None
# with pytest.raises(ReplayException, match="missing"):
# fm.replay_request(f)
# f.request = None
# with pytest.raises(ReplayException, match="request"):
# fm.replay_request(f)
# f.intercepted = True
# with pytest.raises(ReplayException, match="intercepted"):
# fm.replay_request(f)
# f.live = True
# with pytest.raises(ReplayException, match="live"):
# fm.replay_request(f)
# req = tutils.treq(headers=net_http.Headers(((b":authority", b"foo"), (b"header", b"qvalue"), (b"content-length", b"7"))))
# f = tflow.tflow(req=req)
# f.request.http_version = "HTTP/2.0"
# with mock.patch('mitmproxy.proxy.protocol.http_replay.RequestReplayThread.run'):
# rt = fm.replay_request(f)
# assert rt.f.request.http_version == "HTTP/1.1"
# assert ":authority" not in rt.f.request.headers
def test_load_file(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context(cp):
fpath = str(tmpdir.join("flows"))
tdump(fpath, [tflow.tflow(resp=True)])
cp.load_file(fpath)
assert cp.count() == 1
with pytest.raises(exceptions.CommandError):
cp.load_file("/nonexistent")
def test_configure(self, tmpdir):
cp = clientplayback.ClientPlayback()
with taddons.context(cp) as tctx:
path = str(tmpdir.join("flows"))
tdump(path, [tflow.tflow()])
assert cp.count() == 0
tctx.configure(cp, client_replay=[path])
assert cp.count() == 1
tctx.configure(cp, client_replay=[])
with pytest.raises(exceptions.OptionsError):
tctx.configure(cp, client_replay=["nonexistent"])
def test_check(self):
cp = clientplayback.ClientPlayback()
with taddons.context(cp):
f = tflow.tflow(resp=True)
f.live = True
assert "live flow" in cp.check(f)
f = tflow.tflow(resp=True)
f.intercepted = True
assert "intercepted flow" in cp.check(f)
f = tflow.tflow(resp=True)
f.request = None
assert "missing request" in cp.check(f)
f = tflow.tflow(resp=True)
f.request.raw_content = None
assert "missing content" in cp.check(f)
def test_playback(self):
cp = clientplayback.ClientPlayback()
with taddons.context(cp):
assert cp.count() == 0
f = tflow.tflow(resp=True)
cp.start_replay([f])
assert cp.count() == 1
cp.stop_replay()
assert cp.count() == 0
|