1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
import contextlib
from . import tutils
import mitmproxy.test.tutils
from mitmproxy import master
from mitmproxy import io
from mitmproxy import proxy
from mitmproxy import http
from mitmproxy import options
class TestMaster:
pass
class MasterTest:
def cycle(self, master, content):
f = tutils.tflow(req=mitmproxy.test.tutils.treq(content=content))
master.clientconnect(f.client_conn)
master.serverconnect(f.server_conn)
master.request(f)
if not f.error:
f.response = http.HTTPResponse.wrap(
mitmproxy.test.tutils.tresp(content=content)
)
master.response(f)
master.clientdisconnect(f)
return f
def dummy_cycle(self, master, n, content):
for i in range(n):
self.cycle(master, content)
master.shutdown()
def flowfile(self, path):
f = open(path, "wb")
fw = io.FlowWriter(f)
t = tutils.tflow(resp=True)
fw.add(t)
f.close()
class RecordingMaster(master.Master):
def __init__(self, *args, **kwargs):
master.Master.__init__(self, *args, **kwargs)
self.event_log = []
def add_log(self, e, level):
self.event_log.append((level, e))
@contextlib.contextmanager
def mockctx():
o = options.Options(refresh_server_playback = True, keepserving=False)
m = RecordingMaster(o, proxy.DummyServer(o))
with m.handlecontext():
yield
|