aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/tools/web/static_viewer.py
blob: 416c2539efe6187aeaffeb26657a42cb477d0e7e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import json
import os.path
import pathlib
import shutil
import time
import typing

from mitmproxy import contentviews
from mitmproxy import ctx
from mitmproxy import flowfilter
from mitmproxy import io, flow
from mitmproxy.tools.web.app import flow_to_json

# Directory containing this module. The bundled "static" directory and
# "templates/index.html" copied by save_static() are looked up relative to it.
web_dir = pathlib.Path(__file__).absolute().parent


def save_static(path: pathlib.Path) -> None:
    """
    Copy the files needed by the static web view into ``path``.

    This places a fresh copy of the bundled ``static`` directory and of
    ``templates/index.html`` (as ``index.html``) below ``path`` and then
    writes ``static/static.js``, which sets ``MITMWEB_STATIC``.
    """
    static_target = path / "static"
    index_source = web_dir / 'templates' / 'index.html'

    # Drop any earlier copy first so the exported assets always mirror the
    # ones currently shipped next to this module.
    if static_target.exists():
        shutil.rmtree(str(static_target))
    shutil.copytree(str(web_dir / "static"), str(static_target))
    shutil.copyfile(str(index_source), str(path / "index.html"))

    with open(str(static_target / "static.js"), "w") as marker_file:
        marker_file.write("MITMWEB_STATIC = true;")


def save_filter_help(path: pathlib.Path) -> None:
    """
    Write the flow filter help to ``filter-help.json`` inside ``path``.

    The file holds a JSON object whose ``commands`` key maps to
    ``flowfilter.help``.
    """
    payload = {"commands": flowfilter.help}
    target = path / 'filter-help.json'
    with open(str(target), 'w') as help_file:
        json.dump(payload, help_file)


def save_flows(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
    """
    Write all flows as a JSON array to ``flows.json`` inside ``path``.

    Every flow is converted with ``flow_to_json`` before being serialized.
    """
    target = path / 'flows.json'
    with open(str(target), 'w') as flows_file:
        serialized = [flow_to_json(entry) for entry in flows]
        json.dump(serialized, flows_file)


def save_flows_content(path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
    """
    Save the body and the 'Auto' content view of every flow message.

    For each flow and each of its 'request' and 'response' messages, two files
    are written below ``path / "flows" / <flow id> / <message name>``:

    - ``content.data``: the message content as bytes. If the flow has no such
      message, the placeholder ``b'No content.'`` is written instead; if the
      message exists but its content is ``None``, the file is left empty.
    - ``content/Auto.json``: a JSON object with the ``lines`` and the
      ``description`` of the 'Auto' content view (an empty list and
      ``'No content.'`` when there is no message).

    A flow object that lacks a ``request`` or ``response`` attribute is
    handled like a flow whose message is unset, rather than aborting the
    whole export.

    Content views that take longer than 0.1s are reported via ``ctx.log``.
    """
    for f in flows:
        for m in ('request', 'response'):
            # Default to None so flows without this attribute don't raise.
            message = getattr(f, m, None)
            message_path = path / "flows" / f.id / m
            os.makedirs(str(message_path / "content"), exist_ok=True)

            if message:
                # don't use raw_content here as this is served with a default content type
                content = message.content
                if content is None:
                    # write() only accepts bytes-like objects; a message
                    # without a body results in an empty file.
                    content = b''
            else:
                content = b'No content.'
            with open(str(message_path / 'content.data'), 'wb') as content_file:
                content_file.write(content)

            # content_view
            if message:
                started = time.time()
                description, lines, _error = contentviews.get_message_content_view(
                    'Auto', message
                )
                # Measure once so the logged duration is the one that was
                # compared against the threshold.
                elapsed = time.time() - started
                if elapsed > 0.1:
                    ctx.log(
                        "Slow content view: {} took {}s".format(
                            description.strip(),
                            round(elapsed, 1)
                        ),
                        "info"
                    )
            else:
                description, lines = 'No content.', []

            with open(str(message_path / "content" / "Auto.json"), "w") as content_view_file:
                json.dump(
                    dict(lines=list(lines), description=description),
                    content_view_file
                )


class StaticViewer:
    """
    Export flows as a static copy of the web viewer.

    Registers the ``web_static_viewer`` option; when that option is set to a
    path, the flows read from ``ctx.options.rfile`` are exported there.
    """
    # TODO: make this a command at some point.
    def load(self, loader):
        """Register the ``web_static_viewer`` option with ``loader``."""
        loader.add_option(
            "web_static_viewer", typing.Optional[str], "",
            "The path to output a static viewer."
        )

    def configure(self, updated):
        """
        Run the export when ``web_static_viewer`` was updated to a non-empty value.

        ``updated`` is the collection of option names that changed.
        """
        if "web_static_viewer" in updated and ctx.options.web_static_viewer:
            # NOTE(review): assumes ctx.options.rfile is set whenever
            # web_static_viewer is - confirm against how the options are used.
            flows = io.read_flows_from_paths([ctx.options.rfile])
            p = pathlib.Path(ctx.options.web_static_viewer).expanduser()
            self.export(p, flows)

    def export(self, path: pathlib.Path, flows: typing.Iterable[flow.Flow]) -> None:
        """
        Write the static viewer, the filter help and all flow data to ``path``.

        ``flows`` may be any iterable, including a one-shot iterator.
        """
        # The flows are consumed twice below (flows.json and the per-flow
        # content files). Materialize them once so that an iterator or
        # generator is not already exhausted on the second pass.
        flow_list = list(flows)
        save_static(path)
        save_filter_help(path)
        save_flows(path, flow_list)
        save_flows_content(path, flow_list)