aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/contentviews/protobuf.py
blob: 50f8dcede2d6908490a810a47d7807b41ce0e4f0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import io

from kaitaistruct import KaitaiStream
from . import base
from mitmproxy.contrib.kaitaistruct import google_protobuf


def write_buf(out, field_tag, body, indent_level):
    if body is not None:
        out.write("{: <{level}}{}: {}\n".format('', field_tag, body if isinstance(body, int) else str(body, 'utf-8'),
                                                level=indent_level))
    elif field_tag is not None:
        out.write(' ' * indent_level + str(field_tag) + " {\n")
    else:
        out.write(' ' * indent_level + "}\n")


def format_pbuf(raw):
    out = io.StringIO()
    stack = []

    try:
        buf = google_protobuf.GoogleProtobuf(KaitaiStream(io.BytesIO(raw)))
    except:
        return False
    stack.extend([(pair, 0) for pair in buf.pairs[::-1]])

    while len(stack):
        pair, indent_level = stack.pop()

        if pair.wire_type == pair.WireTypes.group_start:
            body = None
        elif pair.wire_type == pair.WireTypes.group_end:
            body = None
            pair._m_field_tag = None
        elif pair.wire_type == pair.WireTypes.len_delimited:
            body = pair.value.body
        elif pair.wire_type == pair.WireTypes.varint:
            body = pair.value.value
        else:
            body = pair.value

        try:
            next_buf = google_protobuf.GoogleProtobuf(KaitaiStream(io.BytesIO(body)))
            stack.extend([(pair, indent_level + 2) for pair in next_buf.pairs[::-1]])
            write_buf(out, pair.field_tag, None, indent_level)
        except:
            write_buf(out, pair.field_tag, body, indent_level)

        if stack:
            prev_level = stack[-1][1]
        else:
            prev_level = 0

        if prev_level < indent_level:
            levels = int((indent_level - prev_level) / 2)
            for i in range(1, levels + 1):
                write_buf(out, None, None, indent_level - i * 2)

    return out.getvalue()


class ViewProtobuf(base.View):
    """Human friendly view of protocol buffers
    The view uses the protoc compiler to decode the binary
    """

    name = "Protocol Buffer"
    content_types = [
        "application/x-protobuf",
        "application/x-protobuffer",
    ]

    def __call__(self, data, **metadata):
        decoded = format_pbuf(data)
        if not decoded:
            raise ValueError("Failed to parse input.")

        return "Protobuf", base.format_text(decoded)