diff options
Diffstat (limited to 'libmproxy')
| -rw-r--r-- | libmproxy/console.py | 1888 | ||||
| -rw-r--r-- | libmproxy/console/__init__.py | 931 | ||||
| -rw-r--r-- | libmproxy/console/common.py | 148 | ||||
| -rw-r--r-- | libmproxy/console/connlist.py | 197 | ||||
| -rw-r--r-- | libmproxy/console/connview.py | 533 | ||||
| -rw-r--r-- | libmproxy/console/help.py | 114 | ||||
| -rw-r--r-- | libmproxy/console/kveditor.py | 232 | ||||
| -rw-r--r-- | libmproxy/utils.py | 12 | 
8 files changed, 2167 insertions, 1888 deletions
diff --git a/libmproxy/console.py b/libmproxy/console.py deleted file mode 100644 index 9f24c3ab..00000000 --- a/libmproxy/console.py +++ /dev/null @@ -1,1888 +0,0 @@ -# Copyright (C) 2010  Aldo Cortesi -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program.  If not, see <http://www.gnu.org/licenses/>. - -import mailcap, mimetypes, tempfile, os, subprocess, glob, time, re -import os.path, sys -import cStringIO -import urwid -import controller, utils, filt, flow, encoding - -VIEW_CUTOFF = 1024*100 -EVENTLOG_SIZE = 500 - - -class Stop(Exception): pass - - -def shortcuts(k): -    if k == " ": -        k = "page down" -    elif k == "j": -        k = "down" -    elif k == "k": -        k = "up" -    return k - - -def highlight_key(s, k): -    l = [] -    parts = s.split(k, 1) -    if parts[0]: -        l.append(("text", parts[0])) -    l.append(("key", k)) -    if parts[1]: -        l.append(("text", parts[1])) -    return l - - -def format_keyvals(lst, key="key", val="text", space=5, indent=0): -    """ -        Format a list of (key, value) tuples. - -        If key is None, it's treated specially: -            - We assume a sub-value, and add an extra indent. -            - The value is treated as a pre-formatted list of directives. -    """ -    ret = [] -    if lst: -        pad = max(len(i[0]) for i in lst if i and i[0]) + space -        for i in lst: -            if i is None: -                ret.extend("\n") -            elif i[0] is None: -                ret.append(" "*(pad + indent*2)) -                ret.extend(i[1]) -                ret.append("\n") -            else: -                ret.extend( -                    [ -                        " "*indent, -                        (key, i[0]), -                        " "*(pad-len(i[0])), -                        (val, i[1]), -                        "\n" -                    ] -                ) -    return ret - - -def format_flow(f, focus, extended=False, padding=2): -    txt = [] -    if extended: -        txt.append(("highlight", utils.format_timestamp(f.request.timestamp))) -    txt.append(" ") -    if f.request.is_replay(): -        txt.append(("method", "[replay]")) -    txt.extend([ -        ("ack", "!") if f.intercepting and not f.request.acked else " ", -        ("method", f.request.method), -        " ", -        ( -            "text" if (f.response or f.error) else "title", -            f.request.get_url(), -        ), -    ]) -    if f.response or f.error or f.request.is_replay(): -        tsr = f.response or f.error -        if extended and tsr: -            ts = ("highlight", utils.format_timestamp(tsr.timestamp) + " ") -        else: -            ts = " " - -        txt.append("\n") -        txt.append(("text", ts)) -        txt.append(" "*(padding+2)) - -    if f.response: -        txt.append( -           ("ack", "!") if f.intercepting and not f.response.acked else " " -        ) -        txt.append("<- ") -        if f.response.is_replay(): -            txt.append(("method", "[replay] ")) -        if f.response.code in [200, 304]: -            txt.append(("goodcode", str(f.response.code))) -        else: -            txt.append(("error", str(f.response.code))) -        t = f.response.headers["content-type"] -        if t: -            t = t[0].split(";")[0] -            txt.append(("text", " %s"%t)) -        if f.response.content: -            txt.append(", %s"%utils.pretty_size(len(f.response.content))) -    elif f.error: -        txt.append( -           ("error", f.error.msg) -        ) - -    if focus: -        txt.insert(0, ("focus", ">>" + " "*(padding-2))) -    else: -        txt.insert(0, " "*padding) -    return txt - - - -#begin nocover - -def int_version(v): -    SIG = 3 -    v = urwid.__version__.split("-")[0].split(".") -    x = 0 -    for i in range(min(SIG, len(v))): -        x += int(v[i]) * 10**(SIG-i) -    return x - - -# We have to do this to be portable over 0.9.8 and 0.9.9 If compatibility -# becomes a pain to maintain, we'll just mandate 0.9.9 or newer. -class WWrap(urwid.WidgetWrap): -    if int_version(urwid.__version__) >= 990: -        def set_w(self, x): -            self._w = x -        def get_w(self): -            return self._w -        w = property(get_w, set_w) - - -class ConnectionItem(WWrap): -    def __init__(self, master, state, flow, focus): -        self.master, self.state, self.flow = master, state, flow -        self.focus = focus -        w = self.get_text() -        WWrap.__init__(self, w) - -    def get_text(self): -        return urwid.Text(format_flow(self.flow, self.focus)) - -    def selectable(self): -        return True - -    def keypress(self, (maxcol,), key): -        key = shortcuts(key) -        if key == "a": -            self.flow.accept_intercept() -            self.master.sync_list_view() -        elif key == "d": -            self.flow.kill(self.master) -            self.state.delete_flow(self.flow) -            self.master.sync_list_view() -        elif key == "r": -            r = self.master.replay_request(self.flow) -            if r: -                self.master.statusbar.message(r) -            self.master.sync_list_view() -        elif key == "R": -            self.state.revert(self.flow) -            self.master.sync_list_view() -        elif key == "W": -            self.master.path_prompt( -                "Save this flow: ", -                self.state.last_saveload, -                self.master.save_one_flow, -                self.flow -            ) -        elif key == "X": -            self.flow.kill(self.master) -        elif key == "v": -            self.master.toggle_eventlog() -        elif key == "enter": -            if self.flow.request: -                self.master.view_flow(self.flow) -        elif key == "|": -            self.master.path_prompt( -                "Send flow to script: ", self.state.last_script, -                self.master.run_script_once, self.flow -            ) -        return key - - -class ConnectionListView(urwid.ListWalker): -    def __init__(self, master, state): -        self.master, self.state = master, state -        if self.state.flow_count(): -            self.set_focus(0) - -    def get_focus(self): -        f, i = self.state.get_focus() -        f = ConnectionItem(self.master, self.state, f, True) if f else None -        return f, i - -    def set_focus(self, focus): -        ret = self.state.set_focus(focus) -        self._modified() -        return ret - -    def get_next(self, pos): -        f, i = self.state.get_next(pos) -        f = ConnectionItem(self.master, self.state, f, False) if f else None -        return f, i - -    def get_prev(self, pos): -        f, i = self.state.get_prev(pos) -        f = ConnectionItem(self.master, self.state, f, False) if f else None -        return f, i - - -class ConnectionListBox(urwid.ListBox): -    def __init__(self, master): -        self.master = master -        urwid.ListBox.__init__(self, master.conn_list_view) - -    def keypress(self, size, key): -        key = shortcuts(key) -        if key == "A": -            self.master.accept_all() -            self.master.sync_list_view() -            key = None -        elif key == "C": -            self.master.clear_connections() -            key = None -        elif key == "v": -            self.master.toggle_eventlog() -            key = None -        return urwid.ListBox.keypress(self, size, key) - - -class EventListBox(urwid.ListBox): -    def __init__(self, master): -        self.master = master -        urwid.ListBox.__init__(self, master.eventlist) - -    def keypress(self, size, key): -        key = shortcuts(key) -        if key == "C": -            self.master.clear_events() -            key = None -        return urwid.ListBox.keypress(self, size, key) - - -class KVEditor(WWrap): -    def __init__(self, master, title, value, callback): -        self.master, self.title, self.value, self.callback = master, title, value, callback -        p = urwid.Text(title) -        p = urwid.Padding(p, align="left", width=("relative", 100)) -        p = urwid.AttrWrap(p, "heading") -        maxk = max(len(v[0]) for v in value) -        parts = [] -        for k, v in value: -            parts.append( -                urwid.Columns( -                    [ -                        ( -                            "fixed", -                            maxk + 2, -                            urwid.AttrWrap(urwid.Edit(edit_text=k, wrap="any"), "editfield"), -                        ), -                        urwid.AttrWrap(urwid.Edit(edit_text=v, wrap="any"), "editfield"), -                    ], -                    dividechars = 2 -                ) -            ) -            parts.append(urwid.Text(" ")) -        self.lb = urwid.ListBox(parts) -        self.w = urwid.Frame(self.lb, header = p) -        self.master.statusbar.update("") - -    def keypress(self, size, key): -        if key in ("tab", "enter"): -            cw = self.lb.get_focus()[0] -            col = cw.get_focus_column() -            if col == 0: -                cw.set_focus_column(1) -            else: -                self.lb._keypress_down(size) -                cw = self.lb.get_focus()[0] -                cw.set_focus_column(0) -            return None -        return self.w.keypress(size, key) - - -class ConnectionViewHeader(WWrap): -    def __init__(self, master, f): -        self.master, self.flow = master, f -        self.w = urwid.Text(format_flow(f, False, extended=True, padding=0)) - -    def refresh_connection(self, f): -        if f == self.flow: -            self.w = urwid.Text(format_flow(f, False, extended=True, padding=0)) - - -VIEW_BODY_RAW = 0 -VIEW_BODY_HEX = 1 -VIEW_BODY_PRETTY = 2 - -BODY_VIEWS = { -    VIEW_BODY_RAW: "raw", -    VIEW_BODY_HEX: "hex", -    VIEW_BODY_PRETTY: "pretty" -} - -VIEW_FLOW_REQUEST = 0 -VIEW_FLOW_RESPONSE = 1 - -class ConnectionView(WWrap): -    REQ = 0 -    RESP = 1 -    methods = [ -        ("get", "g"), -        ("post", "p"), -        ("put", "u"), -        ("head", "h"), -        ("trace", "t"), -        ("delete", "d"), -        ("options", "o"), -    ] -    def __init__(self, master, state, flow): -        self.master, self.state, self.flow = master, state, flow -        if self.state.view_flow_mode == VIEW_FLOW_RESPONSE and flow.response: -            self.view_response() -        else: -            self.view_request() - -    def _tab(self, content, active): -        if active: -            attr = "heading" -        else: -            attr = "inactive" -        p = urwid.Text(content) -        p = urwid.Padding(p, align="left", width=("relative", 100)) -        p = urwid.AttrWrap(p, attr) -        return p - -    def wrap_body(self, active, body): -        parts = [] - -        if self.flow.intercepting and not self.flow.request.acked: -            qt = "Request (intercepted)" -        else: -            qt = "Request" -        if active == VIEW_FLOW_REQUEST: -            parts.append(self._tab(qt, True)) -        else: -            parts.append(self._tab(qt, False)) - -        if self.flow.intercepting and not self.flow.response.acked: -            st = "Response (intercepted)" -        else: -            st = "Response" -        if active == VIEW_FLOW_RESPONSE: -            parts.append(self._tab(st, True)) -        else: -            parts.append(self._tab(st, False)) - -        h = urwid.Columns(parts, dividechars=1) -        f = urwid.Frame( -                    body, -                    header=h -                ) -        return f - -    def _conn_text(self, conn, viewmode): -        e = conn.headers["content-encoding"] -        e = e[0] if e else None -        return self.master._cached_conn_text( -                    e, -                    conn.content, -                    tuple(tuple(i) for i in conn.headers.lst), -                    viewmode -                ) - -    def view_request(self): -        self.state.view_flow_mode = VIEW_FLOW_REQUEST -        self.master.statusbar.update("Calculating view...") -        body = self._conn_text( -            self.flow.request, -            self.state.view_body_mode -        ) -        self.w = self.wrap_body(VIEW_FLOW_REQUEST, body) -        self.master.statusbar.update("") - -    def view_response(self): -        self.state.view_flow_mode = VIEW_FLOW_RESPONSE -        self.master.statusbar.update("Calculating view...") -        if self.flow.response: -            body = self._conn_text( -                self.flow.response, -                self.state.view_body_mode -            ) -        else: -            body = urwid.ListBox( -                        [ -                            urwid.Text(""), -                            urwid.Text( -                                [ -                                    ("highlight", "No response. Press "), -                                    ("key", "e"), -                                    ("highlight", " and edit any aspect to add one."), -                                ] -                            ) -                        ] -                   ) -        self.w = self.wrap_body(VIEW_FLOW_RESPONSE, body) -        self.master.statusbar.update("") - -    def refresh_connection(self, c=None): -        if c == self.flow: -            if self.state.view_flow_mode == VIEW_FLOW_RESPONSE and self.flow.response: -                self.view_response() -            else: -                self.view_request() - -    def _spawn_editor(self, data): -        fd, name = tempfile.mkstemp('', "mproxy") -        os.write(fd, data) -        os.close(fd) -        c = os.environ.get("EDITOR") -        #If no EDITOR is set, assume 'vi' -        if not c: -            c = "vi" -        cmd = [c, name] -        self.master.ui.stop() -        try: -            subprocess.call(cmd) -        except: -            self.master.statusbar.message("Can't start editor: %s" % c) -            self.master.ui.start() -            os.unlink(name) -            return data -        self.master.ui.start() -        data = open(name).read() -        os.unlink(name) -        return data - -    def edit_method(self, m): -        for i in self.methods: -            if i[1] == m: -                self.flow.request.method = i[0].upper() -        self.master.refresh_connection(self.flow) - -    def save_body(self, path): -        if not path: -            return -        self.state.last_saveload = path -        if self.state.view_flow_mode == VIEW_FLOW_REQUEST: -            c = self.flow.request -        else: -            c = self.flow.response -        path = os.path.expanduser(path) -        try: -            f = file(path, "wb") -            f.write(str(c.content)) -            f.close() -        except IOError, v: -            self.master.statusbar.message(v.strerror) - -    def set_url(self, url): -        request = self.flow.request -        if not request.set_url(str(url)): -            return "Invalid URL." -        self.master.refresh_connection(self.flow) - -    def set_resp_code(self, code): -        response = self.flow.response -        try: -            response.code = int(code) -        except ValueError: -            return None -        import BaseHTTPServer -        if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): -            response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] -        self.master.refresh_connection(self.flow) - -    def set_resp_msg(self, msg): -        response = self.flow.response -        response.msg = msg -        self.master.refresh_connection(self.flow) - -    def edit(self, part): -        if self.state.view_flow_mode == VIEW_FLOW_REQUEST: -            conn = self.flow.request -        else: -            if not self.flow.response: -                self.flow.response = flow.Response(self.flow.request, 200, "OK", flow.Headers(), "") -            conn = self.flow.response - -        self.flow.backup() -        if part == "b": -            c = self._spawn_editor(conn.content or "") -            conn.content = c.rstrip("\n") -        elif part == "h": -            headertext = self._spawn_editor(repr(conn.headers)) -            headers = flow.Headers() -            fp = cStringIO.StringIO(headertext) -            headers.read(fp) -            conn.headers = headers -        elif part == "u" and self.state.view_flow_mode == VIEW_FLOW_REQUEST: -            self.master.prompt_edit("URL", conn.get_url(), self.set_url) -        elif part == "m" and self.state.view_flow_mode == VIEW_FLOW_REQUEST: -            self.master.prompt_onekey("Method", self.methods, self.edit_method) -        elif part == "c" and self.state.view_flow_mode == VIEW_FLOW_RESPONSE: -            self.master.prompt_edit("Code", str(conn.code), self.set_resp_code) -        elif part == "m" and self.state.view_flow_mode == VIEW_FLOW_RESPONSE: -            self.master.prompt_edit("Message", conn.msg, self.set_resp_msg) -        self.master.refresh_connection(self.flow) - -    def keypress(self, size, key): -        if key == " ": -            self.master.view_next_flow(self.flow) -            return key - -        key = shortcuts(key) -        if self.state.view_flow_mode == VIEW_FLOW_REQUEST: -            conn = self.flow.request -        else: -            conn = self.flow.response -        if key == "tab": -            if self.state.view_flow_mode == VIEW_FLOW_REQUEST: -                self.view_response() -            else: -                self.view_request() -        elif key in ("up", "down", "page up", "page down"): -            # Why doesn't this just work?? -            self.w.body.keypress(size, key) -        elif key == "a": -            self.flow.accept_intercept() -            self.master.view_flow(self.flow) -        elif key == "A": -            self.master.accept_all() -            self.master.view_flow(self.flow) -        elif key == "e": -            if self.state.view_flow_mode == VIEW_FLOW_REQUEST: -                self.master.prompt_onekey( -                    "Edit request", -                    ( -                        ("header", "h"), -                        ("body", "b"), -                        ("url", "u"), -                        ("method", "m"), -                    ), -                    self.edit -                ) -            else: -                self.master.prompt_onekey( -                    "Edit response", -                    ( -                        ("code", "c"), -                        ("message", "m"), -                        ("header", "h"), -                        ("body", "b"), -                    ), -                    self.edit -                ) -            key = None -        elif key == "p": -            self.master.view_prev_flow(self.flow) -        elif key == "r": -            r = self.master.replay_request(self.flow) -            if r: -                self.master.statusbar.message(r) -            self.master.refresh_connection(self.flow) -        elif key == "R": -            self.state.revert(self.flow) -            self.master.refresh_connection(self.flow) -        elif key == "W": -            self.master.path_prompt( -                "Save this flow: ", -                self.state.last_saveload, -                self.master.save_one_flow, -                self.flow -            ) -        elif key == "v": -            if conn and conn.content: -                t = conn.headers["content-type"] or [None] -                t = t[0] -                self.master.spawn_external_viewer(conn.content, t) -        elif key == "b": -            if conn: -                if self.state.view_flow_mode == VIEW_FLOW_REQUEST: -                    self.master.path_prompt( -                        "Save request body: ", -                        self.state.last_saveload, -                        self.save_body -                    ) -                else: -                    self.master.path_prompt( -                        "Save response body: ", -                        self.state.last_saveload, -                        self.save_body -                    ) -        elif key == "|": -            self.master.path_prompt( -                "Send flow to script: ", self.state.last_script, -                self.master.run_script_once, self.flow -            ) -        elif key == "z": -            if conn: -                e = conn.headers["content-encoding"] or ["identity"] -                if e[0] != "identity": -                    conn.decode() -                else: -                    self.master.prompt_onekey( -                        "Select encoding: ", -                        ( -                            ("gzip", "z"), -                            ("deflate", "d"), -                        ), -                        self.encode_callback, -                        conn -                    ) -                self.master.refresh_connection(self.flow) -        return key - -    def encode_callback(self, key, conn): -        encoding_map = { -            "z": "gzip", -            "d": "deflate", -        } -        conn.encode(encoding_map[key]) -        self.master.refresh_connection(self.flow) - - -class _PathCompleter: -    def __init__(self, _testing=False): -        """ -            _testing: disables reloading of the lookup table to make testing possible. -        """ -        self.lookup, self.offset = None, None -        self.final = None -        self._testing = _testing - -    def reset(self): -        self.lookup = None -        self.offset = -1 - -    def complete(self, txt): -        """ -            Returns the next completion for txt, or None if there is no completion. -        """ -        path = os.path.expanduser(txt) -        if not self.lookup: -            if not self._testing: -                # Lookup is a set of (display value, actual value) tuples. -                self.lookup = [] -                if os.path.isdir(path): -                    files = glob.glob(os.path.join(path, "*")) -                    prefix = txt -                else: -                    files = glob.glob(path+"*") -                    prefix = os.path.dirname(txt) -                prefix = prefix or "./" -                for f in files: -                    display = os.path.join(prefix, os.path.basename(f)) -                    if os.path.isdir(f): -                        display += "/" -                    self.lookup.append((display, f)) -            if not self.lookup: -                self.final = path -                return path -            self.lookup.sort() -            self.offset = -1 -            self.lookup.append((txt, txt)) -        self.offset += 1 -        if self.offset >= len(self.lookup): -            self.offset = 0 -        ret = self.lookup[self.offset] -        self.final = ret[1] -        return ret[0] - - -class PathEdit(urwid.Edit, _PathCompleter): -    def __init__(self, *args, **kwargs): -        urwid.Edit.__init__(self, *args, **kwargs) -        _PathCompleter.__init__(self) - -    def keypress(self, size, key): -        if key == "tab": -            comp = self.complete(self.get_edit_text()) -            self.set_edit_text(comp) -            self.set_edit_pos(len(comp)) -        else: -            self.reset() -        return urwid.Edit.keypress(self, size, key) - - -class ActionBar(WWrap): -    def __init__(self): -        self.message("") - -    def selectable(self): -        return True - -    def path_prompt(self, prompt, text): -        self.w = PathEdit(prompt, text) - -    def prompt(self, prompt, text = ""): -        self.w = urwid.Edit(prompt, text or "") - -    def message(self, message): -        self.w = urwid.Text(message) - - -class StatusBar(WWrap): -    def __init__(self, master, helptext): -        self.master, self.helptext = master, helptext -        self.expire = None -        self.ab = ActionBar() -        self.ib = WWrap(urwid.Text("")) -        self.w = urwid.Pile([self.ib, self.ab]) - -    def get_status(self): -        r = [] - -        if self.master.client_playback: -            r.append("[") -            r.append(("statusbar_highlight", "cplayback")) -            r.append(":%s to go]"%self.master.client_playback.count()) -        if self.master.server_playback: -            r.append("[") -            r.append(("statusbar_highlight", "splayback")) -            r.append(":%s to go]"%self.master.server_playback.count()) -        if self.master.state.intercept_txt: -            r.append("[") -            r.append(("statusbar_highlight", "i")) -            r.append(":%s]"%self.master.state.intercept_txt) -        if self.master.state.limit_txt: -            r.append("[") -            r.append(("statusbar_highlight", "l")) -            r.append(":%s]"%self.master.state.limit_txt) -        if self.master.stickycookie_txt: -            r.append("[") -            r.append(("statusbar_highlight", "t")) -            r.append(":%s]"%self.master.stickycookie_txt) -        if self.master.stickyauth_txt: -            r.append("[") -            r.append(("statusbar_highlight", "u")) -            r.append(":%s]"%self.master.stickyauth_txt) - -        opts = [] -        if self.master.anticache: -            opts.append("anticache") -        if self.master.anticomp: -            opts.append("anticomp") -        if not self.master.refresh_server_playback: -            opts.append("norefresh") -        if self.master.killextra: -            opts.append("killextra") - -        if opts: -            r.append("[%s]"%(":".join(opts))) - -        if self.master.script: -            r.append("[script:%s]"%self.master.script.path) - -        if self.master.debug: -            r.append("[lt:%0.3f]"%self.master.looptime) - -        return r - -    def redraw(self): -        if self.expire and time.time() > self.expire: -            self.message("") - -        t = [ -                ('statusbar_text', ("[%s]"%self.master.state.flow_count()).ljust(7)), -            ] -        t.extend(self.get_status()) - -        if self.master.server: -            boundaddr = "[%s:%s]"%(self.master.server.address or "*", self.master.server.port) -        else: -            boundaddr = "" - -        status = urwid.AttrWrap(urwid.Columns([ -            urwid.Text(t), -            urwid.Text( -                [ -                    self.helptext, -                    " ", -                    ('statusbar_text', "["), -                    ('statusbar_key', "m"), -                    ('statusbar_text', (":%s]"%BODY_VIEWS[self.master.state.view_body_mode])), -                    ('statusbar_text', boundaddr), -                ], -                align="right" -            ), -        ]), "statusbar") -        self.ib.set_w(status) - -    def update(self, text): -        self.helptext = text -        self.redraw() -        self.master.drawscreen() - -    def selectable(self): -        return True - -    def get_edit_text(self): -        return self.ab.w.get_edit_text() - -    def path_prompt(self, prompt, text): -        return self.ab.path_prompt(prompt, text) - -    def prompt(self, prompt, text = ""): -        self.ab.prompt(prompt, text) - -    def message(self, msg, expire=None): -        if expire: -            self.expire = time.time() + float(expire)/1000 -        else: -            self.expire = None -        self.ab.message(msg) - - -#end nocover - -class ConsoleState(flow.State): -    def __init__(self): -        flow.State.__init__(self) -        self.focus = None - -        self.view_body_mode = VIEW_BODY_PRETTY -        self.view_flow_mode = VIEW_FLOW_REQUEST - -        self.last_script = "" -        self.last_saveload = "" - -    def add_request(self, req): -        f = flow.State.add_request(self, req) -        if self.focus is None: -            self.set_focus(0) -        return f - -    def add_response(self, resp): -        f = flow.State.add_response(self, resp) -        if self.focus is None: -            self.set_focus(0) -        return f - -    def set_limit(self, limit): -        ret = flow.State.set_limit(self, limit) -        self.set_focus(self.focus) -        return ret - -    def get_focus(self): -        if not self.view or self.focus is None: -            return None, None -        return self.view[self.focus], self.focus - -    def set_focus(self, idx): -        if self.view: -            if idx >= len(self.view): -                idx = len(self.view) - 1 -            elif idx < 0: -                idx = 0 -            self.focus = idx - -    def get_from_pos(self, pos): -        if len(self.view) <= pos or pos < 0: -            return None, None -        return self.view[pos], pos - -    def get_next(self, pos): -        return self.get_from_pos(pos+1) - -    def get_prev(self, pos): -        return self.get_from_pos(pos-1) - -    def delete_flow(self, f): -        ret = flow.State.delete_flow(self, f) -        self.set_focus(self.focus) -        return ret - - - -class Options(object): -    __slots__ = [ -        "anticache", -        "anticomp", -        "client_replay", -        "debug", -        "eventlog", -        "keepserving", -        "kill", -        "intercept", -        "no_server", -        "refresh_server_playback", -        "rfile", -        "script", -        "rheaders", -        "server_replay", -        "stickycookie", -        "stickyauth", -        "verbosity", -        "wfile", -    ] -    def __init__(self, **kwargs): -        for k, v in kwargs.items(): -            setattr(self, k, v) -        for i in self.__slots__: -            if not hasattr(self, i): -                setattr(self, i, None) - - -#begin nocover - -class BodyPile(urwid.Pile): -    def __init__(self, master): -        h = urwid.Text("Event log") -        h = urwid.Padding(h, align="left", width=("relative", 100)) - -        self.inactive_header = urwid.AttrWrap(h, "inactive_heading") -        self.active_header = urwid.AttrWrap(h, "heading") - -        urwid.Pile.__init__( -            self, -            [ -                ConnectionListBox(master), -                urwid.Frame(EventListBox(master), header = self.inactive_header) -            ] -        ) -        self.master = master -        self.focus = 0 - -    def keypress(self, size, key): -        if key == "tab": -            self.focus = (self.focus + 1)%len(self.widget_list) -            self.set_focus(self.focus) -            if self.focus == 1: -                self.widget_list[1].header = self.active_header -            else: -                self.widget_list[1].header = self.inactive_header -            key = None -        elif key == "v": -            self.master.toggle_eventlog() -            key = None - -        # This is essentially a copypasta from urwid.Pile's keypress handler. -        # So much for "closed for modification, but open for extension". -        item_rows = None -        if len(size)==2: -            item_rows = self.get_item_rows( size, focus=True ) -        i = self.widget_list.index(self.focus_item) -        tsize = self.get_item_size(size,i,True,item_rows) -        return self.focus_item.keypress( tsize, key ) - - -VIEW_CONNLIST = 0 -VIEW_FLOW = 1 -VIEW_HELP = 2 -VIEW_KVEDITOR = 3 - -class ConsoleMaster(flow.FlowMaster): -    palette = [] -    footer_text_default = [ -        ('statusbar_key', "?"), ":help ", -    ] -    footer_text_help = [ -        ('statusbar_key', "q"), ":back", -    ] -    footer_text_connview = [ -        ('statusbar_key', "tab"), ":toggle view ", -        ('statusbar_key', "?"), ":help ", -        ('statusbar_key', "q"), ":back ", -    ] -    def __init__(self, server, options): -        flow.FlowMaster.__init__(self, server, ConsoleState()) -        self.looptime = 0 -        self.options = options - -        self.conn_list_view = None -        self.set_palette() - -        r = self.set_intercept(options.intercept) -        if r: -            print >> sys.stderr, "Intercept error:", r -            sys.exit(1) - -        r = self.set_stickycookie(options.stickycookie) -        if r: -            print >> sys.stderr, "Sticky cookies error:", r -            sys.exit(1) - -        r = self.set_stickyauth(options.stickyauth) -        if r: -            print >> sys.stderr, "Sticky auth error:", r -            sys.exit(1) - -        self.refresh_server_playback = options.refresh_server_playback -        self.anticache = options.anticache -        self.anticomp = options.anticomp -        self.killextra = options.kill -        self.rheaders = options.rheaders - -        self.eventlog = options.eventlog -        self.eventlist = urwid.SimpleListWalker([]) - -        if options.client_replay: -            self.client_playback_path(options.client_replay) - -        if options.server_replay: -            self.server_playback_path(options.server_replay) - -        self.debug = options.debug - -        if options.script: -            err = self.load_script(options.script) -            if err: -                print >> sys.stderr, "Script load error:", err -                sys.exit(1) - - -    def run_script_once(self, path, f): -        ret = self.get_script(path) -        if ret[0]: -            self.statusbar.message(ret[0]) -        s = ret[1] - -        if f.request: -            s.run("request", f) -        if f.response: -            s.run("response", f) -        if f.error: -            s.run("error", f) -        s.run("done") -        self.refresh_connection(f) -        self.state.last_script = path - -    def set_script(self, path): -        if not path: -            return -        ret = self.load_script(path) -        if ret: -            self.statusbar.message(ret) -        self.state.last_script = path - -    def toggle_eventlog(self): -        self.eventlog = not self.eventlog -        self.view_connlist() - -    def _trailer(self, clen, txt): -        rem = clen - VIEW_CUTOFF -        if rem > 0: -            txt.append(urwid.Text("")) -            txt.append( -                urwid.Text( -                    [ -                        ("highlight", "... %s of data not shown"%utils.pretty_size(rem)) -                    ] -                ) -            ) - -    def _view_conn_raw(self, content): -        txt = [] -        for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): -            txt.append( -                urwid.Text(("text", i)) -            ) -        self._trailer(len(content), txt) -        return txt - -    def _view_conn_binary(self, content): -        txt = [] -        for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): -            txt.append(urwid.Text([ -                ("offset", offset), -                " ", -                ("text", hexa), -                "   ", -                ("text", s), -            ])) -        self._trailer(len(content), txt) -        return txt - -    def _view_conn_xmlish(self, content): -        txt = [] -        for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): -            txt.append( -                urwid.Text(("text", i)), -            ) -        self._trailer(len(content), txt) -        return txt - -    def _view_conn_json(self, lines): -        txt = [] -        sofar = 0 -        for i in lines: -            sofar += len(i) -            txt.append( -                urwid.Text(("text", i)), -            ) -            if sofar > VIEW_CUTOFF: -                break -        self._trailer(sum(len(i) for i in lines), txt) -        return txt - -    def _view_conn_formdata(self, content, boundary): -        rx = re.compile(r'\bname="([^"]+)"') -        keys = [] -        vals = [] - -        for i in content.split("--" + boundary): -            parts = i.splitlines() -            if len(parts) > 1 and parts[0][0:2] != "--": -                match = rx.search(parts[1]) -                if match: -                    keys.append(match.group(1) + ":") -                    vals.append(utils.cleanBin( -                        "\n".join(parts[3+parts[2:].index(""):]) -                    )) -        kv = format_keyvals( -            zip(keys, vals), -            key = "header", -            val = "text" -        ) -        return [ -            urwid.Text(("highlight", "Form data:\n")), -            urwid.Text(kv) -        ] - -    def _view_conn_urlencoded(self, lines): -        kv = format_keyvals( -                [(k+":", v) for (k, v) in lines], -                key = "header", -                val = "text" -             ) -        return [ -                    urwid.Text(("highlight", "URLencoded data:\n")), -                    urwid.Text(kv) -                ] - -    def _find_pretty_view(self, content, hdrItems): -        ctype = None -        for i in hdrItems: -            if i[0].lower() == "content-type": -                ctype = i[1] -                break -        if ctype and "x-www-form-urlencoded" in ctype: -            data = utils.urldecode(content) -            if data: -                return self._view_conn_urlencoded(data) -        if utils.isXML(content): -            return self._view_conn_xmlish(content) -        elif ctype and "application/json" in ctype: -            lines = utils.pretty_json(content) -            if lines: -                return self._view_conn_json(lines) -        elif ctype and "multipart/form-data" in ctype: -            boundary = ctype.split('boundary=') -            if len(boundary) > 1: -                return self._view_conn_formdata(content, boundary[1].split(';')[0]) -        return self._view_conn_raw(content) - -    @utils.LRUCache(20) -    def _cached_conn_text(self, e, content, hdrItems, viewmode): -        hdr = [] -        hdr.extend( -            format_keyvals( -                [(h+":", v) for (h, v) in hdrItems], -                key = "header", -                val = "text" -            ) -        ) -        hdr.append("\n") - -        txt = [urwid.Text(hdr)] -        if content: -            if viewmode == VIEW_BODY_HEX: -                txt.extend(self._view_conn_binary(content)) -            elif viewmode == VIEW_BODY_PRETTY: -                if e: -                    decoded = encoding.decode(e, content) -                    if decoded: -                        content = decoded -                        if e and e != "identity": -                            txt.append( -                                urwid.Text(("highlight", "Decoded %s data:\n"%e)) -                            ) -                txt.extend(self._find_pretty_view(content, hdrItems)) -            else: -                txt.extend(self._view_conn_raw(content)) -        return urwid.ListBox(txt) - -    def _readflow(self, path): -        path = os.path.expanduser(path) -        try: -            f = file(path, "r") -            flows = list(flow.FlowReader(f).stream()) -        except (IOError, flow.FlowReadError), v: -            return True, v.strerror -        return False, flows - -    def client_playback_path(self, path): -        err, ret = self._readflow(path) -        if err: -            self.statusbar.message(ret) -        else: -            self.start_client_playback(ret, False) - -    def server_playback_path(self, path): -        err, ret = self._readflow(path) -        if err: -            self.statusbar.message(ret) -        else: -            self.start_server_playback( -                ret, -                self.killextra, self.rheaders, -                False -            ) - -    def spawn_external_viewer(self, data, contenttype): -        if contenttype: -            ext = mimetypes.guess_extension(contenttype) or "" -        else: -            ext = "" -        fd, name = tempfile.mkstemp(ext, "mproxy") -        os.write(fd, data) -        os.close(fd) - -        cmd = None -        shell = False - -        if contenttype: -            c = mailcap.getcaps() -            cmd, _ = mailcap.findmatch(c, contenttype, filename=name) -            if cmd: -                shell = True -        if not cmd: -            c = os.environ.get("PAGER") or os.environ.get("EDITOR") -            cmd = [c, name] -        self.ui.stop() -        subprocess.call(cmd, shell=shell) -        self.ui.start() -        os.unlink(name) - -    def set_palette(self): -        BARBG = "dark blue" -        self.palette = [ -            ('body', 'black', 'dark cyan', 'standout'), -            ('foot', 'light gray', 'default'), -            ('title', 'white,bold', 'default',), -            ('editline', 'white', 'default',), - -            # Status bar -            ('statusbar', 'light gray', BARBG), -            ('statusbar_key', 'light cyan', BARBG), -            ('statusbar_text', 'light gray', BARBG), -            ('statusbar_highlight', 'white', BARBG), - -            # Help -            ('key', 'light cyan', 'default', 'underline'), -            ('head', 'white,bold', 'default'), -            ('text', 'light gray', 'default'), - -            # List and Connections -            ('method', 'dark cyan', 'default'), -            ('focus', 'yellow', 'default'), -            ('goodcode', 'light green', 'default'), -            ('error', 'light red', 'default'), -            ('header', 'dark cyan', 'default'), -            ('heading', 'white,bold', 'dark blue'), -            ('inactive_heading', 'white', 'dark gray'), -            ('highlight', 'white,bold', 'default'), -            ('inactive', 'dark gray', 'default'), -            ('ack', 'light red', 'default'), - -            # Hex view -            ('offset', 'dark cyan', 'default'), - -            # KV Editor -            ('editfield', 'black', 'light cyan'), -        ] - -    def run(self): -        self.viewstate = VIEW_CONNLIST -        self.currentflow = None - -        self.ui = urwid.raw_display.Screen() -        self.ui.register_palette(self.palette) -        self.conn_list_view = ConnectionListView(self, self.state) - -        self.view = None -        self.statusbar = None -        self.header = None -        self.body = None - -        self.prompting = False -        self.onekey = False -        self.view_connlist() - -        if self.server: -            slave = controller.Slave(self.masterq, self.server) -            slave.start() - -        if self.options.rfile: -            ret = self.load_flows(self.options.rfile) -            if ret: -                self.shutdown() -                print >> sys.stderr, "Could not load file:", ret -                sys.exit(1) - -        self.ui.run_wrapper(self.loop) -        # If True, quit just pops out to connection list view. -        print >> sys.stderr, "Shutting down..." -        sys.stderr.flush() -        self.shutdown() - -    def make_view(self): -        self.view = urwid.Frame( -                        self.body, -                        header = self.header, -                        footer = self.statusbar -                    ) -        self.view.set_focus("body") - - -    def view_help(self): -        self.statusbar = StatusBar(self, self.footer_text_help) -        self.body = self.helptext() -        self.header = None -        self.viewstate = VIEW_HELP -        self.make_view() - -    def view_kveditor(self, title, value, callback): -        self.statusbar = StatusBar(self, self.footer_text_help) -        self.body = KVEditor(self, title, value, callback) -        self.header = None -        self.viewstate = VIEW_KVEDITOR -        self.make_view() - -    def focus_current(self): -        if self.currentflow: -            try: -                ids = [id(i) for i in self.state.view] -                idx = ids.index(id(self.currentflow)) -                self.conn_list_view.set_focus(idx) -            except (IndexError, ValueError): -                pass - -    def view_connlist(self): -        if self.ui.started: -            self.ui.clear() -        self.focus_current() -        if self.eventlog: -            self.body = BodyPile(self) -        else: -            self.body = ConnectionListBox(self) -        self.statusbar = StatusBar(self, self.footer_text_default) -        self.header = None -        self.viewstate = VIEW_CONNLIST -        self.currentflow = None -        self.make_view() - -    def view_flow(self, flow): -        self.statusbar = StatusBar(self, self.footer_text_connview) -        self.body = ConnectionView(self, self.state, flow) -        self.header = ConnectionViewHeader(self, flow) -        self.viewstate = VIEW_FLOW -        self.currentflow = flow -        self.make_view() - -    def _view_nextprev_flow(self, np, flow): -        try: -            idx = self.state.view.index(flow) -        except IndexError: -            return -        if np == "next": -            new_flow, new_idx = self.state.get_next(idx) -        else: -            new_flow, new_idx = self.state.get_prev(idx) -        if new_idx is None: -            return -        self.view_flow(new_flow) - -    def view_next_flow(self, flow): -        return self._view_nextprev_flow("next", flow) - -    def view_prev_flow(self, flow): -        return self._view_nextprev_flow("prev", flow) - -    def _write_flows(self, path, flows): -        self.state.last_saveload = path -        if not path: -            return -        path = os.path.expanduser(path) -        try: -            f = file(path, "wb") -            fw = flow.FlowWriter(f) -            for i in flows: -                fw.add(i) -            f.close() -        except IOError, v: -            self.statusbar.message(v.strerror) - -    def save_one_flow(self, path, flow): -        return self._write_flows(path, [flow]) - -    def save_flows(self, path): -        return self._write_flows(path, self.state.view) - -    def load_flows_callback(self, path): -        if not path: -            return -        ret = self.load_flows(path) -        return ret or "Flows loaded from %s"%path - -    def load_flows(self, path): -        self.state.last_saveload = path -        path = os.path.expanduser(path) -        try: -            f = file(path, "r") -            fr = flow.FlowReader(f) -        except IOError, v: -            return v.strerror -        flow.FlowMaster.load_flows(self, fr) -        f.close() -        if self.conn_list_view: -            self.sync_list_view() -            self.focus_current() - -    def helptext(self): -        text = [] -        text.append(("head", "Global keys:\n")) -        keys = [ -            ("A", "accept all intercepted connections"), -            ("a", "accept this intercepted connection"), -            ("c", "client replay"), -            ("i", "set interception pattern"), -            ("j, k", "up, down"), -            ("l", "set limit filter pattern"), -            ("L", "load saved flows"), - -            ("m", "change body display mode"), -            (None, -                highlight_key("raw", "r") + -                [("text", ": raw data")] -            ), -            (None, -                highlight_key("pretty", "p") + -                [("text", ": pretty-print XML, HTML and JSON")] -            ), -            (None, -                highlight_key("hex", "h") + -                [("text", ": hex dump")] -            ), - -            ("o", "toggle options:"), -            (None, -                highlight_key("anticache", "a") + -                [("text", ": prevent cached responses")] -            ), -            (None, -                highlight_key("anticomp", "c") + -                [("text", ": prevent compressed responses")] -            ), -            (None, -                highlight_key("killextra", "k") + -                [("text", ": kill requests not part of server replay")] -            ), -            (None, -                highlight_key("norefresh", "n") + -                [("text", ": disable server replay response refresh")] -            ), - -            ("q", "quit / return to connection list"), -            ("Q", "quit without confirm prompt"), -            ("r", "replay request"), -            ("R", "revert changes to request"), -            ("s", "set/unset script"), -            ("S", "server replay"), -            ("t", "set sticky cookie expression"), -            ("u", "set sticky auth expression"), -            ("w", "save all flows matching current limit"), -            ("W", "save this flow"), -            ("|", "run script on this flow"), -            ("space", "page down"), -            ("pg up/down", "page up/down"), -        ] -        text.extend(format_keyvals(keys, key="key", val="text", indent=4)) - -        text.append(("head", "\n\nConnection list keys:\n")) -        keys = [ -            ("C", "clear connection list or eventlog"), -            ("d", "delete connection from view"), -            ("v", "toggle eventlog"), -            ("X", "kill and delete connection, even if it's mid-intercept"), -            ("tab", "tab between eventlog and connection list"), -            ("enter", "view connection"), -        ] -        text.extend(format_keyvals(keys, key="key", val="text", indent=4)) - -        text.append(("head", "\n\nConnection view keys:\n")) -        keys = [ -            ("b", "save request/response body"), -            ("e", "edit request/response"), -            ("p", "previous flow"), -            ("v", "view body in external viewer"), -            ("z", "encode/decode a request/response"), -            ("tab", "toggle request/response view"), -            ("space", "next flow"), -        ] -        text.extend(format_keyvals(keys, key="key", val="text", indent=4)) - -        text.append(("head", "\n\nFilter expressions:\n")) -        f = [] -        for i in filt.filt_unary: -            f.append( -                ("~%s"%i.code, i.help) -            ) -        for i in filt.filt_rex: -            f.append( -                ("~%s regex"%i.code, i.help) -            ) -        for i in filt.filt_int: -            f.append( -                ("~%s int"%i.code, i.help) -            ) -        f.sort() -        f.extend( -            [ -                ("!", "unary not"), -                ("&", "and"), -                ("|", "or"), -                ("(...)", "grouping"), -            ] -        ) -        text.extend(format_keyvals(f, key="key", val="text", indent=4)) - -        text.extend( -           [ -                "\n", -                ("text", "    Regexes are Python-style.\n"), -                ("text", "    Regexes can be specified as quoted strings.\n"), -                ("text", "    Header matching (~h, ~hq, ~hs) is against a string of the form \"name: value\".\n"), -                ("text", "    Expressions with no operators are regex matches against URL.\n"), -                ("text", "    Default binary operator is &.\n"), -                ("head", "\n    Examples:\n"), -           ] -        ) -        examples = [ -                ("google\.com", "Url containing \"google.com"), -                ("~q ~b test", "Requests where body contains \"test\""), -                ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), -        ] -        text.extend(format_keyvals(examples, key="key", val="text", indent=4)) -        return urwid.ListBox([urwid.Text(text)]) - -    def path_prompt(self, prompt, text, callback, *args): -        self.statusbar.path_prompt(prompt, text) -        self.view.set_focus("footer") -        self.prompting = (callback, args) - -    def prompt(self, prompt, text, callback, *args): -        self.statusbar.prompt(prompt, text) -        self.view.set_focus("footer") -        self.prompting = (callback, args) - -    def prompt_edit(self, prompt, text, callback): -        self.statusbar.prompt(prompt + ": ", text) -        self.view.set_focus("footer") -        self.prompting = (callback, []) - -    def prompt_onekey(self, prompt, keys, callback, *args): -        """ -            Keys are a set of (word, key) tuples. The appropriate key in the -            word is highlighted. -        """ -        prompt = [prompt, " ("] -        mkup = [] -        for i, e in enumerate(keys): -            mkup.extend(highlight_key(e[0], e[1])) -            if i < len(keys)-1: -                mkup.append(",") -        prompt.extend(mkup) -        prompt.append(")? ") -        self.onekey = "".join(i[1] for i in keys) -        self.prompt(prompt, "", callback, *args) - -    def prompt_done(self): -        self.prompting = False -        self.onekey = False -        self.view.set_focus("body") -        self.statusbar.message("") - -    def prompt_execute(self, txt=None): -        if not txt: -            txt = self.statusbar.get_edit_text() -        p, args = self.prompting -        self.prompt_done() -        msg = p(txt, *args) -        if msg: -            self.statusbar.message(msg, 1000) - -    def prompt_cancel(self): -        self.prompt_done() - -    def accept_all(self): -        self.state.accept_all() - -    def set_limit(self, txt): -        return self.state.set_limit(txt) - -    def set_intercept(self, txt): -        return self.state.set_intercept(txt) - -    def changeview(self, v): -        if v == "r": -            self.state.view_body_mode = VIEW_BODY_RAW -        elif v == "h": -            self.state.view_body_mode = VIEW_BODY_HEX -        elif v == "p": -            self.state.view_body_mode = VIEW_BODY_PRETTY -        self.refresh_connection(self.currentflow) - -    def drawscreen(self): -        size = self.ui.get_cols_rows() -        canvas = self.view.render(size, focus=1) -        self.ui.draw_screen(size, canvas) -        return size - -    def loop(self): -        changed = True -        try: -            while not controller.should_exit: -                startloop = time.time() -                if changed: -                    self.statusbar.redraw() -                    size = self.drawscreen() -                changed = self.tick(self.masterq) -                self.ui.set_input_timeouts(max_wait=0.1) -                keys = self.ui.get_input() -                if keys: -                    changed = True -                for k in keys: -                    if self.prompting: -                        if k == "esc": -                            self.prompt_cancel() -                        elif self.onekey: -                            if k == "enter": -                                self.prompt_cancel() -                            elif k in self.onekey: -                                self.prompt_execute(k) -                        elif k == "enter": -                            self.prompt_execute() -                        else: -                            self.view.keypress(size, k) -                    else: -                        k = self.view.keypress(size, k) -                        if k: -                            self.statusbar.message("") -                            if k == "?": -                                self.view_help() -                            elif k == "c": -                                if not self.client_playback: -                                    self.path_prompt( -                                        "Client replay: ", -                                        self.state.last_saveload, -                                        self.client_playback_path -                                    ) -                                else: -                                    self.prompt_onekey( -                                        "Stop current client replay?", -                                        ( -                                            ("yes", "y"), -                                            ("no", "n"), -                                        ), -                                        self.stop_client_playback_prompt, -                                    ) -                            elif k == "l": -                                self.prompt("Limit: ", self.state.limit_txt, self.set_limit) -                                self.sync_list_view() -                            elif k == "i": -                                self.prompt( -                                    "Intercept filter: ", -                                    self.state.intercept_txt, -                                    self.set_intercept -                                ) -                                self.sync_list_view() -                            elif k == "m": -                                self.prompt_onekey( -                                    "View", -                                    ( -                                        ("raw", "r"), -                                        ("pretty", "p"), -                                        ("hex", "h"), -                                    ), -                                    self.changeview -                                ) -                            elif k in ("q", "Q"): -                                if k == "Q": -                                    raise Stop -                                if self.viewstate == VIEW_FLOW: -                                    self.view_connlist() -                                elif self.viewstate in (VIEW_HELP, VIEW_KVEDITOR): -                                    if self.currentflow: -                                        self.view_flow(self.currentflow) -                                    else: -                                        self.view_connlist() -                                else: -                                    self.prompt_onekey( -                                        "Quit", -                                        ( -                                            ("yes", "y"), -                                            ("no", "n"), -                                        ), -                                        self.quit, -                                    ) -                            elif k == "w": -                                self.path_prompt( -                                    "Save flows: ", -                                    self.state.last_saveload, -                                    self.save_flows -                                ) -                            elif k == "s": -                                if self.script: -                                    self.load_script(None) -                                else: -                                    self.path_prompt( -                                        "Set script: ", -                                        self.state.last_script, -                                        self.set_script -                                    ) -                            elif k == "S": -                                if not self.server_playback: -                                    self.path_prompt( -                                        "Server replay: ", -                                        self.state.last_saveload, -                                        self.server_playback_path -                                    ) -                                else: -                                    self.prompt_onekey( -                                        "Stop current server replay?", -                                        ( -                                            ("yes", "y"), -                                            ("no", "n"), -                                        ), -                                        self.stop_server_playback_prompt, -                                    ) -                            elif k == "L": -                                self.path_prompt( -                                    "Load flows: ", -                                    self.state.last_saveload, -                                    self.load_flows_callback -                                ) -                            elif k == "o": -                                self.prompt_onekey( -                                        "Options", -                                        ( -                                            ("anticache", "a"), -                                            ("anticomp", "c"), -                                            ("killextra", "k"), -                                            ("norefresh", "n"), -                                        ), -                                        self._change_options -                                ) -                            elif k == "t": -                                self.prompt( -                                    "Sticky cookie filter: ", -                                    self.stickycookie_txt, -                                    self.set_stickycookie -                                ) -                            elif k == "u": -                                self.prompt( -                                    "Sticky auth filter: ", -                                    self.stickyauth_txt, -                                    self.set_stickyauth -                                ) -                self.looptime = time.time() - startloop -        except (Stop, KeyboardInterrupt): -            pass - -    def stop_client_playback_prompt(self, a): -        if a != "n": -            self.stop_client_playback() - -    def stop_server_playback_prompt(self, a): -        if a != "n": -            self.stop_server_playback() - -    def quit(self, a): -        if a != "n": -            raise Stop - -    def _change_options(self, a): -        if a == "a": -            self.anticache = not self.anticache -        if a == "c": -            self.anticomp = not self.anticomp -        elif a == "k": -            self.killextra = not self.killextra -        elif a == "n": -            self.refresh_server_playback = not self.refresh_server_playback - -    def shutdown(self): -        self.state.killall(self) -        controller.Master.shutdown(self) - -    def sync_list_view(self): -        self.conn_list_view._modified() - -    def clear_connections(self): -        self.state.clear() -        self.sync_list_view() - -    def delete_connection(self, f): -        self.state.delete_flow(f) -        self.sync_list_view() - -    def refresh_connection(self, c): -        if hasattr(self.header, "refresh_connection"): -            self.header.refresh_connection(c) -        if hasattr(self.body, "refresh_connection"): -            self.body.refresh_connection(c) -        if hasattr(self.statusbar, "refresh_connection"): -            self.statusbar.refresh_connection(c) - -    def process_flow(self, f, r): -        if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay(): -            f.intercept() -        else: -            r._ack() -        self.sync_list_view() -        self.refresh_connection(f) - -    def clear_events(self): -        self.eventlist[:] = [] - -    def add_event(self, e, level="info"): -        if level == "info": -            e = urwid.Text(e) -        elif level == "error": -            e = urwid.Text(("error", e)) - -        self.eventlist.append(e) -        if len(self.eventlist) > EVENTLOG_SIZE: -            self.eventlist.pop(0) -        self.eventlist.set_focus(len(self.eventlist)) - -    # Handlers -    def handle_error(self, r): -        f = flow.FlowMaster.handle_error(self, r) -        if f: -            self.process_flow(f, r) -        return f - -    def handle_request(self, r): -        f = flow.FlowMaster.handle_request(self, r) -        if f: -            self.process_flow(f, r) -        return f - -    def handle_response(self, r): -        f = flow.FlowMaster.handle_response(self, r) -        if f: -            self.process_flow(f, r) -        return f - diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py new file mode 100644 index 00000000..608742a4 --- /dev/null +++ b/libmproxy/console/__init__.py @@ -0,0 +1,931 @@ +# Copyright (C) 2010  Aldo Cortesi +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program.  If not, see <http://www.gnu.org/licenses/>. + +import mailcap, mimetypes, tempfile, os, subprocess, glob, time, re +import os.path, sys +import cStringIO +import urwid +from .. import controller, utils, filt, flow +import connlist, connview, help, common, kveditor + +EVENTLOG_SIZE = 500 + + +class Stop(Exception): pass + + +#begin nocover + + +class _PathCompleter: +    def __init__(self, _testing=False): +        """ +            _testing: disables reloading of the lookup table to make testing possible. +        """ +        self.lookup, self.offset = None, None +        self.final = None +        self._testing = _testing + +    def reset(self): +        self.lookup = None +        self.offset = -1 + +    def complete(self, txt): +        """ +            Returns the next completion for txt, or None if there is no completion. +        """ +        path = os.path.expanduser(txt) +        if not self.lookup: +            if not self._testing: +                # Lookup is a set of (display value, actual value) tuples. +                self.lookup = [] +                if os.path.isdir(path): +                    files = glob.glob(os.path.join(path, "*")) +                    prefix = txt +                else: +                    files = glob.glob(path+"*") +                    prefix = os.path.dirname(txt) +                prefix = prefix or "./" +                for f in files: +                    display = os.path.join(prefix, os.path.basename(f)) +                    if os.path.isdir(f): +                        display += "/" +                    self.lookup.append((display, f)) +            if not self.lookup: +                self.final = path +                return path +            self.lookup.sort() +            self.offset = -1 +            self.lookup.append((txt, txt)) +        self.offset += 1 +        if self.offset >= len(self.lookup): +            self.offset = 0 +        ret = self.lookup[self.offset] +        self.final = ret[1] +        return ret[0] + + +class PathEdit(urwid.Edit, _PathCompleter): +    def __init__(self, *args, **kwargs): +        urwid.Edit.__init__(self, *args, **kwargs) +        _PathCompleter.__init__(self) + +    def keypress(self, size, key): +        if key == "tab": +            comp = self.complete(self.get_edit_text()) +            self.set_edit_text(comp) +            self.set_edit_pos(len(comp)) +        else: +            self.reset() +        return urwid.Edit.keypress(self, size, key) + + +class ActionBar(common.WWrap): +    def __init__(self): +        self.message("") + +    def selectable(self): +        return True + +    def path_prompt(self, prompt, text): +        self.w = PathEdit(prompt, text) + +    def prompt(self, prompt, text = ""): +        self.w = urwid.Edit(prompt, text or "") + +    def message(self, message): +        self.w = urwid.Text(message) + + +class StatusBar(common.WWrap): +    def __init__(self, master, helptext): +        self.master, self.helptext = master, helptext +        self.expire = None +        self.ab = ActionBar() +        self.ib = common.WWrap(urwid.Text("")) +        self.w = urwid.Pile([self.ib, self.ab]) + +    def get_status(self): +        r = [] + +        if self.master.client_playback: +            r.append("[") +            r.append(("statusbar_highlight", "cplayback")) +            r.append(":%s to go]"%self.master.client_playback.count()) +        if self.master.server_playback: +            r.append("[") +            r.append(("statusbar_highlight", "splayback")) +            r.append(":%s to go]"%self.master.server_playback.count()) +        if self.master.state.intercept_txt: +            r.append("[") +            r.append(("statusbar_highlight", "i")) +            r.append(":%s]"%self.master.state.intercept_txt) +        if self.master.state.limit_txt: +            r.append("[") +            r.append(("statusbar_highlight", "l")) +            r.append(":%s]"%self.master.state.limit_txt) +        if self.master.stickycookie_txt: +            r.append("[") +            r.append(("statusbar_highlight", "t")) +            r.append(":%s]"%self.master.stickycookie_txt) +        if self.master.stickyauth_txt: +            r.append("[") +            r.append(("statusbar_highlight", "u")) +            r.append(":%s]"%self.master.stickyauth_txt) + +        opts = [] +        if self.master.anticache: +            opts.append("anticache") +        if self.master.anticomp: +            opts.append("anticomp") +        if not self.master.refresh_server_playback: +            opts.append("norefresh") +        if self.master.killextra: +            opts.append("killextra") + +        if opts: +            r.append("[%s]"%(":".join(opts))) + +        if self.master.script: +            r.append("[script:%s]"%self.master.script.path) + +        if self.master.debug: +            r.append("[lt:%0.3f]"%self.master.looptime) + +        return r + +    def redraw(self): +        if self.expire and time.time() > self.expire: +            self.message("") + +        t = [ +                ('statusbar_text', ("[%s]"%self.master.state.flow_count()).ljust(7)), +            ] +        t.extend(self.get_status()) + +        if self.master.server: +            boundaddr = "[%s:%s]"%(self.master.server.address or "*", self.master.server.port) +        else: +            boundaddr = "" + +        status = urwid.AttrWrap(urwid.Columns([ +            urwid.Text(t), +            urwid.Text( +                [ +                    self.helptext, +                    " ", +                    ('statusbar_text', "["), +                    ('statusbar_key', "m"), +                    ('statusbar_text', (":%s]"%common.BODY_VIEWS[self.master.state.view_body_mode])), +                    ('statusbar_text', boundaddr), +                ], +                align="right" +            ), +        ]), "statusbar") +        self.ib.set_w(status) + +    def update(self, text): +        self.helptext = text +        self.redraw() +        self.master.drawscreen() + +    def selectable(self): +        return True + +    def get_edit_text(self): +        return self.ab.w.get_edit_text() + +    def path_prompt(self, prompt, text): +        return self.ab.path_prompt(prompt, text) + +    def prompt(self, prompt, text = ""): +        self.ab.prompt(prompt, text) + +    def message(self, msg, expire=None): +        if expire: +            self.expire = time.time() + float(expire)/1000 +        else: +            self.expire = None +        self.ab.message(msg) + + +#end nocover + +class ConsoleState(flow.State): +    def __init__(self): +        flow.State.__init__(self) +        self.focus = None +        self.view_body_mode = common.VIEW_BODY_PRETTY +        self.view_flow_mode = common.VIEW_FLOW_REQUEST +        self.last_script = "" +        self.last_saveload = "" + +    def add_request(self, req): +        f = flow.State.add_request(self, req) +        if self.focus is None: +            self.set_focus(0) +        return f + +    def add_response(self, resp): +        f = flow.State.add_response(self, resp) +        if self.focus is None: +            self.set_focus(0) +        return f + +    def set_limit(self, limit): +        ret = flow.State.set_limit(self, limit) +        self.set_focus(self.focus) +        return ret + +    def get_focus(self): +        if not self.view or self.focus is None: +            return None, None +        return self.view[self.focus], self.focus + +    def set_focus(self, idx): +        if self.view: +            if idx >= len(self.view): +                idx = len(self.view) - 1 +            elif idx < 0: +                idx = 0 +            self.focus = idx + +    def get_from_pos(self, pos): +        if len(self.view) <= pos or pos < 0: +            return None, None +        return self.view[pos], pos + +    def get_next(self, pos): +        return self.get_from_pos(pos+1) + +    def get_prev(self, pos): +        return self.get_from_pos(pos-1) + +    def delete_flow(self, f): +        ret = flow.State.delete_flow(self, f) +        self.set_focus(self.focus) +        return ret + + + +class Options(object): +    __slots__ = [ +        "anticache", +        "anticomp", +        "client_replay", +        "debug", +        "eventlog", +        "keepserving", +        "kill", +        "intercept", +        "no_server", +        "refresh_server_playback", +        "rfile", +        "script", +        "rheaders", +        "server_replay", +        "stickycookie", +        "stickyauth", +        "verbosity", +        "wfile", +    ] +    def __init__(self, **kwargs): +        for k, v in kwargs.items(): +            setattr(self, k, v) +        for i in self.__slots__: +            if not hasattr(self, i): +                setattr(self, i, None) + + +#begin nocover + + +class ConsoleMaster(flow.FlowMaster): +    palette = [] +    footer_text_default = [ +        ('statusbar_key', "?"), ":help ", +    ] +    footer_text_help = [ +        ('statusbar_key', "q"), ":back", +    ] +    footer_text_connview = [ +        ('statusbar_key', "tab"), ":toggle view ", +        ('statusbar_key', "?"), ":help ", +        ('statusbar_key', "q"), ":back ", +    ] +    def __init__(self, server, options): +        flow.FlowMaster.__init__(self, server, ConsoleState()) +        self.looptime = 0 +        self.options = options + +        self.conn_list_view = None +        self.set_palette() + +        r = self.set_intercept(options.intercept) +        if r: +            print >> sys.stderr, "Intercept error:", r +            sys.exit(1) + +        r = self.set_stickycookie(options.stickycookie) +        if r: +            print >> sys.stderr, "Sticky cookies error:", r +            sys.exit(1) + +        r = self.set_stickyauth(options.stickyauth) +        if r: +            print >> sys.stderr, "Sticky auth error:", r +            sys.exit(1) + +        self.refresh_server_playback = options.refresh_server_playback +        self.anticache = options.anticache +        self.anticomp = options.anticomp +        self.killextra = options.kill +        self.rheaders = options.rheaders + +        self.eventlog = options.eventlog +        self.eventlist = urwid.SimpleListWalker([]) + +        if options.client_replay: +            self.client_playback_path(options.client_replay) + +        if options.server_replay: +            self.server_playback_path(options.server_replay) + +        self.debug = options.debug + +        if options.script: +            err = self.load_script(options.script) +            if err: +                print >> sys.stderr, "Script load error:", err +                sys.exit(1) + +    def run_script_once(self, path, f): +        if not path: +            return +        ret = self.get_script(path) +        if ret[0]: +            self.statusbar.message(ret[0]) +            return +        s = ret[1] +        if f.request: +            s.run("request", f) +        if f.response: +            s.run("response", f) +        if f.error: +            s.run("error", f) +        s.run("done") +        self.refresh_connection(f) +        self.state.last_script = path + +    def set_script(self, path): +        if not path: +            return +        ret = self.load_script(path) +        if ret: +            self.statusbar.message(ret) +        self.state.last_script = path + +    def toggle_eventlog(self): +        self.eventlog = not self.eventlog +        self.view_connlist() + +    def _readflow(self, path): +        path = os.path.expanduser(path) +        try: +            f = file(path, "r") +            flows = list(flow.FlowReader(f).stream()) +        except (IOError, flow.FlowReadError), v: +            return True, v.strerror +        return False, flows + +    def client_playback_path(self, path): +        err, ret = self._readflow(path) +        if err: +            self.statusbar.message(ret) +        else: +            self.start_client_playback(ret, False) + +    def server_playback_path(self, path): +        err, ret = self._readflow(path) +        if err: +            self.statusbar.message(ret) +        else: +            self.start_server_playback( +                ret, +                self.killextra, self.rheaders, +                False +            ) + +    def spawn_editor(self, data): +        fd, name = tempfile.mkstemp('', "mproxy") +        os.write(fd, data) +        os.close(fd) +        c = os.environ.get("EDITOR") +        #If no EDITOR is set, assume 'vi' +        if not c: +            c = "vi" +        cmd = [c, name] +        self.ui.stop() +        try: +            subprocess.call(cmd) +        except: +            self.statusbar.message("Can't start editor: %s" % c) +            self.ui.start() +            os.unlink(name) +            return data +        self.ui.start() +        data = open(name).read() +        os.unlink(name) +        return data + +    def spawn_external_viewer(self, data, contenttype): +        if contenttype: +            ext = mimetypes.guess_extension(contenttype) or "" +        else: +            ext = "" +        fd, name = tempfile.mkstemp(ext, "mproxy") +        os.write(fd, data) +        os.close(fd) + +        cmd = None +        shell = False + +        if contenttype: +            c = mailcap.getcaps() +            cmd, _ = mailcap.findmatch(c, contenttype, filename=name) +            if cmd: +                shell = True +        if not cmd: +            c = os.environ.get("PAGER") or os.environ.get("EDITOR") +            cmd = [c, name] +        self.ui.stop() +        subprocess.call(cmd, shell=shell) +        self.ui.start() +        os.unlink(name) + +    def set_palette(self): +        BARBG = "dark blue" +        self.palette = [ +            ('body', 'black', 'dark cyan', 'standout'), +            ('foot', 'light gray', 'default'), +            ('title', 'white,bold', 'default',), +            ('editline', 'white', 'default',), + +            # Status bar +            ('statusbar', 'light gray', BARBG), +            ('statusbar_key', 'light cyan', BARBG), +            ('statusbar_text', 'light gray', BARBG), +            ('statusbar_highlight', 'white', BARBG), + +            # Help +            ('key', 'light cyan', 'default', 'underline'), +            ('head', 'white,bold', 'default'), +            ('text', 'light gray', 'default'), + +            # List and Connections +            ('method', 'dark cyan', 'default'), +            ('focus', 'yellow', 'default'), +            ('goodcode', 'light green', 'default'), +            ('error', 'light red', 'default'), +            ('header', 'dark cyan', 'default'), +            ('heading', 'white,bold', 'dark blue'), +            ('inactive_heading', 'white', 'dark gray'), +            ('highlight', 'white,bold', 'default'), +            ('inactive', 'dark gray', 'default'), +            ('ack', 'light red', 'default'), + +            # Hex view +            ('offset', 'dark cyan', 'default'), + +            # KV Editor +            ('focusfield', 'black', 'light gray'), +            ('editfield', 'black', 'light cyan'), +        ] + +    def run(self): +        self.currentflow = None + +        self.ui = urwid.raw_display.Screen() +        self.ui.register_palette(self.palette) +        self.conn_list_view = connlist.ConnectionListView(self, self.state) + +        self.view = None +        self.statusbar = None +        self.header = None +        self.body = None +        self.help_context = None + +        self.prompting = False +        self.onekey = False + +        self.view_connlist() + +        if self.server: +            slave = controller.Slave(self.masterq, self.server) +            slave.start() + +        if self.options.rfile: +            ret = self.load_flows(self.options.rfile) +            if ret: +                self.shutdown() +                print >> sys.stderr, "Could not load file:", ret +                sys.exit(1) + +        self.ui.run_wrapper(self.loop) +        # If True, quit just pops out to connection list view. +        print >> sys.stderr, "Shutting down..." +        sys.stderr.flush() +        self.shutdown() + +    def focus_current(self): +        if self.currentflow: +            try: +                ids = [id(i) for i in self.state.view] +                idx = ids.index(id(self.currentflow)) +                self.conn_list_view.set_focus(idx) +            except (IndexError, ValueError): +                pass + +    def make_view(self): +        self.view = urwid.Frame( +                        self.body, +                        header = self.header, +                        footer = self.statusbar +                    ) +        self.view.set_focus("body") + +    def view_help(self): +        h = help.HelpView(self, self.help_context, (self.statusbar, self.body, self.header)) + +        self.statusbar = StatusBar(self, self.footer_text_help) +        self.body = h +        self.header = None +        self.make_view() + +    def view_kveditor(self, title, value, callback, *args, **kwargs): +        self.statusbar = StatusBar(self, "foo") +        self.body = kveditor.KVEditor(self, title, value, callback, *args, **kwargs) +        self.header = None + +        self.help_context = kveditor.help_context +        self.make_view() + +    def view_connlist(self): +        if self.ui.started: +            self.ui.clear() +        self.focus_current() +        if self.eventlog: +            self.body = connlist.BodyPile(self) +        else: +            self.body = connlist.ConnectionListBox(self) +        self.statusbar = StatusBar(self, self.footer_text_default) +        self.header = None +        self.currentflow = None + +        self.make_view() +        self.help_context = connlist.help_context + +    def view_flow(self, flow): +        self.statusbar = StatusBar(self, self.footer_text_connview) +        self.body = connview.ConnectionView(self, self.state, flow) +        self.header = connview.ConnectionViewHeader(self, flow) +        self.currentflow = flow + +        self.make_view() +        self.help_context = connview.help_context + +    def _write_flows(self, path, flows): +        self.state.last_saveload = path +        if not path: +            return +        path = os.path.expanduser(path) +        try: +            f = file(path, "wb") +            fw = flow.FlowWriter(f) +            for i in flows: +                fw.add(i) +            f.close() +        except IOError, v: +            self.statusbar.message(v.strerror) + +    def save_one_flow(self, path, flow): +        return self._write_flows(path, [flow]) + +    def save_flows(self, path): +        return self._write_flows(path, self.state.view) + +    def load_flows_callback(self, path): +        if not path: +            return +        ret = self.load_flows(path) +        return ret or "Flows loaded from %s"%path + +    def load_flows(self, path): +        self.state.last_saveload = path +        path = os.path.expanduser(path) +        try: +            f = file(path, "r") +            fr = flow.FlowReader(f) +        except IOError, v: +            return v.strerror +        flow.FlowMaster.load_flows(self, fr) +        f.close() +        if self.conn_list_view: +            self.sync_list_view() +            self.focus_current() + +    def path_prompt(self, prompt, text, callback, *args): +        self.statusbar.path_prompt(prompt, text) +        self.view.set_focus("footer") +        self.prompting = (callback, args) + +    def prompt(self, prompt, text, callback, *args): +        self.statusbar.prompt(prompt, text) +        self.view.set_focus("footer") +        self.prompting = (callback, args) + +    def prompt_edit(self, prompt, text, callback): +        self.statusbar.prompt(prompt + ": ", text) +        self.view.set_focus("footer") +        self.prompting = (callback, []) + +    def prompt_onekey(self, prompt, keys, callback, *args): +        """ +            Keys are a set of (word, key) tuples. The appropriate key in the +            word is highlighted. +        """ +        prompt = [prompt, " ("] +        mkup = [] +        for i, e in enumerate(keys): +            mkup.extend(common.highlight_key(e[0], e[1])) +            if i < len(keys)-1: +                mkup.append(",") +        prompt.extend(mkup) +        prompt.append(")? ") +        self.onekey = "".join(i[1] for i in keys) +        self.prompt(prompt, "", callback, *args) + +    def prompt_done(self): +        self.prompting = False +        self.onekey = False +        self.view.set_focus("body") +        self.statusbar.message("") + +    def prompt_execute(self, txt=None): +        if not txt: +            txt = self.statusbar.get_edit_text() +        p, args = self.prompting +        self.prompt_done() +        msg = p(txt, *args) +        if msg: +            self.statusbar.message(msg, 1000) + +    def prompt_cancel(self): +        self.prompt_done() + +    def accept_all(self): +        self.state.accept_all() + +    def set_limit(self, txt): +        return self.state.set_limit(txt) + +    def set_intercept(self, txt): +        return self.state.set_intercept(txt) + +    def changeview(self, v): +        if v == "r": +            self.state.view_body_mode = common.VIEW_BODY_RAW +        elif v == "h": +            self.state.view_body_mode = common.VIEW_BODY_HEX +        elif v == "p": +            self.state.view_body_mode = common.VIEW_BODY_PRETTY +        self.refresh_connection(self.currentflow) + +    def drawscreen(self): +        size = self.ui.get_cols_rows() +        canvas = self.view.render(size, focus=1) +        self.ui.draw_screen(size, canvas) +        return size + +    def pop_view(self): +        if self.currentflow: +            self.view_flow(self.currentflow) +        else: +            self.view_connlist() + +    def loop(self): +        changed = True +        try: +            while not controller.should_exit: +                startloop = time.time() +                if changed: +                    self.statusbar.redraw() +                    size = self.drawscreen() +                changed = self.tick(self.masterq) +                self.ui.set_input_timeouts(max_wait=0.1) +                keys = self.ui.get_input() +                if keys: +                    changed = True +                for k in keys: +                    if self.prompting: +                        if k == "esc": +                            self.prompt_cancel() +                        elif self.onekey: +                            if k == "enter": +                                self.prompt_cancel() +                            elif k in self.onekey: +                                self.prompt_execute(k) +                        elif k == "enter": +                            self.prompt_execute() +                        else: +                            self.view.keypress(size, k) +                    else: +                        k = self.view.keypress(size, k) +                        if k: +                            self.statusbar.message("") +                            if k == "?": +                                self.view_help() +                            elif k == "c": +                                if not self.client_playback: +                                    self.path_prompt( +                                        "Client replay: ", +                                        self.state.last_saveload, +                                        self.client_playback_path +                                    ) +                                else: +                                    self.prompt_onekey( +                                        "Stop current client replay?", +                                        ( +                                            ("yes", "y"), +                                            ("no", "n"), +                                        ), +                                        self.stop_client_playback_prompt, +                                    ) +                            elif k == "i": +                                self.prompt( +                                    "Intercept filter: ", +                                    self.state.intercept_txt, +                                    self.set_intercept +                                ) +                                self.sync_list_view() +                            elif k == "Q": +                                raise Stop +                            elif k == "q": +                                self.prompt_onekey( +                                    "Quit", +                                    ( +                                        ("yes", "y"), +                                        ("no", "n"), +                                    ), +                                    self.quit, +                                ) +                            elif k == "s": +                                if self.script: +                                    self.load_script(None) +                                else: +                                    self.path_prompt( +                                        "Set script: ", +                                        self.state.last_script, +                                        self.set_script +                                    ) +                            elif k == "S": +                                if not self.server_playback: +                                    self.path_prompt( +                                        "Server replay: ", +                                        self.state.last_saveload, +                                        self.server_playback_path +                                    ) +                                else: +                                    self.prompt_onekey( +                                        "Stop current server replay?", +                                        ( +                                            ("yes", "y"), +                                            ("no", "n"), +                                        ), +                                        self.stop_server_playback_prompt, +                                    ) +                            elif k == "o": +                                self.prompt_onekey( +                                        "Options", +                                        ( +                                            ("anticache", "a"), +                                            ("anticomp", "c"), +                                            ("killextra", "k"), +                                            ("norefresh", "n"), +                                        ), +                                        self._change_options +                                ) +                            elif k == "t": +                                self.prompt( +                                    "Sticky cookie filter: ", +                                    self.stickycookie_txt, +                                    self.set_stickycookie +                                ) +                            elif k == "u": +                                self.prompt( +                                    "Sticky auth filter: ", +                                    self.stickyauth_txt, +                                    self.set_stickyauth +                                ) +                self.looptime = time.time() - startloop +        except (Stop, KeyboardInterrupt): +            pass + +    def stop_client_playback_prompt(self, a): +        if a != "n": +            self.stop_client_playback() + +    def stop_server_playback_prompt(self, a): +        if a != "n": +            self.stop_server_playback() + +    def quit(self, a): +        if a != "n": +            raise Stop + +    def _change_options(self, a): +        if a == "a": +            self.anticache = not self.anticache +        if a == "c": +            self.anticomp = not self.anticomp +        elif a == "k": +            self.killextra = not self.killextra +        elif a == "n": +            self.refresh_server_playback = not self.refresh_server_playback + +    def shutdown(self): +        self.state.killall(self) +        controller.Master.shutdown(self) + +    def sync_list_view(self): +        self.conn_list_view._modified() + +    def clear_connections(self): +        self.state.clear() +        self.sync_list_view() + +    def delete_connection(self, f): +        self.state.delete_flow(f) +        self.sync_list_view() + +    def refresh_connection(self, c): +        if hasattr(self.header, "refresh_connection"): +            self.header.refresh_connection(c) +        if hasattr(self.body, "refresh_connection"): +            self.body.refresh_connection(c) +        if hasattr(self.statusbar, "refresh_connection"): +            self.statusbar.refresh_connection(c) + +    def process_flow(self, f, r): +        if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay(): +            f.intercept() +        else: +            r._ack() +        self.sync_list_view() +        self.refresh_connection(f) + +    def clear_events(self): +        self.eventlist[:] = [] + +    def add_event(self, e, level="info"): +        if level == "info": +            e = urwid.Text(e) +        elif level == "error": +            e = urwid.Text(("error", e)) + +        self.eventlist.append(e) +        if len(self.eventlist) > EVENTLOG_SIZE: +            self.eventlist.pop(0) +        self.eventlist.set_focus(len(self.eventlist)) + +    # Handlers +    def handle_error(self, r): +        f = flow.FlowMaster.handle_error(self, r) +        if f: +            self.process_flow(f, r) +        return f + +    def handle_request(self, r): +        f = flow.FlowMaster.handle_request(self, r) +        if f: +            self.process_flow(f, r) +        return f + +    def handle_response(self, r): +        f = flow.FlowMaster.handle_response(self, r) +        if f: +            self.process_flow(f, r) +        return f + diff --git a/libmproxy/console/common.py b/libmproxy/console/common.py new file mode 100644 index 00000000..6fbb5e19 --- /dev/null +++ b/libmproxy/console/common.py @@ -0,0 +1,148 @@ +import urwid +from .. import utils + + +VIEW_BODY_RAW = 0 +VIEW_BODY_HEX = 1 +VIEW_BODY_PRETTY = 2 + +BODY_VIEWS = { +    VIEW_BODY_RAW: "raw", +    VIEW_BODY_HEX: "hex", +    VIEW_BODY_PRETTY: "pretty" +} + +VIEW_FLOW_REQUEST = 0 +VIEW_FLOW_RESPONSE = 1 + + +def highlight_key(s, k): +    l = [] +    parts = s.split(k, 1) +    if parts[0]: +        l.append(("text", parts[0])) +    l.append(("key", k)) +    if parts[1]: +        l.append(("text", parts[1])) +    return l + + +def format_keyvals(lst, key="key", val="text", space=5, indent=0): +    """ +        Format a list of (key, value) tuples. + +        If key is None, it's treated specially: +            - We assume a sub-value, and add an extra indent. +            - The value is treated as a pre-formatted list of directives. +    """ +    ret = [] +    if lst: +        pad = max(len(i[0]) for i in lst if i and i[0]) + space +        for i in lst: +            if i is None: +                ret.extend("\n") +            elif i[0] is None: +                ret.append(" "*(pad + indent*2)) +                ret.extend(i[1]) +                ret.append("\n") +            else: +                ret.extend( +                    [ +                        " "*indent, +                        (key, i[0]), +                        " "*(pad-len(i[0])), +                        (val, i[1]), +                        "\n" +                    ] +                ) +    return ret + + +def shortcuts(k): +    if k == " ": +        k = "page down" +    elif k == "j": +        k = "down" +    elif k == "k": +        k = "up" +    return k + + +def format_flow(f, focus, extended=False, padding=2): +    txt = [] +    if extended: +        txt.append(("highlight", utils.format_timestamp(f.request.timestamp))) +    txt.append(" ") +    if f.request.is_replay(): +        txt.append(("method", "[replay]")) +    txt.extend([ +        ("ack", "!") if f.intercepting and not f.request.acked else " ", +        ("method", f.request.method), +        " ", +        ( +            "text" if (f.response or f.error) else "title", +            f.request.get_url(), +        ), +    ]) +    if f.response or f.error or f.request.is_replay(): +        tsr = f.response or f.error +        if extended and tsr: +            ts = ("highlight", utils.format_timestamp(tsr.timestamp) + " ") +        else: +            ts = " " + +        txt.append("\n") +        txt.append(("text", ts)) +        txt.append(" "*(padding+2)) + +    if f.response: +        txt.append( +           ("ack", "!") if f.intercepting and not f.response.acked else " " +        ) +        txt.append("<- ") +        if f.response.is_replay(): +            txt.append(("method", "[replay] ")) +        if f.response.code in [200, 304]: +            txt.append(("goodcode", str(f.response.code))) +        else: +            txt.append(("error", str(f.response.code))) +        t = f.response.headers["content-type"] +        if t: +            t = t[0].split(";")[0] +            txt.append(("text", " %s"%t)) +        if f.response.content: +            txt.append(", %s"%utils.pretty_size(len(f.response.content))) +    elif f.error: +        txt.append( +           ("error", f.error.msg) +        ) + +    if focus: +        txt.insert(0, ("focus", ">>" + " "*(padding-2))) +    else: +        txt.insert(0, " "*padding) +    return txt + + + + +def int_version(v): +    SIG = 3 +    v = urwid.__version__.split("-")[0].split(".") +    x = 0 +    for i in range(min(SIG, len(v))): +        x += int(v[i]) * 10**(SIG-i) +    return x + + +# We have to do this to be portable over 0.9.8 and 0.9.9 If compatibility +# becomes a pain to maintain, we'll just mandate 0.9.9 or newer. +class WWrap(urwid.WidgetWrap): +    if int_version(urwid.__version__) >= 990: +        def set_w(self, x): +            self._w = x +        def get_w(self): +            return self._w +        w = property(get_w, set_w) + + diff --git a/libmproxy/console/connlist.py b/libmproxy/console/connlist.py new file mode 100644 index 00000000..0f238322 --- /dev/null +++ b/libmproxy/console/connlist.py @@ -0,0 +1,197 @@ +import urwid +import common + +def _mkhelp(): +    text = [] +    keys = [ +        ("A", "accept all intercepted connections"), +        ("a", "accept this intercepted connection"), +        ("C", "clear connection list or eventlog"), +        ("d", "delete connection from view"), +        ("l", "set limit filter pattern"), +        ("L", "load saved flows"), +        ("r", "replay request"), +        ("R", "revert changes to request"), +        ("v", "toggle eventlog"), +        ("w", "save all flows matching current limit"), +        ("W", "save this flow"), +        ("X", "kill and delete connection, even if it's mid-intercept"), +        ("tab", "tab between eventlog and connection list"), +        ("enter", "view connection"), +        ("|", "run script on this flow"), +    ] +    text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) +    return text +help_context = _mkhelp() + + +class EventListBox(urwid.ListBox): +    def __init__(self, master): +        self.master = master +        urwid.ListBox.__init__(self, master.eventlist) + +    def keypress(self, size, key): +        key = common.shortcuts(key) +        if key == "C": +            self.master.clear_events() +            key = None +        return urwid.ListBox.keypress(self, size, key) + + +class BodyPile(urwid.Pile): +    def __init__(self, master): +        h = urwid.Text("Event log") +        h = urwid.Padding(h, align="left", width=("relative", 100)) + +        self.inactive_header = urwid.AttrWrap(h, "inactive_heading") +        self.active_header = urwid.AttrWrap(h, "heading") + +        urwid.Pile.__init__( +            self, +            [ +                ConnectionListBox(master), +                urwid.Frame(EventListBox(master), header = self.inactive_header) +            ] +        ) +        self.master = master +        self.focus = 0 + +    def keypress(self, size, key): +        if key == "tab": +            self.focus = (self.focus + 1)%len(self.widget_list) +            self.set_focus(self.focus) +            if self.focus == 1: +                self.widget_list[1].header = self.active_header +            else: +                self.widget_list[1].header = self.inactive_header +            key = None +        elif key == "v": +            self.master.toggle_eventlog() +            key = None + +        # This is essentially a copypasta from urwid.Pile's keypress handler. +        # So much for "closed for modification, but open for extension". +        item_rows = None +        if len(size)==2: +            item_rows = self.get_item_rows( size, focus=True ) +        i = self.widget_list.index(self.focus_item) +        tsize = self.get_item_size(size,i,True,item_rows) +        return self.focus_item.keypress( tsize, key ) + + + +class ConnectionItem(common.WWrap): +    def __init__(self, master, state, flow, focus): +        self.master, self.state, self.flow = master, state, flow +        self.focus = focus +        w = self.get_text() +        common.WWrap.__init__(self, w) + +    def get_text(self): +        return urwid.Text(common.format_flow(self.flow, self.focus)) + +    def selectable(self): +        return True + +    def keypress(self, (maxcol,), key): +        key = common.shortcuts(key) +        if key == "a": +            self.flow.accept_intercept() +            self.master.sync_list_view() +        elif key == "d": +            self.flow.kill(self.master) +            self.state.delete_flow(self.flow) +            self.master.sync_list_view() +        elif key == "l": +            self.master.prompt("Limit: ", self.state.limit_txt, self.master.set_limit) +            self.master.sync_list_view() +        elif key == "L": +            self.master.path_prompt( +                "Load flows: ", +                self.state.last_saveload, +                self.master.load_flows_callback +            ) +        elif key == "r": +            r = self.master.replay_request(self.flow) +            if r: +                self.master.statusbar.message(r) +            self.master.sync_list_view() +        elif key == "R": +            self.state.revert(self.flow) +            self.master.sync_list_view() +        elif key == "w": +            self.master.path_prompt( +                "Save flows: ", +                self.state.last_saveload, +                self.master.save_flows +            ) +        elif key == "W": +            self.master.path_prompt( +                "Save this flow: ", +                self.state.last_saveload, +                self.master.save_one_flow, +                self.flow +            ) +        elif key == "X": +            self.flow.kill(self.master) +        elif key == "v": +            self.master.toggle_eventlog() +        elif key == "enter": +            if self.flow.request: +                self.master.view_flow(self.flow) +        elif key == "|": +            self.master.path_prompt( +                "Send flow to script: ", +                self.state.last_script, +                self.master.run_script_once, +                self.flow +            ) +        else: +            return key + + +class ConnectionListView(urwid.ListWalker): +    def __init__(self, master, state): +        self.master, self.state = master, state +        if self.state.flow_count(): +            self.set_focus(0) + +    def get_focus(self): +        f, i = self.state.get_focus() +        f = ConnectionItem(self.master, self.state, f, True) if f else None +        return f, i + +    def set_focus(self, focus): +        ret = self.state.set_focus(focus) +        self._modified() +        return ret + +    def get_next(self, pos): +        f, i = self.state.get_next(pos) +        f = ConnectionItem(self.master, self.state, f, False) if f else None +        return f, i + +    def get_prev(self, pos): +        f, i = self.state.get_prev(pos) +        f = ConnectionItem(self.master, self.state, f, False) if f else None +        return f, i + + +class ConnectionListBox(urwid.ListBox): +    def __init__(self, master): +        self.master = master +        urwid.ListBox.__init__(self, master.conn_list_view) + +    def keypress(self, size, key): +        key = common.shortcuts(key) +        if key == "A": +            self.master.accept_all() +            self.master.sync_list_view() +            key = None +        elif key == "C": +            self.master.clear_connections() +            key = None +        elif key == "v": +            self.master.toggle_eventlog() +            key = None +        return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/connview.py b/libmproxy/console/connview.py new file mode 100644 index 00000000..289d8024 --- /dev/null +++ b/libmproxy/console/connview.py @@ -0,0 +1,533 @@ +import urwid +import common +from .. import utils, encoding, flow + +def _mkhelp(): +    text = [] +    keys = [ +        ("A", "accept all intercepted connections"), +        ("a", "accept this intercepted connection"), +        ("b", "save request/response body"), +        ("e", "edit request/response"), +        ("m", "change body display mode"), +            (None, +                common.highlight_key("raw", "r") + +                [("text", ": raw data")] +            ), +            (None, +                common.highlight_key("pretty", "p") + +                [("text", ": pretty-print XML, HTML and JSON")] +            ), +            (None, +                common.highlight_key("hex", "h") + +                [("text", ": hex dump")] +            ), +        ("p", "previous flow"), +        ("r", "replay request"), +        ("R", "revert changes to request"), +        ("v", "view body in external viewer"), +        ("w", "save all flows matching current limit"), +        ("W", "save this flow"), +        ("z", "encode/decode a request/response"), +        ("tab", "toggle request/response view"), +        ("space", "next flow"), +        ("|", "run script on this flow"), +    ] +    text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) +    return text +help_context = _mkhelp() + + +VIEW_CUTOFF = 1024*100 + +class ConnectionViewHeader(common.WWrap): +    def __init__(self, master, f): +        self.master, self.flow = master, f +        self.w = urwid.Text(common.format_flow(f, False, extended=True, padding=0)) + +    def refresh_connection(self, f): +        if f == self.flow: +            self.w = urwid.Text(common.format_flow(f, False, extended=True, padding=0)) + + +class CallbackCache: +    @utils.LRUCache(20) +    def callback(self, obj, method, *args, **kwargs): +        return getattr(obj, method)(*args, **kwargs) +cache = CallbackCache() + + +class ConnectionView(common.WWrap): +    REQ = 0 +    RESP = 1 +    methods = [ +        ("get", "g"), +        ("post", "p"), +        ("put", "u"), +        ("head", "h"), +        ("trace", "t"), +        ("delete", "d"), +        ("options", "o"), +    ] +    def __init__(self, master, state, flow): +        self.master, self.state, self.flow = master, state, flow +        if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and flow.response: +            self.view_response() +        else: +            self.view_request() + +    def _trailer(self, clen, txt): +        rem = clen - VIEW_CUTOFF +        if rem > 0: +            txt.append(urwid.Text("")) +            txt.append( +                urwid.Text( +                    [ +                        ("highlight", "... %s of data not shown"%utils.pretty_size(rem)) +                    ] +                ) +            ) + +    def _view_conn_raw(self, content): +        txt = [] +        for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): +            txt.append( +                urwid.Text(("text", i)) +            ) +        self._trailer(len(content), txt) +        return txt + +    def _view_conn_binary(self, content): +        txt = [] +        for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): +            txt.append(urwid.Text([ +                ("offset", offset), +                " ", +                ("text", hexa), +                "   ", +                ("text", s), +            ])) +        self._trailer(len(content), txt) +        return txt + +    def _view_conn_xmlish(self, content): +        txt = [] +        for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): +            txt.append( +                urwid.Text(("text", i)), +            ) +        self._trailer(len(content), txt) +        return txt + +    def _view_conn_json(self, lines): +        txt = [] +        sofar = 0 +        for i in lines: +            sofar += len(i) +            txt.append( +                urwid.Text(("text", i)), +            ) +            if sofar > VIEW_CUTOFF: +                break +        self._trailer(sum(len(i) for i in lines), txt) +        return txt + +    def _view_conn_formdata(self, content, boundary): +        rx = re.compile(r'\bname="([^"]+)"') +        keys = [] +        vals = [] + +        for i in content.split("--" + boundary): +            parts = i.splitlines() +            if len(parts) > 1 and parts[0][0:2] != "--": +                match = rx.search(parts[1]) +                if match: +                    keys.append(match.group(1) + ":") +                    vals.append(utils.cleanBin( +                        "\n".join(parts[3+parts[2:].index(""):]) +                    )) +        kv = common.format_keyvals( +            zip(keys, vals), +            key = "header", +            val = "text" +        ) +        return [ +            urwid.Text(("highlight", "Form data:\n")), +            urwid.Text(kv) +        ] + +    def _view_conn_urlencoded(self, lines): +        kv = common.format_keyvals( +                [(k+":", v) for (k, v) in lines], +                key = "header", +                val = "text" +             ) +        return [ +                    urwid.Text(("highlight", "URLencoded data:\n")), +                    urwid.Text(kv) +                ] + +    def _find_pretty_view(self, content, hdrItems): +        ctype = None +        for i in hdrItems: +            if i[0].lower() == "content-type": +                ctype = i[1] +                break +        if ctype and "x-www-form-urlencoded" in ctype: +            data = utils.urldecode(content) +            if data: +                return self._view_conn_urlencoded(data) +        if utils.isXML(content): +            return self._view_conn_xmlish(content) +        elif ctype and "application/json" in ctype: +            lines = utils.pretty_json(content) +            if lines: +                return self._view_conn_json(lines) +        elif ctype and "multipart/form-data" in ctype: +            boundary = ctype.split('boundary=') +            if len(boundary) > 1: +                return self._view_conn_formdata(content, boundary[1].split(';')[0]) +        return self._view_conn_raw(content) + +    def _cached_conn_text(self, e, content, hdrItems, viewmode): +        hdr = [] +        hdr.extend( +            common.format_keyvals( +                [(h+":", v) for (h, v) in hdrItems], +                key = "header", +                val = "text" +            ) +        ) +        hdr.append("\n") + +        txt = [urwid.Text(hdr)] +        if content: +            if viewmode == common.VIEW_BODY_HEX: +                txt.extend(self._view_conn_binary(content)) +            elif viewmode == common.VIEW_BODY_PRETTY: +                if e: +                    decoded = encoding.decode(e, content) +                    if decoded: +                        content = decoded +                        if e and e != "identity": +                            txt.append( +                                urwid.Text(("highlight", "Decoded %s data:\n"%e)) +                            ) +                txt.extend(self._find_pretty_view(content, hdrItems)) +            else: +                txt.extend(self._view_conn_raw(content)) +        return urwid.ListBox(txt) + + + + +    def _tab(self, content, active): +        if active: +            attr = "heading" +        else: +            attr = "inactive" +        p = urwid.Text(content) +        p = urwid.Padding(p, align="left", width=("relative", 100)) +        p = urwid.AttrWrap(p, attr) +        return p + +    def wrap_body(self, active, body): +        parts = [] + +        if self.flow.intercepting and not self.flow.request.acked: +            qt = "Request (intercepted)" +        else: +            qt = "Request" +        if active == common.VIEW_FLOW_REQUEST: +            parts.append(self._tab(qt, True)) +        else: +            parts.append(self._tab(qt, False)) + +        if self.flow.intercepting and not self.flow.response.acked: +            st = "Response (intercepted)" +        else: +            st = "Response" +        if active == common.VIEW_FLOW_RESPONSE: +            parts.append(self._tab(st, True)) +        else: +            parts.append(self._tab(st, False)) + +        h = urwid.Columns(parts, dividechars=1) +        f = urwid.Frame( +                    body, +                    header=h +                ) +        return f + +    def _conn_text(self, conn, viewmode): +        e = conn.headers["content-encoding"] +        e = e[0] if e else None +        return cache.callback( +                    self, "_cached_conn_text", +                    e, +                    conn.content, +                    tuple(tuple(i) for i in conn.headers.lst), +                    viewmode +                ) + +    def view_request(self): +        self.state.view_flow_mode = common.VIEW_FLOW_REQUEST +        self.master.statusbar.update("Calculating view...") +        body = self._conn_text( +            self.flow.request, +            self.state.view_body_mode +        ) +        self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) +        self.master.statusbar.update("") + +    def view_response(self): +        self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE +        self.master.statusbar.update("Calculating view...") +        if self.flow.response: +            body = self._conn_text( +                self.flow.response, +                self.state.view_body_mode +            ) +        else: +            body = urwid.ListBox( +                        [ +                            urwid.Text(""), +                            urwid.Text( +                                [ +                                    ("highlight", "No response. Press "), +                                    ("key", "e"), +                                    ("highlight", " and edit any aspect to add one."), +                                ] +                            ) +                        ] +                   ) +        self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) +        self.master.statusbar.update("") + +    def refresh_connection(self, c=None): +        if c == self.flow: +            if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: +                self.view_response() +            else: +                self.view_request() + +    def edit_method(self, m): +        for i in self.methods: +            if i[1] == m: +                self.flow.request.method = i[0].upper() +        self.master.refresh_connection(self.flow) + +    def save_body(self, path): +        if not path: +            return +        self.state.last_saveload = path +        if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +            c = self.flow.request +        else: +            c = self.flow.response +        path = os.path.expanduser(path) +        try: +            f = file(path, "wb") +            f.write(str(c.content)) +            f.close() +        except IOError, v: +            self.master.statusbar.message(v.strerror) + +    def set_url(self, url): +        request = self.flow.request +        if not request.set_url(str(url)): +            return "Invalid URL." +        self.master.refresh_connection(self.flow) + +    def set_resp_code(self, code): +        response = self.flow.response +        try: +            response.code = int(code) +        except ValueError: +            return None +        import BaseHTTPServer +        if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): +            response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] +        self.master.refresh_connection(self.flow) + +    def set_resp_msg(self, msg): +        response = self.flow.response +        response.msg = msg +        self.master.refresh_connection(self.flow) + +    def set_headers(self, lst, conn): +        conn.headers = flow.Headers(lst) + +    def edit(self, part): +        if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +            conn = self.flow.request +        else: +            if not self.flow.response: +                self.flow.response = flow.Response(self.flow.request, 200, "OK", flow.Headers(), "") +            conn = self.flow.response + +        self.flow.backup() +        if part == "b": +            c = self.master.spawn_editor(conn.content or "") +            conn.content = c.rstrip("\n") +        elif part == "h": +            self.master.view_kveditor("Editing headers", conn.headers.lst, self.set_headers, conn) +        elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +            self.master.prompt_edit("URL", conn.get_url(), self.set_url) +        elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +            self.master.prompt_onekey("Method", self.methods, self.edit_method) +        elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: +            self.master.prompt_edit("Code", str(conn.code), self.set_resp_code) +        elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: +            self.master.prompt_edit("Message", conn.msg, self.set_resp_msg) +        self.master.refresh_connection(self.flow) + +    def _view_nextprev_flow(self, np, flow): +        try: +            idx = self.state.view.index(flow) +        except IndexError: +            return +        if np == "next": +            new_flow, new_idx = self.state.get_next(idx) +        else: +            new_flow, new_idx = self.state.get_prev(idx) +        if new_idx is None: +            return +        self.master.view_flow(new_flow) + +    def view_next_flow(self, flow): +        return self._view_nextprev_flow("next", flow) + +    def view_prev_flow(self, flow): +        return self._view_nextprev_flow("prev", flow) + +    def keypress(self, size, key): +        if key == " ": +            self.view_next_flow(self.flow) +            return key + +        key = common.shortcuts(key) +        if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +            conn = self.flow.request +        else: +            conn = self.flow.response + +        if key == "q": +            self.master.view_connlist() +            key = None +        elif key == "tab": +            if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +                self.view_response() +            else: +                self.view_request() +        elif key in ("up", "down", "page up", "page down"): +            # Why doesn't this just work?? +            self.w.body.keypress(size, key) +        elif key == "a": +            self.flow.accept_intercept() +            self.master.view_flow(self.flow) +        elif key == "A": +            self.master.accept_all() +            self.master.view_flow(self.flow) +        elif key == "e": +            if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +                self.master.prompt_onekey( +                    "Edit request", +                    ( +                        ("header", "h"), +                        ("body", "b"), +                        ("url", "u"), +                        ("method", "m"), +                    ), +                    self.edit +                ) +            else: +                self.master.prompt_onekey( +                    "Edit response", +                    ( +                        ("code", "c"), +                        ("message", "m"), +                        ("header", "h"), +                        ("body", "b"), +                    ), +                    self.edit +                ) +            key = None +        elif key == "m": +            self.master.prompt_onekey( +                "View", +                ( +                    ("raw", "r"), +                    ("pretty", "p"), +                    ("hex", "h"), +                ), +                self.master.changeview +            ) +            key = None +        elif key == "p": +            self.view_prev_flow(self.flow) +        elif key == "r": +            r = self.master.replay_request(self.flow) +            if r: +                self.master.statusbar.message(r) +            self.master.refresh_connection(self.flow) +        elif key == "R": +            self.state.revert(self.flow) +            self.master.refresh_connection(self.flow) +        elif key == "W": +            self.master.path_prompt( +                "Save this flow: ", +                self.state.last_saveload, +                self.master.save_one_flow, +                self.flow +            ) +        elif key == "v": +            if conn and conn.content: +                t = conn.headers["content-type"] or [None] +                t = t[0] +                self.master.spawn_external_viewer(conn.content, t) +        elif key == "b": +            if conn: +                if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: +                    self.master.path_prompt( +                        "Save request body: ", +                        self.state.last_saveload, +                        self.save_body +                    ) +                else: +                    self.master.path_prompt( +                        "Save response body: ", +                        self.state.last_saveload, +                        self.save_body +                    ) +        elif key == "|": +            self.master.path_prompt( +                "Send flow to script: ", self.state.last_script, +                self.master.run_script_once, self.flow +            ) +        elif key == "z": +            if conn: +                e = conn.headers["content-encoding"] or ["identity"] +                if e[0] != "identity": +                    conn.decode() +                else: +                    self.master.prompt_onekey( +                        "Select encoding: ", +                        ( +                            ("gzip", "z"), +                            ("deflate", "d"), +                        ), +                        self.encode_callback, +                        conn +                    ) +                self.master.refresh_connection(self.flow) +        else: +            return key + +    def encode_callback(self, key, conn): +        encoding_map = { +            "z": "gzip", +            "d": "deflate", +        } +        conn.encode(encoding_map[key]) +        self.master.refresh_connection(self.flow) diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py new file mode 100644 index 00000000..22b42475 --- /dev/null +++ b/libmproxy/console/help.py @@ -0,0 +1,114 @@ +import urwid +import common +from .. import filt + +class HelpView(urwid.ListBox): +    def __init__(self, master, help_context, state): +        self.master, self.state = master, state +        self.help_context = help_context or [] +        urwid.ListBox.__init__( +            self, +            self.helptext() +        ) + +    def keypress(self, size, key): +        key = common.shortcuts(key) +        if key == "q": +            self.master.statusbar = self.state[0] +            self.master.body = self.state[1] +            self.master.header = self.state[2] +            self.master.make_view() +            return None +        return urwid.ListBox.keypress(self, size, key) + +    def helptext(self): +        text = [] +        text.append(("head", "Keys for this view:\n")) +        text.extend(self.help_context) + +        text.append(("head", "\n\nMovement:\n")) +        keys = [ +            ("j, k", "up, down"), +            ("h, l", "left, right (in some contexts)"), +            ("space", "page down"), +            ("pg up/down", "page up/down"), +            ("arrows", "up, down, left, right"), +        ] +        text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + +        text.append(("head", "\n\nGlobal keys:\n")) +        keys = [ +            ("c", "client replay"), +            ("i", "set interception pattern"), + +            ("o", "toggle options:"), +            (None, +                common.highlight_key("anticache", "a") + +                [("text", ": prevent cached responses")] +            ), +            (None, +                common.highlight_key("anticomp", "c") + +                [("text", ": prevent compressed responses")] +            ), +            (None, +                common.highlight_key("killextra", "k") + +                [("text", ": kill requests not part of server replay")] +            ), +            (None, +                common.highlight_key("norefresh", "n") + +                [("text", ": disable server replay response refresh")] +            ), + +            ("q", "quit / return to connection list"), +            ("Q", "quit without confirm prompt"), +            ("s", "set/unset script"), +            ("S", "server replay"), +            ("t", "set sticky cookie expression"), +            ("u", "set sticky auth expression"), +        ] +        text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + +        text.append(("head", "\n\nFilter expressions:\n")) +        f = [] +        for i in filt.filt_unary: +            f.append( +                ("~%s"%i.code, i.help) +            ) +        for i in filt.filt_rex: +            f.append( +                ("~%s regex"%i.code, i.help) +            ) +        for i in filt.filt_int: +            f.append( +                ("~%s int"%i.code, i.help) +            ) +        f.sort() +        f.extend( +            [ +                ("!", "unary not"), +                ("&", "and"), +                ("|", "or"), +                ("(...)", "grouping"), +            ] +        ) +        text.extend(common.format_keyvals(f, key="key", val="text", indent=4)) + +        text.extend( +           [ +                "\n", +                ("text", "    Regexes are Python-style.\n"), +                ("text", "    Regexes can be specified as quoted strings.\n"), +                ("text", "    Header matching (~h, ~hq, ~hs) is against a string of the form \"name: value\".\n"), +                ("text", "    Expressions with no operators are regex matches against URL.\n"), +                ("text", "    Default binary operator is &.\n"), +                ("head", "\n    Examples:\n"), +           ] +        ) +        examples = [ +                ("google\.com", "Url containing \"google.com"), +                ("~q ~b test", "Requests where body contains \"test\""), +                ("!(~q & ~t \"text/html\")", "Anything but requests with a text/html content type."), +        ] +        text.extend(common.format_keyvals(examples, key="key", val="text", indent=4)) +        return [urwid.Text(text)] + diff --git a/libmproxy/console/kveditor.py b/libmproxy/console/kveditor.py new file mode 100644 index 00000000..632a6992 --- /dev/null +++ b/libmproxy/console/kveditor.py @@ -0,0 +1,232 @@ +import copy +import urwid +import common +from .. import utils + + +def _mkhelp(): +    text = [] +    keys = [ +        ("A", "insert row before cursor"), +        ("a", "add row after cursor"), +        ("d", "delete row"), +        ("e", "spawn external editor on current field"), +        ("q", "return to flow view"), +        ("esc", "return to flow view/exit field edit mode"), +        ("tab", "next field"), +        ("enter", "edit field"), +    ] +    text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) +    return text +help_context = _mkhelp() + + +class SText(common.WWrap): +    def __init__(self, txt, focused): +        w = urwid.Text(txt, wrap="any") +        if focused: +            w = urwid.AttrWrap(w, "focusfield") +        common.WWrap.__init__(self, w) + +    def get_text(self): +        return self.w.get_text()[0] + +    def keypress(self, size, key): +        return key + +    def selectable(self): +        return True + + +class SEdit(common.WWrap): +    def __init__(self, txt): +        w = urwid.Edit(edit_text=txt, wrap="any", multiline=True) +        w = urwid.AttrWrap(w, "editfield") +        common.WWrap.__init__(self, w) + +    def get_text(self): +        return self.w.get_text()[0] + +    def selectable(self): +        return True + + +class KVItem(common.WWrap): +    def __init__(self, focused, editing, maxk, k, v): +        self.focused, self.editing, self.maxk = focused, editing, maxk +        if focused == 0 and editing: +            self.editing = self.kf = SEdit(k) +        else: +            self.kf = SText(k, True if focused == 0 else False) + +        if focused == 1 and editing: +            self.editing = self.vf = SEdit(v) +        else: +            self.vf = SText(v, True if focused == 1 else False) + +        w = urwid.Columns( +            [ +                ("fixed", maxk + 2, self.kf), +                self.vf +            ], +            dividechars = 2 +        ) +        if focused is not None: +            w.set_focus_column(focused) +        common.WWrap.__init__(self, w) + +    def get_kv(self): +        return (self.kf.get_text(), self.vf.get_text()) + +    def keypress(self, s, k): +        if self.editing: +            k = self.editing.keypress((s[0]-self.maxk-4,), k) +        return k + +    def selectable(self): +        return True + + +class KVWalker(urwid.ListWalker): +    def __init__(self, lst): +        self.lst = lst +        self.maxk = max(len(v[0]) for v in lst) +        self.focus = 0 +        self.focus_col = 0 +        self.editing = False + +    def get_current_value(self): +        if self.lst: +            return self.lst[self.focus][self.focus_col] + +    def set_current_value(self, val): +        row = list(self.lst[self.focus]) +        row[self.focus_col] = val +        self.lst[self.focus] = tuple(row) + +    def delete_focus(self): +        if self.lst: +            del self.lst[self.focus] +            self.focus = min(len(self.lst)-1, self.focus) +            self._modified() + +    def _insert(self, pos): +        self.focus = pos +        self.lst.insert(self.focus, ("", "")) +        self.focus_col = 0 +        self.start_edit() + +    def insert(self): +        return self._insert(self.focus) + +    def add(self): +        return self._insert(min(self.focus + 1, len(self.lst))) + +    def start_edit(self): +        self.editing = KVItem(self.focus_col, True, self.maxk, *self.lst[self.focus]) +        self._modified() + +    def stop_edit(self): +        if self.editing: +            self.lst[self.focus] = self.editing.get_kv() +            self.editing = False +            self._modified() + +    def left(self): +        self.focus_col = 0 +        self._modified() + +    def right(self): +        self.focus_col = 1 +        self._modified() + +    def tab_next(self): +        self.stop_edit() +        if self.focus_col == 0: +            self.focus_col = 1 +        elif self.focus != len(self.lst)-1: +            self.focus_col = 0 +            self.focus += 1 +        self._modified() + +    def get_focus(self): +        if self.editing: +            return self.editing, self.focus +        elif self.lst: +            return KVItem(self.focus_col, False, self.maxk, *self.lst[self.focus]), self.focus +        else: +            return None, None + +    def set_focus(self, focus): +        self.stop_edit() +        self.focus = focus + +    def get_next(self, pos): +        if pos+1 >= len(self.lst): +            return None, None +        return KVItem(None, False, self.maxk, *self.lst[pos+1]), pos+1 + +    def get_prev(self, pos): +        if pos-1 < 0: +            return None, None +        return KVItem(None, False, self.maxk, *self.lst[pos-1]), pos-1 + + +class KVListBox(urwid.ListBox): +    def __init__(self, lw): +        urwid.ListBox.__init__(self, lw) + + +class KVEditor(common.WWrap): +    def __init__(self, master, title, value, callback, *cb_args, **cb_kwargs): +        value = copy.deepcopy(value) +        self.master, self.title, self.value, self.callback = master, title, value, callback +        self.cb_args, self.cb_kwargs = cb_args, cb_kwargs +        p = urwid.Text(title) +        p = urwid.Padding(p, align="left", width=("relative", 100)) +        p = urwid.AttrWrap(p, "heading") +        self.walker = KVWalker(self.value) +        self.lb = KVListBox(self.walker) +        self.w = urwid.Frame(self.lb, header = p) +        self.master.statusbar.update("") + +    def keypress(self, size, key): +        if self.walker.editing: +            if key in ["esc", "enter"]: +                self.walker.stop_edit() +            elif key == "tab": +                pf = self.walker.focus +                self.walker.tab_next() +                if self.walker.focus == pf: +                    self.walker.start_edit() +            else: +                self.w.keypress(size, key) +            return None + +        key = common.shortcuts(key) +        if key in ["q", "esc"]: +            self.callback(self.walker.lst, *self.cb_args, **self.cb_kwargs) +            self.master.pop_view() +        elif key in ["h", "left"]: +            self.walker.left() +        elif key in ["l", "right"]: +            self.walker.right() +        elif key == "tab": +            self.walker.tab_next() +        elif key == "a": +            self.walker.add() +        elif key == "A": +            self.walker.insert() +        elif key == "d": +            self.walker.delete_focus() +        elif key == "e": +            o = self.walker.get_current_value() +            if o is not None: +                n = self.master.spawn_editor(o) +                n = utils.clean_hanging_newline(n) +                self.walker.set_current_value(n) +                self.walker._modified() +        elif key in ["enter"]: +            self.walker.start_edit() +        else: +            return self.w.keypress(size, key) diff --git a/libmproxy/utils.py b/libmproxy/utils.py index 4e53e6ce..4339ec6d 100644 --- a/libmproxy/utils.py +++ b/libmproxy/utils.py @@ -411,6 +411,18 @@ def parse_url(url):      return scheme, host, port, path +def clean_hanging_newline(t): +    """ +        Many editors will silently add a newline to the final line of a +        document (I'm looking at you, Vim). This function fixes this common +        problem at the risk of removing a hanging newline in the rare cases +        where the user actually intends it. +    """ +    if t[-1] == "\n": +        return t[:-1] +    return t + +  def parse_size(s):      """          Parses a size specification. Valid specifications are:  | 
