aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/eventsequence.py
blob: 79b1bed4b3b673eddc8ca86804782c915830636f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import typing

from mitmproxy import controller
from mitmproxy import flow
from mitmproxy import http
from mitmproxy import tcp
from mitmproxy import websocket

# The set of recognised event names, grouped by protocol.
Events = frozenset({
    # Connection lifecycle
    "clientconnect",
    "clientdisconnect",
    "serverconnect",
    "serverdisconnect",

    # TCP
    "tcp_start",
    "tcp_message",
    "tcp_error",
    "tcp_end",

    # HTTP
    "http_connect",
    "request",
    "requestheaders",
    "response",
    "responseheaders",
    "error",

    # WebSocket
    "websocket_handshake",
    "websocket_start",
    "websocket_message",
    "websocket_error",
    "websocket_end",

    # Miscellaneous
    "next_layer",
    "configure",
    "done",
    "log",
    "load",
    "running",
    "update",
})

# Return type of the flow iterators in this module: an iterator over
# (event name, payload) pairs.
TEventGenerator = typing.Iterator[typing.Tuple[str, typing.Any]]


def _iterate_http(f: http.HTTPFlow) -> TEventGenerator:
    """
    Yield the events for an HTTP flow, in order.

    A stage is emitted only if the matching attribute of the flow is truthy:
    "requestheaders" and "request" for f.request, "responseheaders" and
    "response" for f.response, and "error" for f.error. Every event is
    yielded together with the flow itself.
    """
    stages = (
        ("request", ("requestheaders", "request")),
        ("response", ("responseheaders", "response")),
        ("error", ("error",)),
    )
    for attribute, event_names in stages:
        # Looked up right before the stage is emitted (not up front), so the
        # check sees the flow as it is after the earlier events were consumed.
        if not getattr(f, attribute):
            continue
        for event_name in event_names:
            yield event_name, f


def _iterate_websocket(f: websocket.WebSocketFlow) -> TEventGenerator:
    """
    Yield the events for a WebSocket flow, in order.

    Emits "websocket_start", then one "websocket_message" per recorded
    message, then "websocket_error" if f.error is truthy, and finally
    "websocket_end". Every event is yielded together with the flow itself.

    The flow's message list is reset up front and refilled one message at a
    time, so at each "websocket_message" event f.messages holds exactly the
    messages up to and including the current one.

    Side effects: f.messages is rebound to a new list and f.reply is replaced
    with a fresh controller.DummyReply().
    """
    recorded = f.messages
    f.messages = []
    # NOTE(review): presumably a placeholder reply so the events can be
    # handled outside a live connection - confirm against controller.DummyReply.
    f.reply = controller.DummyReply()
    yield "websocket_start", f
    # Walk the recorded messages front to back instead of repeatedly calling
    # pop(0): every pop(0) shifts the remaining list, which made this loop
    # quadratic in the number of messages. `or ()` keeps a falsy value
    # (e.g. None) a no-op, as the previous `while messages:` loop did.
    for message in recorded or ():
        f.messages.append(message)
        yield "websocket_message", f
    if f.error:
        yield "websocket_error", f
    yield "websocket_end", f


def _iterate_tcp(f: tcp.TCPFlow) -> TEventGenerator:
    """
    Yield the events for a TCP flow, in order.

    Emits "tcp_start", then one "tcp_message" per recorded message, then
    "tcp_error" if f.error is truthy, and finally "tcp_end". Every event is
    yielded together with the flow itself.

    The flow's message list is reset up front and refilled one message at a
    time, so at each "tcp_message" event f.messages holds exactly the
    messages up to and including the current one.

    Side effects: f.messages is rebound to a new list and f.reply is replaced
    with a fresh controller.DummyReply().
    """
    recorded = f.messages
    f.messages = []
    # NOTE(review): presumably a placeholder reply so the events can be
    # handled outside a live connection - confirm against controller.DummyReply.
    f.reply = controller.DummyReply()
    yield "tcp_start", f
    # Walk the recorded messages front to back instead of repeatedly calling
    # pop(0): every pop(0) shifts the remaining list, which made this loop
    # quadratic in the number of messages. `or ()` keeps a falsy value
    # (e.g. None) a no-op, as the previous `while messages:` loop did.
    for message in recorded or ():
        f.messages.append(message)
        yield "tcp_message", f
    if f.error:
        yield "tcp_error", f
    yield "tcp_end", f


# Dispatch table: concrete flow class -> generator function that produces the
# event sequence for a flow of that class.
_iterate_map: typing.Dict[
    typing.Type[flow.Flow],
    typing.Callable[[typing.Any], TEventGenerator],
] = {
    http.HTTPFlow: _iterate_http,
    websocket.WebSocketFlow: _iterate_websocket,
    tcp.TCPFlow: _iterate_tcp,
}


def iterate(f: flow.Flow) -> TEventGenerator:
    """
    Yield the (event name, flow) pairs for the given flow.

    The work is delegated to the generator registered in _iterate_map for the
    exact type of the flow; subclasses of registered types are not matched.

    Raises:
        TypeError: if the flow's type is not registered. As this function is
            itself a generator, the error surfaces when iteration starts,
            not when iterate() is called.
    """
    flow_type = type(f)
    try:
        event_source = _iterate_map[flow_type]
    except KeyError as err:
        raise TypeError("Unknown flow type: {}".format(f)) from err
    yield from event_source(f)