diff options
| author | Maximilian Hils <git@maximilianhils.com> | 2016-07-24 19:35:39 -0700 | 
|---|---|---|
| committer | Maximilian Hils <git@maximilianhils.com> | 2016-07-24 21:17:41 -0700 | 
| commit | 480ae46b88e9ab75c5dade3da17e015e4cca3501 (patch) | |
| tree | 234e9b4430f39887e2f6bc94deae600dda7ae767 | |
| parent | 56796aeda25dda66621ce78af227ff46049ef811 (diff) | |
| download | mitmproxy-480ae46b88e9ab75c5dade3da17e015e4cca3501.tar.gz mitmproxy-480ae46b88e9ab75c5dade3da17e015e4cca3501.tar.bz2 mitmproxy-480ae46b88e9ab75c5dade3da17e015e4cca3501.zip | |
refactor grideditor for py3 compatibility
| -rw-r--r-- | mitmproxy/builtins/replace.py | 4 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor.py | 719 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor/__init__.py | 2 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor/base.py | 427 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor/col_bytes.py | 103 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor/col_subgrid.py | 51 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor/col_text.py | 55 | ||||
| -rw-r--r-- | mitmproxy/console/grideditor/editors.py | 239 | ||||
| -rw-r--r-- | mitmproxy/console/master.py | 14 | 
9 files changed, 886 insertions, 728 deletions
| diff --git a/mitmproxy/builtins/replace.py b/mitmproxy/builtins/replace.py index 74d30c05..2c94fbb5 100644 --- a/mitmproxy/builtins/replace.py +++ b/mitmproxy/builtins/replace.py @@ -13,8 +13,8 @@ class Replace:              .replacements is a list of tuples (fpat, rex, s):              fpatt: a string specifying a filter pattern. -            rex: a regular expression. -            s: the replacement string +            rex: a regular expression, as bytes. +            s: the replacement string, as bytes          """          lst = []          for fpatt, rex, s in options.replacements: diff --git a/mitmproxy/console/grideditor.py b/mitmproxy/console/grideditor.py deleted file mode 100644 index 87700fd7..00000000 --- a/mitmproxy/console/grideditor.py +++ /dev/null @@ -1,719 +0,0 @@ -from __future__ import absolute_import, print_function, division - -import copy -import os -import re - -import urwid - -from mitmproxy import exceptions -from mitmproxy import filt -from mitmproxy.builtins import script -from mitmproxy.console import common -from mitmproxy.console import signals -from netlib import strutils -from netlib.http import cookies -from netlib.http import user_agents - -FOOTER = [ -    ('heading_key', "enter"), ":edit ", -    ('heading_key', "q"), ":back ", -] -FOOTER_EDITING = [ -    ('heading_key', "esc"), ":stop editing ", -] - - -class TextColumn: -    subeditor = None - -    def __init__(self, heading): -        self.heading = heading - -    def text(self, obj): -        return SEscaped(obj or "") - -    def blank(self): -        return "" - -    def keypress(self, key, editor): -        if key == "r": -            if editor.walker.get_current_value() is not None: -                signals.status_prompt_path.send( -                    self, -                    prompt = "Read file", -                    callback = editor.read_file -                ) -        elif key == "R": -            if editor.walker.get_current_value() is not None: -                signals.status_prompt_path.send( -                    editor, -                    prompt = "Read unescaped file", -                    callback = editor.read_file, -                    args = (True,) -                ) -        elif key == "e": -            o = editor.walker.get_current_value() -            if o is not None: -                n = editor.master.spawn_editor(o.encode("string-escape")) -                n = strutils.clean_hanging_newline(n) -                editor.walker.set_current_value(n, False) -                editor.walker._modified() -        elif key in ["enter"]: -            editor.walker.start_edit() -        else: -            return key - - -class SubgridColumn: - -    def __init__(self, heading, subeditor): -        self.heading = heading -        self.subeditor = subeditor - -    def text(self, obj): -        p = cookies._format_pairs(obj, sep="\n") -        return urwid.Text(p) - -    def blank(self): -        return [] - -    def keypress(self, key, editor): -        if key in "rRe": -            signals.status_message.send( -                self, -                message = "Press enter to edit this field.", -                expire = 1000 -            ) -            return -        elif key in ["enter"]: -            editor.master.view_grideditor( -                self.subeditor( -                    editor.master, -                    editor.walker.get_current_value(), -                    editor.set_subeditor_value, -                    editor.walker.focus, -                    editor.walker.focus_col -                ) -            ) -        else: -            return key - - -class SEscaped(urwid.WidgetWrap): - -    def __init__(self, txt): -        txt = txt.encode("string-escape") -        w = urwid.Text(txt, wrap="any") -        urwid.WidgetWrap.__init__(self, w) - -    def get_text(self): -        return self._w.get_text()[0] - -    def keypress(self, size, key): -        return key - -    def selectable(self): -        return True - - -class SEdit(urwid.WidgetWrap): - -    def __init__(self, txt): -        txt = txt.encode("string-escape") -        w = urwid.Edit(edit_text=txt, wrap="any", multiline=True) -        w = urwid.AttrWrap(w, "editfield") -        urwid.WidgetWrap.__init__(self, w) - -    def get_text(self): -        return self._w.get_text()[0].strip() - -    def selectable(self): -        return True - - -class GridRow(urwid.WidgetWrap): - -    def __init__(self, focused, editing, editor, values): -        self.focused, self.editing, self.editor = focused, editing, editor - -        errors = values[1] -        self.fields = [] -        for i, v in enumerate(values[0]): -            if focused == i and editing: -                self.editing = SEdit(v) -                self.fields.append(self.editing) -            else: -                w = self.editor.columns[i].text(v) -                if focused == i: -                    if i in errors: -                        w = urwid.AttrWrap(w, "focusfield_error") -                    else: -                        w = urwid.AttrWrap(w, "focusfield") -                elif i in errors: -                    w = urwid.AttrWrap(w, "field_error") -                self.fields.append(w) - -        fspecs = self.fields[:] -        if len(self.fields) > 1: -            fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0]) -        w = urwid.Columns( -            fspecs, -            dividechars = 2 -        ) -        if focused is not None: -            w.set_focus_column(focused) -        urwid.WidgetWrap.__init__(self, w) - -    def get_edit_value(self): -        return self.editing.get_text() - -    def keypress(self, s, k): -        if self.editing: -            w = self._w.column_widths(s)[self.focused] -            k = self.editing.keypress((w,), k) -        return k - -    def selectable(self): -        return True - - -class GridWalker(urwid.ListWalker): - -    """ -        Stores rows as a list of (rows, errors) tuples, where rows is a list -        and errors is a set with an entry of each offset in rows that is an -        error. -    """ - -    def __init__(self, lst, editor): -        self.lst = [(i, set([])) for i in lst] -        self.editor = editor -        self.focus = 0 -        self.focus_col = 0 -        self.editing = False - -    def _modified(self): -        self.editor.show_empty_msg() -        return urwid.ListWalker._modified(self) - -    def add_value(self, lst): -        self.lst.append((lst[:], set([]))) -        self._modified() - -    def get_current_value(self): -        if self.lst: -            return self.lst[self.focus][0][self.focus_col] - -    def set_current_value(self, val, unescaped): -        if not unescaped: -            try: -                val = val.decode("string-escape") -            except ValueError: -                signals.status_message.send( -                    self, -                    message = "Invalid Python-style string encoding.", -                    expire = 1000 -                ) -                return -        errors = self.lst[self.focus][1] -        emsg = self.editor.is_error(self.focus_col, val) -        if emsg: -            signals.status_message.send(message = emsg, expire = 1) -            errors.add(self.focus_col) -        else: -            errors.discard(self.focus_col) -        self.set_value(val, self.focus, self.focus_col, errors) - -    def set_value(self, val, focus, focus_col, errors=None): -        if not errors: -            errors = set([]) -        row = list(self.lst[focus][0]) -        row[focus_col] = val -        self.lst[focus] = [tuple(row), errors] -        self._modified() - -    def delete_focus(self): -        if self.lst: -            del self.lst[self.focus] -            self.focus = min(len(self.lst) - 1, self.focus) -            self._modified() - -    def _insert(self, pos): -        self.focus = pos -        self.lst.insert( -            self.focus, -            [ -                [c.blank() for c in self.editor.columns], set([]) -            ] -        ) -        self.focus_col = 0 -        self.start_edit() - -    def insert(self): -        return self._insert(self.focus) - -    def add(self): -        return self._insert(min(self.focus + 1, len(self.lst))) - -    def start_edit(self): -        col = self.editor.columns[self.focus_col] -        if self.lst and not col.subeditor: -            self.editing = GridRow( -                self.focus_col, True, self.editor, self.lst[self.focus] -            ) -            self.editor.master.loop.widget.footer.update(FOOTER_EDITING) -            self._modified() - -    def stop_edit(self): -        if self.editing: -            self.editor.master.loop.widget.footer.update(FOOTER) -            self.set_current_value(self.editing.get_edit_value(), False) -            self.editing = False -            self._modified() - -    def left(self): -        self.focus_col = max(self.focus_col - 1, 0) -        self._modified() - -    def right(self): -        self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1) -        self._modified() - -    def tab_next(self): -        self.stop_edit() -        if self.focus_col < len(self.editor.columns) - 1: -            self.focus_col += 1 -        elif self.focus != len(self.lst) - 1: -            self.focus_col = 0 -            self.focus += 1 -        self._modified() - -    def get_focus(self): -        if self.editing: -            return self.editing, self.focus -        elif self.lst: -            return GridRow( -                self.focus_col, -                False, -                self.editor, -                self.lst[self.focus] -            ), self.focus -        else: -            return None, None - -    def set_focus(self, focus): -        self.stop_edit() -        self.focus = focus -        self._modified() - -    def get_next(self, pos): -        if pos + 1 >= len(self.lst): -            return None, None -        return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1 - -    def get_prev(self, pos): -        if pos - 1 < 0: -            return None, None -        return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1 - - -class GridListBox(urwid.ListBox): - -    def __init__(self, lw): -        urwid.ListBox.__init__(self, lw) - - -FIRST_WIDTH_MAX = 40 -FIRST_WIDTH_MIN = 20 - - -class GridEditor(urwid.WidgetWrap): -    title = None -    columns = None - -    def __init__(self, master, value, callback, *cb_args, **cb_kwargs): -        value = self.data_in(copy.deepcopy(value)) -        self.master, self.value, self.callback = master, value, callback -        self.cb_args, self.cb_kwargs = cb_args, cb_kwargs - -        first_width = 20 -        if value: -            for r in value: -                assert len(r) == len(self.columns) -                first_width = max(len(r), first_width) -        self.first_width = min(first_width, FIRST_WIDTH_MAX) - -        title = urwid.Text(self.title) -        title = urwid.Padding(title, align="left", width=("relative", 100)) -        title = urwid.AttrWrap(title, "heading") - -        headings = [] -        for i, col in enumerate(self.columns): -            c = urwid.Text(col.heading) -            if i == 0 and len(self.columns) > 1: -                headings.append(("fixed", first_width + 2, c)) -            else: -                headings.append(c) -        h = urwid.Columns( -            headings, -            dividechars = 2 -        ) -        h = urwid.AttrWrap(h, "heading") - -        self.walker = GridWalker(self.value, self) -        self.lb = GridListBox(self.walker) -        self._w = urwid.Frame( -            self.lb, -            header = urwid.Pile([title, h]) -        ) -        self.master.loop.widget.footer.update("") -        self.show_empty_msg() - -    def show_empty_msg(self): -        if self.walker.lst: -            self._w.set_footer(None) -        else: -            self._w.set_footer( -                urwid.Text( -                    [ -                        ("highlight", "No values. Press "), -                        ("key", "a"), -                        ("highlight", " to add some."), -                    ] -                ) -            ) - -    def encode(self, s): -        if not self.encoding: -            return s -        try: -            return s.encode(self.encoding) -        except ValueError: -            return None - -    def read_file(self, p, unescaped=False): -        if p: -            try: -                p = os.path.expanduser(p) -                d = open(p, "rb").read() -                self.walker.set_current_value(d, unescaped) -                self.walker._modified() -            except IOError as v: -                return str(v) - -    def set_subeditor_value(self, val, focus, focus_col): -        self.walker.set_value(val, focus, focus_col) - -    def keypress(self, size, key): -        if self.walker.editing: -            if key in ["esc"]: -                self.walker.stop_edit() -            elif key == "tab": -                pf, pfc = self.walker.focus, self.walker.focus_col -                self.walker.tab_next() -                if self.walker.focus == pf and self.walker.focus_col != pfc: -                    self.walker.start_edit() -            else: -                self._w.keypress(size, key) -            return None - -        key = common.shortcuts(key) -        column = self.columns[self.walker.focus_col] -        if key in ["q", "esc"]: -            res = [] -            for i in self.walker.lst: -                if not i[1] and any([x for x in i[0]]): -                    res.append(i[0]) -            self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) -            signals.pop_view_state.send(self) -        elif key == "g": -            self.walker.set_focus(0) -        elif key == "G": -            self.walker.set_focus(len(self.walker.lst) - 1) -        elif key in ["h", "left"]: -            self.walker.left() -        elif key in ["l", "right"]: -            self.walker.right() -        elif key == "tab": -            self.walker.tab_next() -        elif key == "a": -            self.walker.add() -        elif key == "A": -            self.walker.insert() -        elif key == "d": -            self.walker.delete_focus() -        elif column.keypress(key, self) and not self.handle_key(key): -            return self._w.keypress(size, key) - -    def data_out(self, data): -        """ -            Called on raw list data, before data is returned through the -            callback. -        """ -        return data - -    def data_in(self, data): -        """ -            Called to prepare provided data. -        """ -        return data - -    def is_error(self, col, val): -        """ -            Return False, or a string error message. -        """ -        return False - -    def handle_key(self, key): -        return False - -    def make_help(self): -        text = [] -        text.append(urwid.Text([("text", "Editor control:\n")])) -        keys = [ -            ("A", "insert row before cursor"), -            ("a", "add row after cursor"), -            ("d", "delete row"), -            ("e", "spawn external editor on current field"), -            ("q", "save changes and exit editor"), -            ("r", "read value from file"), -            ("R", "read unescaped value from file"), -            ("esc", "save changes and exit editor"), -            ("tab", "next field"), -            ("enter", "edit field"), -        ] -        text.extend( -            common.format_keyvals(keys, key="key", val="text", indent=4) -        ) -        text.append( -            urwid.Text( -                [ -                    "\n", -                    ("text", "Values are escaped Python-style strings.\n"), -                ] -            ) -        ) -        return text - - -class QueryEditor(GridEditor): -    title = "Editing query" -    columns = [ -        TextColumn("Key"), -        TextColumn("Value") -    ] - - -class HeaderEditor(GridEditor): -    title = "Editing headers" -    columns = [ -        TextColumn("Key"), -        TextColumn("Value") -    ] - -    def make_help(self): -        h = GridEditor.make_help(self) -        text = [] -        text.append(urwid.Text([("text", "Special keys:\n")])) -        keys = [ -            ("U", "add User-Agent header"), -        ] -        text.extend( -            common.format_keyvals(keys, key="key", val="text", indent=4) -        ) -        text.append(urwid.Text([("text", "\n")])) -        text.extend(h) -        return text - -    def set_user_agent(self, k): -        ua = user_agents.get_by_shortcut(k) -        if ua: -            self.walker.add_value( -                [ -                    "User-Agent", -                    ua[2] -                ] -            ) - -    def handle_key(self, key): -        if key == "U": -            signals.status_prompt_onekey.send( -                prompt = "Add User-Agent header:", -                keys = [(i[0], i[1]) for i in user_agents.UASTRINGS], -                callback = self.set_user_agent, -            ) -            return True - - -class URLEncodedFormEditor(GridEditor): -    title = "Editing URL-encoded form" -    columns = [ -        TextColumn("Key"), -        TextColumn("Value") -    ] - - -class ReplaceEditor(GridEditor): -    title = "Editing replacement patterns" -    columns = [ -        TextColumn("Filter"), -        TextColumn("Regex"), -        TextColumn("Replacement"), -    ] - -    def is_error(self, col, val): -        if col == 0: -            if not filt.parse(val): -                return "Invalid filter specification." -        elif col == 1: -            try: -                re.compile(val) -            except re.error: -                return "Invalid regular expression." -        return False - - -class SetHeadersEditor(GridEditor): -    title = "Editing header set patterns" -    columns = [ -        TextColumn("Filter"), -        TextColumn("Header"), -        TextColumn("Value"), -    ] - -    def is_error(self, col, val): -        if col == 0: -            if not filt.parse(val): -                return "Invalid filter specification" -        return False - -    def make_help(self): -        h = GridEditor.make_help(self) -        text = [] -        text.append(urwid.Text([("text", "Special keys:\n")])) -        keys = [ -            ("U", "add User-Agent header"), -        ] -        text.extend( -            common.format_keyvals(keys, key="key", val="text", indent=4) -        ) -        text.append(urwid.Text([("text", "\n")])) -        text.extend(h) -        return text - -    def set_user_agent(self, k): -        ua = user_agents.get_by_shortcut(k) -        if ua: -            self.walker.add_value( -                [ -                    ".*", -                    "User-Agent", -                    ua[2] -                ] -            ) - -    def handle_key(self, key): -        if key == "U": -            signals.status_prompt_onekey.send( -                prompt = "Add User-Agent header:", -                keys = [(i[0], i[1]) for i in user_agents.UASTRINGS], -                callback = self.set_user_agent, -            ) -            return True - - -class PathEditor(GridEditor): -    title = "Editing URL path components" -    columns = [ -        TextColumn("Component"), -    ] - -    def data_in(self, data): -        return [[i] for i in data] - -    def data_out(self, data): -        return [i[0] for i in data] - - -class ScriptEditor(GridEditor): -    title = "Editing scripts" -    columns = [ -        TextColumn("Command"), -    ] - -    def is_error(self, col, val): -        try: -            script.parse_command(val) -        except exceptions.AddonError as e: -            return str(e) - - -class HostPatternEditor(GridEditor): -    title = "Editing host patterns" -    columns = [ -        TextColumn("Regex (matched on hostname:port / ip:port)") -    ] - -    def is_error(self, col, val): -        try: -            re.compile(val, re.IGNORECASE) -        except re.error as e: -            return "Invalid regex: %s" % str(e) - -    def data_in(self, data): -        return [[i] for i in data] - -    def data_out(self, data): -        return [i[0] for i in data] - - -class CookieEditor(GridEditor): -    title = "Editing request Cookie header" -    columns = [ -        TextColumn("Name"), -        TextColumn("Value"), -    ] - - -class CookieAttributeEditor(GridEditor): -    title = "Editing Set-Cookie attributes" -    columns = [ -        TextColumn("Name"), -        TextColumn("Value"), -    ] - -    def data_out(self, data): -        ret = [] -        for i in data: -            if not i[1]: -                ret.append([i[0], None]) -            else: -                ret.append(i) -        return ret - - -class SetCookieEditor(GridEditor): -    title = "Editing response SetCookie header" -    columns = [ -        TextColumn("Name"), -        TextColumn("Value"), -        SubgridColumn("Attributes", CookieAttributeEditor), -    ] - -    def data_in(self, data): -        flattened = [] -        for key, (value, attrs) in data: -            flattened.append([key, value, attrs.items(multi=True)]) -        return flattened - -    def data_out(self, data): -        vals = [] -        for key, value, attrs in data: -            vals.append( -                [ -                    key, -                    (value, attrs) -                ] -            ) -        return vals diff --git a/mitmproxy/console/grideditor/__init__.py b/mitmproxy/console/grideditor/__init__.py new file mode 100644 index 00000000..894f3d22 --- /dev/null +++ b/mitmproxy/console/grideditor/__init__.py @@ -0,0 +1,2 @@ +from .editors import *  # noqa +from . import base  # noqa diff --git a/mitmproxy/console/grideditor/base.py b/mitmproxy/console/grideditor/base.py new file mode 100644 index 00000000..68a0f86a --- /dev/null +++ b/mitmproxy/console/grideditor/base.py @@ -0,0 +1,427 @@ +from __future__ import absolute_import, print_function, division +import abc +import copy + +import six +import urwid +from mitmproxy.console import common +from mitmproxy.console import signals + +from typing import Any  # noqa +from typing import Callable  # noqa +from typing import Container  # noqa +from typing import Iterable  # noqa +from typing import Optional  # noqa +from typing import Sequence  # noqa +from typing import Tuple  # noqa + +FOOTER = [ +    ('heading_key', "enter"), ":edit ", +    ('heading_key', "q"), ":back ", +] +FOOTER_EDITING = [ +    ('heading_key', "esc"), ":stop editing ", +] + + +@six.add_metaclass(abc.ABCMeta) +class Column(object): +    subeditor = None + +    def __init__(self, heading): +        self.heading = heading + +    @abc.abstractmethod +    def Display(self, data): +        # type: () -> Cell +        pass + +    @abc.abstractmethod +    def Edit(self, data): +        # type: () -> Cell +        pass + +    @abc.abstractmethod +    def blank(self): +        # type: () -> Any +        pass + +    def keypress(self, key, editor): +        # type: (str, GridEditor) -> Optional[str] +        return key + + +class Cell(urwid.WidgetWrap): +    __metaclass__ = abc.ABCMeta + +    @abc.abstractmethod +    def get_data(self): +        """ +        Raises: +            ValueError, if the current content is invalid. +        """ +        pass + +    def selectable(self): +        return True + + +class GridRow(urwid.WidgetWrap): +    def __init__( +            self, +            focused,  # type: Optional[int] +            editing,  # type: bool +            editor,  # type: GridEditor +            values  # type: Tuple[Iterable[bytes], Container[int] +    ): +        self.focused = focused +        self.editor = editor +        self.edit_col = None  # type: Optional[Cell] + +        errors = values[1] +        self.fields = [] +        for i, v in enumerate(values[0]): +            if focused == i and editing: +                self.edit_col = self.editor.columns[i].Edit(v) +                self.fields.append(self.edit_col) +            else: +                w = self.editor.columns[i].Display(v) +                if focused == i: +                    if i in errors: +                        w = urwid.AttrWrap(w, "focusfield_error") +                    else: +                        w = urwid.AttrWrap(w, "focusfield") +                elif i in errors: +                    w = urwid.AttrWrap(w, "field_error") +                self.fields.append(w) + +        fspecs = self.fields[:] +        if len(self.fields) > 1: +            fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0]) +        w = urwid.Columns( +            fspecs, +            dividechars=2 +        ) +        if focused is not None: +            w.set_focus_column(focused) +        super(GridRow, self).__init__(w) + +    def keypress(self, s, k): +        if self.edit_col: +            w = self._w.column_widths(s)[self.focused] +            k = self.edit_col.keypress((w,), k) +        return k + +    def selectable(self): +        return True + + +class GridWalker(urwid.ListWalker): +    """ +        Stores rows as a list of (rows, errors) tuples, where rows is a list +        and errors is a set with an entry of each offset in rows that is an +        error. +    """ + +    def __init__( +            self, +            lst,  # type: Iterable[list] +            editor  # type: GridEditor +    ): +        self.lst = [(i, set()) for i in lst] +        self.editor = editor +        self.focus = 0 +        self.focus_col = 0 +        self.edit_row = None  # type: Optional[GridRow] + +    def _modified(self): +        self.editor.show_empty_msg() +        return super(GridWalker, self)._modified() + +    def add_value(self, lst): +        self.lst.append( +            (lst[:], set()) +        ) +        self._modified() + +    def get_current_value(self): +        if self.lst: +            return self.lst[self.focus][0][self.focus_col] + +    def set_current_value(self, val): +        errors = self.lst[self.focus][1] +        emsg = self.editor.is_error(self.focus_col, val) +        if emsg: +            signals.status_message.send(message=emsg, expire=5) +            errors.add(self.focus_col) +        else: +            errors.discard(self.focus_col) +        self.set_value(val, self.focus, self.focus_col, errors) + +    def set_value(self, val, focus, focus_col, errors=None): +        if not errors: +            errors = set([]) +        row = list(self.lst[focus][0]) +        row[focus_col] = val +        self.lst[focus] = [tuple(row), errors] +        self._modified() + +    def delete_focus(self): +        if self.lst: +            del self.lst[self.focus] +            self.focus = min(len(self.lst) - 1, self.focus) +            self._modified() + +    def _insert(self, pos): +        self.focus = pos +        self.lst.insert( +            self.focus, +            ([c.blank() for c in self.editor.columns], set([])) +        ) +        self.focus_col = 0 +        self.start_edit() + +    def insert(self): +        return self._insert(self.focus) + +    def add(self): +        return self._insert(min(self.focus + 1, len(self.lst))) + +    def start_edit(self): +        col = self.editor.columns[self.focus_col] +        if self.lst and not col.subeditor: +            self.edit_row = GridRow( +                self.focus_col, True, self.editor, self.lst[self.focus] +            ) +            self.editor.master.loop.widget.footer.update(FOOTER_EDITING) +            self._modified() + +    def stop_edit(self): +        if self.edit_row: +            self.editor.master.loop.widget.footer.update(FOOTER) +            try: +                val = self.edit_row.edit_col.get_data() +            except ValueError: +                return +            self.edit_row = None +            self.set_current_value(val) + +    def left(self): +        self.focus_col = max(self.focus_col - 1, 0) +        self._modified() + +    def right(self): +        self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1) +        self._modified() + +    def tab_next(self): +        self.stop_edit() +        if self.focus_col < len(self.editor.columns) - 1: +            self.focus_col += 1 +        elif self.focus != len(self.lst) - 1: +            self.focus_col = 0 +            self.focus += 1 +        self._modified() + +    def get_focus(self): +        if self.edit_row: +            return self.edit_row, self.focus +        elif self.lst: +            return GridRow( +                self.focus_col, +                False, +                self.editor, +                self.lst[self.focus] +            ), self.focus +        else: +            return None, None + +    def set_focus(self, focus): +        self.stop_edit() +        self.focus = focus +        self._modified() + +    def get_next(self, pos): +        if pos + 1 >= len(self.lst): +            return None, None +        return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1 + +    def get_prev(self, pos): +        if pos - 1 < 0: +            return None, None +        return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1 + + +class GridListBox(urwid.ListBox): +    def __init__(self, lw): +        super(GridListBox, self).__init__(lw) + + +FIRST_WIDTH_MAX = 40 +FIRST_WIDTH_MIN = 20 + + +class GridEditor(urwid.WidgetWrap): +    title = None  # type: str +    columns = None  # type: Sequence[Column] + +    def __init__( +            self, +            master,  # type: "mitmproxy.console.master.ConsoleMaster" +            value,  # type: Any +            callback,  # type: Callable[..., None] +            *cb_args, +            **cb_kwargs +    ): +        value = self.data_in(copy.deepcopy(value)) +        self.master = master +        self.value = value +        self.callback = callback +        self.cb_args = cb_args +        self.cb_kwargs = cb_kwargs + +        first_width = 20 +        if value: +            for r in value: +                assert len(r) == len(self.columns) +                first_width = max(len(r), first_width) +        self.first_width = min(first_width, FIRST_WIDTH_MAX) + +        title = urwid.Text(self.title) +        title = urwid.Padding(title, align="left", width=("relative", 100)) +        title = urwid.AttrWrap(title, "heading") + +        headings = [] +        for i, col in enumerate(self.columns): +            c = urwid.Text(col.heading) +            if i == 0 and len(self.columns) > 1: +                headings.append(("fixed", first_width + 2, c)) +            else: +                headings.append(c) +        h = urwid.Columns( +            headings, +            dividechars=2 +        ) +        h = urwid.AttrWrap(h, "heading") + +        self.walker = GridWalker(self.value, self) +        self.lb = GridListBox(self.walker) +        w = urwid.Frame( +            self.lb, +            header=urwid.Pile([title, h]) +        ) +        super(GridEditor, self).__init__(w) +        self.master.loop.widget.footer.update("") +        self.show_empty_msg() + +    def show_empty_msg(self): +        if self.walker.lst: +            self._w.set_footer(None) +        else: +            self._w.set_footer( +                urwid.Text( +                    [ +                        ("highlight", "No values. Press "), +                        ("key", "a"), +                        ("highlight", " to add some."), +                    ] +                ) +            ) + +    def set_subeditor_value(self, val, focus, focus_col): +        self.walker.set_value(val, focus, focus_col) + +    def keypress(self, size, key): +        if self.walker.edit_row: +            if key in ["esc"]: +                self.walker.stop_edit() +            elif key == "tab": +                pf, pfc = self.walker.focus, self.walker.focus_col +                self.walker.tab_next() +                if self.walker.focus == pf and self.walker.focus_col != pfc: +                    self.walker.start_edit() +            else: +                self._w.keypress(size, key) +            return None + +        key = common.shortcuts(key) +        column = self.columns[self.walker.focus_col] +        if key in ["q", "esc"]: +            res = [] +            for i in self.walker.lst: +                if not i[1] and any([x for x in i[0]]): +                    res.append(i[0]) +            self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) +            signals.pop_view_state.send(self) +        elif key == "g": +            self.walker.set_focus(0) +        elif key == "G": +            self.walker.set_focus(len(self.walker.lst) - 1) +        elif key in ["h", "left"]: +            self.walker.left() +        elif key in ["l", "right"]: +            self.walker.right() +        elif key == "tab": +            self.walker.tab_next() +        elif key == "a": +            self.walker.add() +        elif key == "A": +            self.walker.insert() +        elif key == "d": +            self.walker.delete_focus() +        elif column.keypress(key, self) and not self.handle_key(key): +            return self._w.keypress(size, key) + +    def data_out(self, data): +        # type: (Sequence[list]) -> Any +        """ +            Called on raw list data, before data is returned through the +            callback. +        """ +        return data + +    def data_in(self, data): +        # type: (Any) -> Iterable[list] +        """ +            Called to prepare provided data. +        """ +        return data + +    def is_error(self, col, val): +        # type: (int, Any) -> Optional[str] +        """ +            Return None, or a string error message. +        """ +        return False + +    def handle_key(self, key): +        return False + +    def make_help(self): +        text = [ +            urwid.Text([("text", "Editor control:\n")]) +        ] +        keys = [ +            ("A", "insert row before cursor"), +            ("a", "add row after cursor"), +            ("d", "delete row"), +            ("e", "spawn external editor on current field"), +            ("q", "save changes and exit editor"), +            ("r", "read value from file"), +            ("R", "read unescaped value from file"), +            ("esc", "save changes and exit editor"), +            ("tab", "next field"), +            ("enter", "edit field"), +        ] +        text.extend( +            common.format_keyvals(keys, key="key", val="text", indent=4) +        ) +        text.append( +            urwid.Text( +                [ +                    "\n", +                    ("text", "Values are escaped Python-style strings.\n"), +                ] +            ) +        ) +        return text diff --git a/mitmproxy/console/grideditor/col_bytes.py b/mitmproxy/console/grideditor/col_bytes.py new file mode 100644 index 00000000..51bbb6cb --- /dev/null +++ b/mitmproxy/console/grideditor/col_bytes.py @@ -0,0 +1,103 @@ +from __future__ import absolute_import, print_function, division + +import os + +import urwid +from mitmproxy.console import signals +from mitmproxy.console.grideditor import base +from netlib import strutils + + +def read_file(filename, callback, escaped): +    # type: (str, Callable[...,None], bool) -> Optional[str] +    if not filename: +        return + +    filename = os.path.expanduser(filename) +    try: +        with open(filename, "r" if escaped else "rb") as f: +            d = f.read() +    except IOError as v: +        return str(v) + +    if escaped: +        try: +            d = strutils.escaped_str_to_bytes(d) +        except ValueError: +            return "Invalid Python-style string encoding." +    # TODO: Refactor the status_prompt_path signal so that we +    # can raise exceptions here and return the content instead. +    callback(d) + + +class Column(base.Column): +    def Display(self, data): +        return Display(data) + +    def Edit(self, data): +        return Edit(data) + +    def blank(self): +        return b"" + +    def keypress(self, key, editor): +        if key == "r": +            if editor.walker.get_current_value() is not None: +                signals.status_prompt_path.send( +                    self, +                    prompt="Read file", +                    callback=read_file, +                    args=(editor.walker.set_current_value, True) +                ) +        elif key == "R": +            if editor.walker.get_current_value() is not None: +                signals.status_prompt_path.send( +                    self, +                    prompt="Read unescaped file", +                    callback=read_file, +                    args=(editor.walker.set_current_value, False) +                ) +        elif key == "e": +            o = editor.walker.get_current_value() +            if o is not None: +                n = editor.master.spawn_editor(o) +                n = strutils.clean_hanging_newline(n) +                editor.walker.set_current_value(n) +        elif key in ["enter"]: +            editor.walker.start_edit() +        else: +            return key + + +class Display(base.Cell): +    def __init__(self, data): +        # type: (bytes) -> Display +        self.data = data +        escaped = strutils.bytes_to_escaped_str(data) +        w = urwid.Text(escaped, wrap="any") +        super(Display, self).__init__(w) + +    def get_data(self): +        return self.data + + +class Edit(base.Cell): +    def __init__(self, data): +        # type: (bytes) -> Edit +        data = strutils.bytes_to_escaped_str(data) +        w = urwid.Edit(edit_text=data, wrap="any", multiline=True) +        w = urwid.AttrWrap(w, "editfield") +        super(Edit, self).__init__(w) + +    def get_data(self): +        # type: () -> bytes +        txt = self._w.get_text()[0].strip() +        try: +            return strutils.escaped_str_to_bytes(txt) +        except ValueError: +            signals.status_message.send( +                self, +                message="Invalid Python-style string encoding.", +                expire=1000 +            ) +            raise diff --git a/mitmproxy/console/grideditor/col_subgrid.py b/mitmproxy/console/grideditor/col_subgrid.py new file mode 100644 index 00000000..1dec8032 --- /dev/null +++ b/mitmproxy/console/grideditor/col_subgrid.py @@ -0,0 +1,51 @@ +from __future__ import absolute_import, print_function, division +import urwid +from mitmproxy.console.grideditor import base +from mitmproxy.console import signals +from netlib.http import cookies + + +class Column(base.Column): +    def __init__(self, heading, subeditor): +        super(Column, self).__init__(heading) +        self.subeditor = subeditor + +    def Edit(self, data): +        raise RuntimeError("SubgridColumn should handle edits itself") + +    def Display(self, data): +        return Display(data) + +    def blank(self): +        return [] + +    def keypress(self, key, editor): +        if key in "rRe": +            signals.status_message.send( +                self, +                message="Press enter to edit this field.", +                expire=1000 +            ) +            return +        elif key in ["enter"]: +            editor.master.view_grideditor( +                self.subeditor( +                    editor.master, +                    editor.walker.get_current_value(), +                    editor.set_subeditor_value, +                    editor.walker.focus, +                    editor.walker.focus_col +                ) +            ) +        else: +            return key + + +class Display(base.Cell): +    def __init__(self, data): +        p = cookies._format_pairs(data, sep="\n") +        w = urwid.Text(p) +        super(Display, self).__init__(w) + +    def get_data(self): +        pass diff --git a/mitmproxy/console/grideditor/col_text.py b/mitmproxy/console/grideditor/col_text.py new file mode 100644 index 00000000..d60dc854 --- /dev/null +++ b/mitmproxy/console/grideditor/col_text.py @@ -0,0 +1,55 @@ +""" +Welcome to the encoding dance! + +In a nutshell, text columns are actually a proxy class for byte columns, +which just encode/decodes contents. +""" +from __future__ import absolute_import, print_function, division + +from mitmproxy.console import signals +from mitmproxy.console.grideditor import col_bytes + + +class Column(col_bytes.Column): +    def __init__(self, heading, encoding="utf8", errors="surrogateescape"): +        super(Column, self).__init__(heading) +        self.encoding_args = encoding, errors + +    def Display(self, data): +        return TDisplay(data, self.encoding_args) + +    def Edit(self, data): +        return TEdit(data, self.encoding_args) + +    def blank(self): +        return u"" + + +# This is the same for both edit and display. +class EncodingMixin(object): +    def __init__(self, data, encoding_args): +        # type: (str) -> TDisplay +        self.encoding_args = encoding_args +        data = data.encode(*self.encoding_args) +        super(EncodingMixin, self).__init__(data) + +    def get_data(self): +        data = super(EncodingMixin, self).get_data() +        try: +            return data.decode(*self.encoding_args) +        except ValueError: +            signals.status_message.send( +                self, +                message="Invalid encoding.", +                expire=1000 +            ) +            raise + + +# urwid forces a different name for a subclass. +class TDisplay(EncodingMixin, col_bytes.Display): +    pass + + +class TEdit(EncodingMixin, col_bytes.Edit): +    pass diff --git a/mitmproxy/console/grideditor/editors.py b/mitmproxy/console/grideditor/editors.py new file mode 100644 index 00000000..80f0541b --- /dev/null +++ b/mitmproxy/console/grideditor/editors.py @@ -0,0 +1,239 @@ +from __future__ import absolute_import, print_function, division +import re +import urwid +from mitmproxy import filt +from mitmproxy.builtins import script +from mitmproxy import exceptions +from mitmproxy.console import common +from mitmproxy.console.grideditor import base +from mitmproxy.console.grideditor import col_bytes +from mitmproxy.console.grideditor import col_text +from mitmproxy.console.grideditor import col_subgrid +from mitmproxy.console import signals +from netlib.http import user_agents + + +class QueryEditor(base.GridEditor): +    title = "Editing query" +    columns = [ +        col_text.Column("Key"), +        col_text.Column("Value") +    ] + + +class HeaderEditor(base.GridEditor): +    title = "Editing headers" +    columns = [ +        col_bytes.Column("Key"), +        col_bytes.Column("Value") +    ] + +    def make_help(self): +        h = super(HeaderEditor, self).make_help() +        text = [ +            urwid.Text([("text", "Special keys:\n")]) +        ] +        keys = [ +            ("U", "add User-Agent header"), +        ] +        text.extend( +            common.format_keyvals(keys, key="key", val="text", indent=4) +        ) +        text.append(urwid.Text([("text", "\n")])) +        text.extend(h) +        return text + +    def set_user_agent(self, k): +        ua = user_agents.get_by_shortcut(k) +        if ua: +            self.walker.add_value( +                [ +                    b"User-Agent", +                    ua[2].encode() +                ] +            ) + +    def handle_key(self, key): +        if key == "U": +            signals.status_prompt_onekey.send( +                prompt="Add User-Agent header:", +                keys=[(i[0], i[1]) for i in user_agents.UASTRINGS], +                callback=self.set_user_agent, +            ) +            return True + + +class URLEncodedFormEditor(base.GridEditor): +    title = "Editing URL-encoded form" +    columns = [ +        col_bytes.Column("Key"), +        col_bytes.Column("Value") +    ] + + +class ReplaceEditor(base.GridEditor): +    title = "Editing replacement patterns" +    columns = [ +        col_text.Column("Filter"), +        col_bytes.Column("Regex"), +        col_bytes.Column("Replacement"), +    ] + +    def is_error(self, col, val): +        if col == 0: +            if not filt.parse(val): +                return "Invalid filter specification." +        elif col == 1: +            try: +                re.compile(val) +            except re.error: +                return "Invalid regular expression." +        return False + + +class SetHeadersEditor(base.GridEditor): +    title = "Editing header set patterns" +    columns = [ +        col_text.Column("Filter"), +        col_bytes.Column("Header"), +        col_bytes.Column("Value"), +    ] + +    def is_error(self, col, val): +        if col == 0: +            if not filt.parse(val): +                return "Invalid filter specification" +        return False + +    def make_help(self): +        h = super(SetHeadersEditor, self).make_help() +        text = [ +            urwid.Text([("text", "Special keys:\n")]) +        ] +        keys = [ +            ("U", "add User-Agent header"), +        ] +        text.extend( +            common.format_keyvals(keys, key="key", val="text", indent=4) +        ) +        text.append(urwid.Text([("text", "\n")])) +        text.extend(h) +        return text + +    def set_user_agent(self, k): +        ua = user_agents.get_by_shortcut(k) +        if ua: +            self.walker.add_value( +                [ +                    ".*", +                    b"User-Agent", +                    ua[2].encode() +                ] +            ) + +    def handle_key(self, key): +        if key == "U": +            signals.status_prompt_onekey.send( +                prompt="Add User-Agent header:", +                keys=[(i[0], i[1]) for i in user_agents.UASTRINGS], +                callback=self.set_user_agent, +            ) +            return True + + +class PathEditor(base.GridEditor): +    # TODO: Next row on enter? + +    title = "Editing URL path components" +    columns = [ +        col_text.Column("Component"), +    ] + +    def data_in(self, data): +        return [[i] for i in data] + +    def data_out(self, data): +        return [i[0] for i in data] + + +class ScriptEditor(base.GridEditor): +    title = "Editing scripts" +    columns = [ +        col_text.Column("Command"), +    ] + +    def is_error(self, col, val): +        try: +            script.parse_command(val) +        except exceptions.AddonError as e: +            return str(e) + + +class HostPatternEditor(base.GridEditor): +    title = "Editing host patterns" +    columns = [ +        col_text.Column("Regex (matched on hostname:port / ip:port)") +    ] + +    def is_error(self, col, val): +        try: +            re.compile(val, re.IGNORECASE) +        except re.error as e: +            return "Invalid regex: %s" % str(e) + +    def data_in(self, data): +        return [[i] for i in data] + +    def data_out(self, data): +        return [i[0] for i in data] + + +class CookieEditor(base.GridEditor): +    title = "Editing request Cookie header" +    columns = [ +        col_text.Column("Name"), +        col_text.Column("Value"), +    ] + + +class CookieAttributeEditor(base.GridEditor): +    title = "Editing Set-Cookie attributes" +    columns = [ +        col_text.Column("Name"), +        col_text.Column("Value"), +    ] + +    def data_out(self, data): +        ret = [] +        for i in data: +            if not i[1]: +                ret.append([i[0], None]) +            else: +                ret.append(i) +        return ret + + +class SetCookieEditor(base.GridEditor): +    title = "Editing response SetCookie header" +    columns = [ +        col_text.Column("Name"), +        col_text.Column("Value"), +        col_subgrid.Column("Attributes", CookieAttributeEditor), +    ] + +    def data_in(self, data): +        flattened = [] +        for key, (value, attrs) in data: +            flattened.append([key, value, attrs.items(multi=True)]) +        return flattened + +    def data_out(self, data): +        vals = [] +        for key, value, attrs in data: +            vals.append( +                [ +                    key, +                    (value, attrs) +                ] +            ) +        return vals diff --git a/mitmproxy/console/master.py b/mitmproxy/console/master.py index db414147..ad46cbb4 100644 --- a/mitmproxy/console/master.py +++ b/mitmproxy/console/master.py @@ -390,13 +390,12 @@ class ConsoleMaster(flow.FlowMaster):              )      def spawn_editor(self, data): -        fd, name = tempfile.mkstemp('', "mproxy") +        text = not isinstance(data, bytes) +        fd, name = tempfile.mkstemp('', "mproxy", text=text)          os.write(fd, data)          os.close(fd) -        c = os.environ.get("EDITOR")          # if no EDITOR is set, assume 'vi' -        if not c: -            c = "vi" +        c = os.environ.get("EDITOR") or "vi"          cmd = shlex.split(c)          cmd.append(name)          self.ui.stop() @@ -404,10 +403,11 @@ class ConsoleMaster(flow.FlowMaster):              subprocess.call(cmd)          except:              signals.status_message.send( -                message = "Can't start editor: %s" % " ".join(c) +                message="Can't start editor: %s" % " ".join(c)              )          else: -            data = open(name, "rb").read() +            with open(name, "r" if text else "rb") as f: +                data = f.read()          self.ui.start()          os.unlink(name)          return data @@ -570,7 +570,7 @@ class ConsoleMaster(flow.FlowMaster):                  self,                  ge,                  None, -                statusbar.StatusBar(self, grideditor.FOOTER), +                statusbar.StatusBar(self, grideditor.base.FOOTER),                  ge.make_help()              )          ) | 
