blob: aaf02d01b339e272af18663b9566c892699fde3f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import os.path
import sys
import typing
from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import io
class ReadFile:
    """
    Addon that reads saved flows from a file on startup.

    The file to read is taken from the ``rfile`` option registered in
    :meth:`load`.
    """

    def load(self, loader):
        """Register the ``rfile`` option (optional string, default ``None``)."""
        loader.add_option(
            "rfile",
            typing.Optional[str],
            None,
            "Read flows from file.",
        )

    def load_flows(self, fo: typing.IO[bytes]) -> int:
        """
        Feed every flow found in the binary stream *fo* to the master.

        Returns the number of flows that were loaded.

        Raises ``exceptions.FlowReadException`` when reading fails with an
        ``IOError`` or ``FlowReadException``. Before re-raising, a warning
        (some flows were loaded) or an error (none were loaded) is logged.
        """
        loaded = 0
        reader = io.FlowReader(fo)
        try:
            for flow in reader.stream():
                ctx.master.load_flow(flow)
                # Count only after the flow was handed over successfully, so
                # the number reported on failure excludes the failing flow.
                loaded += 1
        except (IOError, exceptions.FlowReadException) as err:
            if not loaded:
                ctx.log.error("Flow file corrupted.")
            else:
                ctx.log.warn("Flow file corrupted - loaded %i flows." % loaded)
            raise exceptions.FlowReadException(str(err)) from err
        return loaded

    def load_flows_from_path(self, path: str) -> int:
        """
        Open *path* (with ``~`` expanded) in binary mode and load its flows.

        Returns the number of flows loaded.

        Raises ``exceptions.FlowReadException`` if an ``IOError`` occurs;
        the error is logged first.
        """
        expanded = os.path.expanduser(path)
        try:
            with open(expanded, "rb") as handle:
                count = self.load_flows(handle)
        except IOError as err:
            ctx.log.error("Cannot load flows: {}".format(err))
            raise exceptions.FlowReadException(str(err)) from err
        return count

    def running(self):
        """
        Load flows from the path given by the ``rfile`` option, if it is set.

        A ``FlowReadException`` is re-raised as ``exceptions.OptionsError``.
        Whether or not loading succeeded, the ``processing_complete`` event
        is triggered afterwards. Nothing happens when ``rfile`` is unset.
        """
        rfile = ctx.options.rfile
        if not rfile:
            return
        try:
            self.load_flows_from_path(rfile)
        except exceptions.FlowReadException as err:
            raise exceptions.OptionsError(err) from err
        finally:
            ctx.master.addons.trigger("processing_complete")
class ReadFileStdin(ReadFile):
    """Variant of ReadFile that treats the path "-" as standard input."""

    def load_flows_from_path(self, path: str) -> int:
        """
        Load flows from *path*, reading from stdin when *path* is "-".

        Any other path is delegated to ``ReadFile.load_flows_from_path``.
        Returns the number of flows loaded.
        """
        if path != "-":
            return super().load_flows_from_path(path)
        # Flows are binary data, so read from the underlying byte buffer
        # rather than the text-mode sys.stdin wrapper.
        return self.load_flows(sys.stdin.buffer)
|