1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
|
import pytest
from mitmproxy.test import taddons
from mitmproxy.test import tflow
from mitmproxy import io
from mitmproxy import exceptions
from mitmproxy.addons import save
from mitmproxy.addons import view
def test_configure(tmpdir):
sa = save.Save()
with taddons.context(sa) as tctx:
with pytest.raises(exceptions.OptionsError):
tctx.configure(sa, save_stream_file=str(tmpdir))
with pytest.raises(Exception, match="Invalid filter"):
tctx.configure(
sa, save_stream_file=str(tmpdir.join("foo")), save_stream_filter="~~"
)
tctx.configure(sa, save_stream_filter="foo")
assert sa.filt
tctx.configure(sa, save_stream_filter=None)
assert not sa.filt
def rd(p):
with open(p, "rb") as f:
x = io.FlowReader(f)
return list(x.stream())
def test_tcp(tmpdir):
sa = save.Save()
with taddons.context(sa) as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
tt = tflow.ttcpflow()
sa.tcp_start(tt)
sa.tcp_end(tt)
tctx.configure(sa, save_stream_file=None)
assert rd(p)
def test_websocket(tmpdir):
sa = save.Save()
with taddons.context(sa) as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
f = tflow.twebsocketflow()
sa.websocket_start(f)
sa.websocket_end(f)
tctx.configure(sa, save_stream_file=None)
assert rd(p)
def test_save_command(tmpdir):
sa = save.Save()
with taddons.context() as tctx:
p = str(tmpdir.join("foo"))
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], p)
assert len(rd(p)) == 1
sa.save([tflow.tflow(resp=True)], "+" + p)
assert len(rd(p)) == 2
with pytest.raises(exceptions.CommandError):
sa.save([tflow.tflow(resp=True)], str(tmpdir))
v = view.View()
tctx.master.addons.add(v)
tctx.master.addons.add(sa)
tctx.master.commands.execute("save.file @shown %s" % p)
def test_simple(tmpdir):
sa = save.Save()
with taddons.context(sa) as tctx:
p = str(tmpdir.join("foo"))
tctx.configure(sa, save_stream_file=p)
f = tflow.tflow(resp=True)
sa.request(f)
sa.response(f)
tctx.configure(sa, save_stream_file=None)
assert rd(p)[0].response
tctx.configure(sa, save_stream_file="+" + p)
f = tflow.tflow()
sa.request(f)
tctx.configure(sa, save_stream_file=None)
assert not rd(p)[1].response
|