1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
from __future__ import absolute_import, print_function, division
from .. import tutils, mastertest
import os.path
from mitmproxy.builtins import filestreamer
from mitmproxy.flow import master, FlowReader
from mitmproxy.flow import state
from mitmproxy import options
class TestStream(mastertest.MasterTest):
def test_stream(self):
with tutils.tmpdir() as tdir:
p = os.path.join(tdir, "foo")
def r():
r = FlowReader(open(p, "rb"))
return list(r.stream())
s = state.State()
o = options.Options(
outfile = (p, "wb")
)
m = master.FlowMaster(o, None, s)
sa = filestreamer.FileStreamer()
m.addons.add(o, sa)
f = tutils.tflow(resp=True)
self.invoke(m, "request", f)
self.invoke(m, "response", f)
m.addons.remove(sa)
assert r()[0].response
m.options.outfile = (p, "ab")
m.addons.add(o, sa)
f = tutils.tflow()
self.invoke(m, "request", f)
m.addons.remove(sa)
assert not r()[1].response
|