aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/addons/readfile.py
blob: 2b9ac2df67efc85da2ea333754ac20a1a0bb3d95 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import asyncio
import os.path
import sys
import typing

from mitmproxy import ctx
from mitmproxy import exceptions
from mitmproxy import flowfilter
from mitmproxy import io
from mitmproxy import command


class ReadFile:
    """
        An addon that handles reading from file on startup.
    """
    def __init__(self):
        self.filter = None
        self.is_reading = False

    def load(self, loader):
        loader.add_option(
            "rfile", typing.Optional[str], None,
            "Read flows from file."
        )
        loader.add_option(
            "readfile_filter", typing.Optional[str], None,
            "Read only matching flows."
        )

    def configure(self, updated):
        if "readfile_filter" in updated:
            filt = None
            if ctx.options.readfile_filter:
                filt = flowfilter.parse(ctx.options.readfile_filter)
                if not filt:
                    raise exceptions.OptionsError(
                        "Invalid readfile filter: %s" % ctx.options.readfile_filter
                    )
            self.filter = filt

    async def load_flows(self, fo: typing.IO[bytes]) -> int:
        cnt = 0
        freader = io.FlowReader(fo)
        try:
            for flow in freader.stream():
                if self.filter and not self.filter(flow):
                    continue
                await ctx.master.load_flow(flow)
                cnt += 1
        except (IOError, exceptions.FlowReadException) as e:
            if cnt:
                ctx.log.warn("Flow file corrupted - loaded %i flows." % cnt)
            else:
                ctx.log.error("Flow file corrupted.")
            raise exceptions.FlowReadException(str(e)) from e
        else:
            return cnt

    async def load_flows_from_path(self, path: str) -> int:
        path = os.path.expanduser(path)
        try:
            with open(path, "rb") as f:
                return await self.load_flows(f)
        except IOError as e:
            ctx.log.error("Cannot load flows: {}".format(e))
            raise exceptions.FlowReadException(str(e)) from e

    async def doread(self, rfile):
        self.is_reading = True
        try:
            await self.load_flows_from_path(ctx.options.rfile)
        except exceptions.FlowReadException as e:
            raise exceptions.OptionsError(e) from e
        finally:
            self.is_reading = False

    def running(self):
        if ctx.options.rfile:
            asyncio.get_event_loop().create_task(self.doread(ctx.options.rfile))

    @command.command("readfile.reading")
    def reading(self) -> bool:
        return self.is_reading


class ReadFileStdin(ReadFile):
    """Support the special case of "-" for reading from stdin"""
    async def load_flows_from_path(self, path: str) -> int:
        if path == "-":  # pragma: no cover
            # Need to think about how to test this. This function is scheduled
            # onto the event loop, where a sys.stdin mock has no effect.
            return await self.load_flows(sys.stdin.buffer)
        else:
            return await super().load_flows_from_path(path)