aboutsummaryrefslogtreecommitdiffstats
path: root/test/mitmproxy/addons/test_readfile.py
blob: f7e0c5c53104d633e804b2546b6f854955e30720 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import io
from unittest import mock

import pytest
import asynctest

import mitmproxy.io
from mitmproxy import exceptions
from mitmproxy.addons import readfile
from mitmproxy.test import taddons
from mitmproxy.test import tflow


@pytest.fixture
def data():
    """Return an in-memory binary stream of serialized test flows.

    Four flows built by the ``tflow`` helpers are written through a
    ``mitmproxy.io.FlowWriter``; the stream is rewound to position 0 before
    it is returned so callers can read it from the beginning.
    """
    buf = io.BytesIO()
    writer = mitmproxy.io.FlowWriter(buf)

    sample_flows = (
        tflow.tflow(resp=True),
        tflow.tflow(err=True),
        tflow.ttcpflow(),
        tflow.ttcpflow(err=True),
    )
    for sample in sample_flows:
        writer.add(sample)

    buf.seek(0)
    return buf


@pytest.fixture
def corrupt_data(data):
    """Return a stream with the content of ``data`` followed by trailing garbage.

    The ``data`` fixture is requested as an argument rather than called as a
    plain function: pytest rejects direct fixture calls. Its bytes are copied
    into a fresh ``BytesIO`` so that a test requesting both ``data`` and
    ``corrupt_data`` still receives an unmodified ``data`` stream.

    The returned stream is rewound to position 0.
    """
    f = io.BytesIO(data.getvalue())
    # Append bytes after the serialized flows to corrupt the tail of the dump.
    f.seek(0, io.SEEK_END)
    f.write(b"qibble")
    f.seek(0)
    return f


class TestReadFile:
    """Tests for the ``readfile.ReadFile`` addon."""

    def test_configure(self):
        """Configuring with ``~q`` succeeds; ``~~`` raises an invalid-filter error."""
        addon = readfile.ReadFile()
        with taddons.context() as ctx:
            ctx.configure(addon, readfile_filter="~q")
            with pytest.raises(Exception, match="Invalid readfile filter"):
                ctx.configure(addon, readfile_filter="~~")

    @pytest.mark.asyncio
    async def test_read(self, tmpdir, data, corrupt_data):
        """Check that ``running()`` leads to flows being loaded from ``rfile``.

        With a valid dump the patched ``Master.load_flow`` must end up
        awaited; with a corrupt dump a "corrupted" log message is expected.
        """
        addon = readfile.ReadFile()
        with taddons.context(addon) as ctx:
            flow_file = tmpdir.join("tfile")
            flow_path = str(flow_file)

            with asynctest.patch('mitmproxy.master.Master.load_flow') as load_flow:
                flow_file.write(data.getvalue())
                ctx.configure(addon, rfile=flow_path, readfile_filter=".*")
                # Configuring alone must not have loaded anything yet.
                assert not load_flow.awaited
                addon.running()
                # Yield to the event loop once before checking the mock.
                await asyncio.sleep(0)
                assert load_flow.awaited

            # Same path, now holding a dump with trailing garbage.
            flow_file.write(corrupt_data.getvalue())
            ctx.configure(addon, rfile=flow_path)
            addon.running()
            assert await ctx.master.await_log("corrupted")

    @pytest.mark.asyncio
    async def test_corrupt(self, corrupt_data):
        """``load_flows`` raises ``FlowReadException`` on unreadable input."""
        addon = readfile.ReadFile()
        with taddons.context(addon) as ctx:
            # Input consisting of garbage only.
            garbage_only = io.BytesIO(b"qibble")
            with pytest.raises(exceptions.FlowReadException):
                await addon.load_flows(garbage_only)

            # Reset the test master before the second attempt, then feed
            # the flows-plus-garbage stream.
            ctx.master.clear()
            with pytest.raises(exceptions.FlowReadException):
                await addon.load_flows(corrupt_data)
            assert await ctx.master.await_log("file corrupted")

    @pytest.mark.asyncio
    async def test_nonexistent_file(self):
        """Loading from the path "nonexistent" raises and produces a matching log."""
        addon = readfile.ReadFile()
        with taddons.context(addon) as ctx:
            with pytest.raises(exceptions.FlowReadException):
                await addon.load_flows_from_path("nonexistent")
            assert await ctx.master.await_log("nonexistent")


class TestReadFileStdin:
    """Tests for the ``readfile.ReadFileStdin`` addon."""

    @asynctest.patch('sys.stdin')
    @pytest.mark.asyncio
    async def test_stdin(self, stdin, data, corrupt_data):
        """Flows read from the patched stdin buffer are passed to ``load_flow``.

        ``stdin`` is the mock installed for ``sys.stdin`` by the decorator;
        ``data`` and ``corrupt_data`` are pytest fixtures.
        """
        rf = readfile.ReadFileStdin()
        with taddons.context(rf):
            with asynctest.patch('mitmproxy.master.Master.load_flow') as mck:
                # Valid dump: load_flow must be awaited.
                stdin.buffer = data
                assert not mck.awaited
                await rf.load_flows(stdin.buffer)
                assert mck.awaited

                # Dump with trailing garbage: reading must fail.
                stdin.buffer = corrupt_data
                with pytest.raises(exceptions.FlowReadException):
                    await rf.load_flows(stdin.buffer)

    @pytest.mark.asyncio
    async def test_normal(self, tmpdir, data):
        """With ``rfile`` set to a regular file path, flows are loaded from it.

        ``Master.load_flow`` is patched exactly once, by the context manager
        below, and that mock is the one the assertions inspect.
        """
        rf = readfile.ReadFileStdin()
        with taddons.context(rf) as tctx:
            tf = tmpdir.join("tfile")
            with asynctest.patch('mitmproxy.master.Master.load_flow') as mck:
                tf.write(data.getvalue())
                tctx.configure(rf, rfile=str(tf))
                # Configuring alone must not have loaded anything yet.
                assert not mck.awaited
                rf.running()
                # Yield to the event loop once before checking the mock.
                await asyncio.sleep(0)
                assert mck.awaited