diff options
author | Aldo Cortesi <aldo@corte.si> | 2016-10-19 21:26:54 +1300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-10-19 21:26:54 +1300 |
commit | b1cf9dd5e38d5386199dff8c49e0dc1b46d8ec72 (patch) | |
tree | fee98428fbf36897aa874fd91fe5c9738bf2626f /mitmproxy/tools/console/grideditor | |
parent | 49346c5248b8aa33acef26f0d55f51dcd2493a59 (diff) | |
parent | 24cf8da27eb56a65bf3e4ceb78bbeacdb1864597 (diff) | |
download | mitmproxy-b1cf9dd5e38d5386199dff8c49e0dc1b46d8ec72.tar.gz mitmproxy-b1cf9dd5e38d5386199dff8c49e0dc1b46d8ec72.tar.bz2 mitmproxy-b1cf9dd5e38d5386199dff8c49e0dc1b46d8ec72.zip |
Merge pull request #1633 from cortesi/refactor2
Continue module structure cleanup
Diffstat (limited to 'mitmproxy/tools/console/grideditor')
-rw-r--r-- | mitmproxy/tools/console/grideditor/__init__.py | 2 | ||||
-rw-r--r-- | mitmproxy/tools/console/grideditor/base.py | 413 | ||||
-rw-r--r-- | mitmproxy/tools/console/grideditor/col_bytes.py | 98 | ||||
-rw-r--r-- | mitmproxy/tools/console/grideditor/col_subgrid.py | 50 | ||||
-rw-r--r-- | mitmproxy/tools/console/grideditor/col_text.py | 54 | ||||
-rw-r--r-- | mitmproxy/tools/console/grideditor/editors.py | 241 |
6 files changed, 858 insertions, 0 deletions
diff --git a/mitmproxy/tools/console/grideditor/__init__.py b/mitmproxy/tools/console/grideditor/__init__.py new file mode 100644 index 00000000..894f3d22 --- /dev/null +++ b/mitmproxy/tools/console/grideditor/__init__.py @@ -0,0 +1,2 @@ +from .editors import * # noqa +from . import base # noqa diff --git a/mitmproxy/tools/console/grideditor/base.py b/mitmproxy/tools/console/grideditor/base.py new file mode 100644 index 00000000..4505bb97 --- /dev/null +++ b/mitmproxy/tools/console/grideditor/base.py @@ -0,0 +1,413 @@ +import abc +import copy +from typing import Any +from typing import Callable +from typing import Container +from typing import Iterable +from typing import Optional +from typing import Sequence +from typing import Tuple + +import urwid +from mitmproxy.tools.console import common +from mitmproxy.tools.console import signals + +FOOTER = [ + ('heading_key', "enter"), ":edit ", + ('heading_key', "q"), ":back ", +] +FOOTER_EDITING = [ + ('heading_key', "esc"), ":stop editing ", +] + + +class Cell(urwid.WidgetWrap): + def get_data(self): + """ + Raises: + ValueError, if the current content is invalid. + """ + raise NotImplementedError() + + def selectable(self): + return True + + +class Column(metaclass=abc.ABCMeta): + subeditor = None + + def __init__(self, heading): + self.heading = heading + + @abc.abstractmethod + def Display(self, data) -> Cell: + pass + + @abc.abstractmethod + def Edit(self, data) -> Cell: + pass + + @abc.abstractmethod + def blank(self) -> Any: + pass + + def keypress(self, key: str, editor: "GridEditor") -> Optional[str]: + return key + + +class GridRow(urwid.WidgetWrap): + def __init__( + self, + focused: Optional[int], + editing: bool, + editor: "GridEditor", + values: Tuple[Iterable[bytes], Container[int]] + ): + self.focused = focused + self.editor = editor + self.edit_col = None # type: Optional[Cell] + + errors = values[1] + self.fields = [] + for i, v in enumerate(values[0]): + if focused == i and editing: + self.edit_col = self.editor.columns[i].Edit(v) + self.fields.append(self.edit_col) + else: + w = self.editor.columns[i].Display(v) + if focused == i: + if i in errors: + w = urwid.AttrWrap(w, "focusfield_error") + else: + w = urwid.AttrWrap(w, "focusfield") + elif i in errors: + w = urwid.AttrWrap(w, "field_error") + self.fields.append(w) + + fspecs = self.fields[:] + if len(self.fields) > 1: + fspecs[0] = ("fixed", self.editor.first_width + 2, fspecs[0]) + w = urwid.Columns( + fspecs, + dividechars=2 + ) + if focused is not None: + w.set_focus_column(focused) + super().__init__(w) + + def keypress(self, s, k): + if self.edit_col: + w = self._w.column_widths(s)[self.focused] + k = self.edit_col.keypress((w,), k) + return k + + def selectable(self): + return True + + +class GridWalker(urwid.ListWalker): + """ + Stores rows as a list of (rows, errors) tuples, where rows is a list + and errors is a set with an entry of each offset in rows that is an + error. + """ + + def __init__( + self, + lst: Iterable[list], + editor: "GridEditor" + ): + self.lst = [(i, set()) for i in lst] + self.editor = editor + self.focus = 0 + self.focus_col = 0 + self.edit_row = None # type: Optional[GridRow] + + def _modified(self): + self.editor.show_empty_msg() + return super()._modified() + + def add_value(self, lst): + self.lst.append( + (lst[:], set()) + ) + self._modified() + + def get_current_value(self): + if self.lst: + return self.lst[self.focus][0][self.focus_col] + + def set_current_value(self, val): + errors = self.lst[self.focus][1] + emsg = self.editor.is_error(self.focus_col, val) + if emsg: + signals.status_message.send(message=emsg, expire=5) + errors.add(self.focus_col) + else: + errors.discard(self.focus_col) + self.set_value(val, self.focus, self.focus_col, errors) + + def set_value(self, val, focus, focus_col, errors=None): + if not errors: + errors = set([]) + row = list(self.lst[focus][0]) + row[focus_col] = val + self.lst[focus] = [tuple(row), errors] + self._modified() + + def delete_focus(self): + if self.lst: + del self.lst[self.focus] + self.focus = min(len(self.lst) - 1, self.focus) + self._modified() + + def _insert(self, pos): + self.focus = pos + self.lst.insert( + self.focus, + ([c.blank() for c in self.editor.columns], set([])) + ) + self.focus_col = 0 + self.start_edit() + + def insert(self): + return self._insert(self.focus) + + def add(self): + return self._insert(min(self.focus + 1, len(self.lst))) + + def start_edit(self): + col = self.editor.columns[self.focus_col] + if self.lst and not col.subeditor: + self.edit_row = GridRow( + self.focus_col, True, self.editor, self.lst[self.focus] + ) + self.editor.master.loop.widget.footer.update(FOOTER_EDITING) + self._modified() + + def stop_edit(self): + if self.edit_row: + self.editor.master.loop.widget.footer.update(FOOTER) + try: + val = self.edit_row.edit_col.get_data() + except ValueError: + return + self.edit_row = None + self.set_current_value(val) + + def left(self): + self.focus_col = max(self.focus_col - 1, 0) + self._modified() + + def right(self): + self.focus_col = min(self.focus_col + 1, len(self.editor.columns) - 1) + self._modified() + + def tab_next(self): + self.stop_edit() + if self.focus_col < len(self.editor.columns) - 1: + self.focus_col += 1 + elif self.focus != len(self.lst) - 1: + self.focus_col = 0 + self.focus += 1 + self._modified() + + def get_focus(self): + if self.edit_row: + return self.edit_row, self.focus + elif self.lst: + return GridRow( + self.focus_col, + False, + self.editor, + self.lst[self.focus] + ), self.focus + else: + return None, None + + def set_focus(self, focus): + self.stop_edit() + self.focus = focus + self._modified() + + def get_next(self, pos): + if pos + 1 >= len(self.lst): + return None, None + return GridRow(None, False, self.editor, self.lst[pos + 1]), pos + 1 + + def get_prev(self, pos): + if pos - 1 < 0: + return None, None + return GridRow(None, False, self.editor, self.lst[pos - 1]), pos - 1 + + +class GridListBox(urwid.ListBox): + def __init__(self, lw): + super().__init__(lw) + + +FIRST_WIDTH_MAX = 40 +FIRST_WIDTH_MIN = 20 + + +class GridEditor(urwid.WidgetWrap): + title = None # type: str + columns = None # type: Sequence[Column] + + def __init__( + self, + master: "mitmproxy.console.master.ConsoleMaster", + value: Any, + callback: Callable[..., None], + *cb_args, + **cb_kwargs + ): + value = self.data_in(copy.deepcopy(value)) + self.master = master + self.value = value + self.callback = callback + self.cb_args = cb_args + self.cb_kwargs = cb_kwargs + + first_width = 20 + if value: + for r in value: + assert len(r) == len(self.columns) + first_width = max(len(r), first_width) + self.first_width = min(first_width, FIRST_WIDTH_MAX) + + title = urwid.Text(self.title) + title = urwid.Padding(title, align="left", width=("relative", 100)) + title = urwid.AttrWrap(title, "heading") + + headings = [] + for i, col in enumerate(self.columns): + c = urwid.Text(col.heading) + if i == 0 and len(self.columns) > 1: + headings.append(("fixed", first_width + 2, c)) + else: + headings.append(c) + h = urwid.Columns( + headings, + dividechars=2 + ) + h = urwid.AttrWrap(h, "heading") + + self.walker = GridWalker(self.value, self) + self.lb = GridListBox(self.walker) + w = urwid.Frame( + self.lb, + header=urwid.Pile([title, h]) + ) + super().__init__(w) + self.master.loop.widget.footer.update("") + self.show_empty_msg() + + def show_empty_msg(self): + if self.walker.lst: + self._w.set_footer(None) + else: + self._w.set_footer( + urwid.Text( + [ + ("highlight", "No values. Press "), + ("key", "a"), + ("highlight", " to add some."), + ] + ) + ) + + def set_subeditor_value(self, val, focus, focus_col): + self.walker.set_value(val, focus, focus_col) + + def keypress(self, size, key): + if self.walker.edit_row: + if key in ["esc"]: + self.walker.stop_edit() + elif key == "tab": + pf, pfc = self.walker.focus, self.walker.focus_col + self.walker.tab_next() + if self.walker.focus == pf and self.walker.focus_col != pfc: + self.walker.start_edit() + else: + self._w.keypress(size, key) + return None + + key = common.shortcuts(key) + column = self.columns[self.walker.focus_col] + if key in ["q", "esc"]: + res = [] + for i in self.walker.lst: + if not i[1] and any([x for x in i[0]]): + res.append(i[0]) + self.callback(self.data_out(res), *self.cb_args, **self.cb_kwargs) + signals.pop_view_state.send(self) + elif key == "g": + self.walker.set_focus(0) + elif key == "G": + self.walker.set_focus(len(self.walker.lst) - 1) + elif key in ["h", "left"]: + self.walker.left() + elif key in ["l", "right"]: + self.walker.right() + elif key == "tab": + self.walker.tab_next() + elif key == "a": + self.walker.add() + elif key == "A": + self.walker.insert() + elif key == "d": + self.walker.delete_focus() + elif column.keypress(key, self) and not self.handle_key(key): + return self._w.keypress(size, key) + + def data_out(self, data: Sequence[list]) -> Any: + """ + Called on raw list data, before data is returned through the + callback. + """ + return data + + def data_in(self, data: Any) -> Iterable[list]: + """ + Called to prepare provided data. + """ + return data + + def is_error(self, col: int, val: Any) -> Optional[str]: + """ + Return None, or a string error message. + """ + return False + + def handle_key(self, key): + return False + + def make_help(self): + text = [ + urwid.Text([("text", "Editor control:\n")]) + ] + keys = [ + ("A", "insert row before cursor"), + ("a", "add row after cursor"), + ("d", "delete row"), + ("e", "spawn external editor on current field"), + ("q", "save changes and exit editor"), + ("r", "read value from file"), + ("R", "read unescaped value from file"), + ("esc", "save changes and exit editor"), + ("tab", "next field"), + ("enter", "edit field"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + text.append( + urwid.Text( + [ + "\n", + ("text", "Values are escaped Python-style strings.\n"), + ] + ) + ) + return text diff --git a/mitmproxy/tools/console/grideditor/col_bytes.py b/mitmproxy/tools/console/grideditor/col_bytes.py new file mode 100644 index 00000000..c951ce44 --- /dev/null +++ b/mitmproxy/tools/console/grideditor/col_bytes.py @@ -0,0 +1,98 @@ +import os +from typing import Callable, Optional + +import urwid +from mitmproxy.tools.console import signals +from mitmproxy.tools.console.grideditor import base +from netlib import strutils + + +def read_file(filename: str, callback: Callable[..., None], escaped: bool) -> Optional[str]: + if not filename: + return + + filename = os.path.expanduser(filename) + try: + with open(filename, "r" if escaped else "rb") as f: + d = f.read() + except IOError as v: + return str(v) + + if escaped: + try: + d = strutils.escaped_str_to_bytes(d) + except ValueError: + return "Invalid Python-style string encoding." + # TODO: Refactor the status_prompt_path signal so that we + # can raise exceptions here and return the content instead. + callback(d) + + +class Column(base.Column): + def Display(self, data): + return Display(data) + + def Edit(self, data): + return Edit(data) + + def blank(self): + return b"" + + def keypress(self, key, editor): + if key == "r": + if editor.walker.get_current_value() is not None: + signals.status_prompt_path.send( + self, + prompt="Read file", + callback=read_file, + args=(editor.walker.set_current_value, True) + ) + elif key == "R": + if editor.walker.get_current_value() is not None: + signals.status_prompt_path.send( + self, + prompt="Read unescaped file", + callback=read_file, + args=(editor.walker.set_current_value, False) + ) + elif key == "e": + o = editor.walker.get_current_value() + if o is not None: + n = editor.master.spawn_editor(o) + n = strutils.clean_hanging_newline(n) + editor.walker.set_current_value(n) + elif key in ["enter"]: + editor.walker.start_edit() + else: + return key + + +class Display(base.Cell): + def __init__(self, data: bytes): + self.data = data + escaped = strutils.bytes_to_escaped_str(data) + w = urwid.Text(escaped, wrap="any") + super().__init__(w) + + def get_data(self) -> bytes: + return self.data + + +class Edit(base.Cell): + def __init__(self, data: bytes): + data = strutils.bytes_to_escaped_str(data) + w = urwid.Edit(edit_text=data, wrap="any", multiline=True) + w = urwid.AttrWrap(w, "editfield") + super().__init__(w) + + def get_data(self) -> bytes: + txt = self._w.get_text()[0].strip() + try: + return strutils.escaped_str_to_bytes(txt) + except ValueError: + signals.status_message.send( + self, + message="Invalid Python-style string encoding.", + expire=1000 + ) + raise diff --git a/mitmproxy/tools/console/grideditor/col_subgrid.py b/mitmproxy/tools/console/grideditor/col_subgrid.py new file mode 100644 index 00000000..3147e63d --- /dev/null +++ b/mitmproxy/tools/console/grideditor/col_subgrid.py @@ -0,0 +1,50 @@ +import urwid +from mitmproxy.tools.console.grideditor import base +from mitmproxy.tools.console import signals +from netlib.http import cookies + + +class Column(base.Column): + def __init__(self, heading, subeditor): + super().__init__(heading) + self.subeditor = subeditor + + def Edit(self, data): + raise RuntimeError("SubgridColumn should handle edits itself") + + def Display(self, data): + return Display(data) + + def blank(self): + return [] + + def keypress(self, key, editor): + if key in "rRe": + signals.status_message.send( + self, + message="Press enter to edit this field.", + expire=1000 + ) + return + elif key in ["enter"]: + editor.master.view_grideditor( + self.subeditor( + editor.master, + editor.walker.get_current_value(), + editor.set_subeditor_value, + editor.walker.focus, + editor.walker.focus_col + ) + ) + else: + return key + + +class Display(base.Cell): + def __init__(self, data): + p = cookies._format_pairs(data, sep="\n") + w = urwid.Text(p) + super().__init__(w) + + def get_data(self): + pass diff --git a/mitmproxy/tools/console/grideditor/col_text.py b/mitmproxy/tools/console/grideditor/col_text.py new file mode 100644 index 00000000..2d5192ae --- /dev/null +++ b/mitmproxy/tools/console/grideditor/col_text.py @@ -0,0 +1,54 @@ +""" +Welcome to the encoding dance! + +In a nutshell, text columns are actually a proxy class for byte columns, +which just encode/decodes contents. +""" + +from mitmproxy.tools.console import signals +from mitmproxy.tools.console.grideditor import col_bytes + + +class Column(col_bytes.Column): + def __init__(self, heading, encoding="utf8", errors="surrogateescape"): + super().__init__(heading) + self.encoding_args = encoding, errors + + def Display(self, data): + return TDisplay(data, self.encoding_args) + + def Edit(self, data): + return TEdit(data, self.encoding_args) + + def blank(self): + return u"" + + +# This is the same for both edit and display. +class EncodingMixin: + def __init__(self, data, encoding_args): + # type: (str) -> TDisplay + self.encoding_args = encoding_args + data = data.encode(*self.encoding_args) + super().__init__(data) + + def get_data(self) -> str: + data = super().get_data() + try: + return data.decode(*self.encoding_args) + except ValueError: + signals.status_message.send( + self, + message="Invalid encoding.", + expire=1000 + ) + raise + + +# urwid forces a different name for a subclass. +class TDisplay(EncodingMixin, col_bytes.Display): + pass + + +class TEdit(EncodingMixin, col_bytes.Edit): + pass diff --git a/mitmproxy/tools/console/grideditor/editors.py b/mitmproxy/tools/console/grideditor/editors.py new file mode 100644 index 00000000..64361af7 --- /dev/null +++ b/mitmproxy/tools/console/grideditor/editors.py @@ -0,0 +1,241 @@ +import re +import urwid +from mitmproxy import exceptions +from mitmproxy import flowfilter +from mitmproxy.addons import script +from mitmproxy.tools.console import common +from mitmproxy.tools.console.grideditor import base +from mitmproxy.tools.console.grideditor import col_bytes +from mitmproxy.tools.console.grideditor import col_text +from mitmproxy.tools.console.grideditor import col_subgrid +from mitmproxy.tools.console import signals +from netlib.http import user_agents + + +class QueryEditor(base.GridEditor): + title = "Editing query" + columns = [ + col_text.Column("Key"), + col_text.Column("Value") + ] + + +class HeaderEditor(base.GridEditor): + title = "Editing headers" + columns = [ + col_bytes.Column("Key"), + col_bytes.Column("Value") + ] + + def make_help(self): + h = super().make_help() + text = [ + urwid.Text([("text", "Special keys:\n")]) + ] + keys = [ + ("U", "add User-Agent header"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + text.append(urwid.Text([("text", "\n")])) + text.extend(h) + return text + + def set_user_agent(self, k): + ua = user_agents.get_by_shortcut(k) + if ua: + self.walker.add_value( + [ + b"User-Agent", + ua[2].encode() + ] + ) + + def handle_key(self, key): + if key == "U": + signals.status_prompt_onekey.send( + prompt="Add User-Agent header:", + keys=[(i[0], i[1]) for i in user_agents.UASTRINGS], + callback=self.set_user_agent, + ) + return True + + +class URLEncodedFormEditor(base.GridEditor): + title = "Editing URL-encoded form" + columns = [ + col_bytes.Column("Key"), + col_bytes.Column("Value") + ] + + +class ReplaceEditor(base.GridEditor): + title = "Editing replacement patterns" + columns = [ + col_text.Column("Filter"), + col_bytes.Column("Regex"), + col_bytes.Column("Replacement"), + ] + + def is_error(self, col, val): + if col == 0: + if not flowfilter.parse(val): + return "Invalid filter specification." + elif col == 1: + try: + re.compile(val) + except re.error: + return "Invalid regular expression." + return False + + +class SetHeadersEditor(base.GridEditor): + title = "Editing header set patterns" + columns = [ + col_text.Column("Filter"), + col_bytes.Column("Header"), + col_bytes.Column("Value"), + ] + + def is_error(self, col, val): + if col == 0: + if not flowfilter.parse(val): + return "Invalid filter specification" + return False + + def make_help(self): + h = super().make_help() + text = [ + urwid.Text([("text", "Special keys:\n")]) + ] + keys = [ + ("U", "add User-Agent header"), + ] + text.extend( + common.format_keyvals(keys, key="key", val="text", indent=4) + ) + text.append(urwid.Text([("text", "\n")])) + text.extend(h) + return text + + def set_user_agent(self, k): + ua = user_agents.get_by_shortcut(k) + if ua: + self.walker.add_value( + [ + ".*", + b"User-Agent", + ua[2].encode() + ] + ) + + def handle_key(self, key): + if key == "U": + signals.status_prompt_onekey.send( + prompt="Add User-Agent header:", + keys=[(i[0], i[1]) for i in user_agents.UASTRINGS], + callback=self.set_user_agent, + ) + return True + + +class PathEditor(base.GridEditor): + # TODO: Next row on enter? + + title = "Editing URL path components" + columns = [ + col_text.Column("Component"), + ] + + def data_in(self, data): + return [[i] for i in data] + + def data_out(self, data): + return [i[0] for i in data] + + +class ScriptEditor(base.GridEditor): + title = "Editing scripts" + columns = [ + col_text.Column("Command"), + ] + + def is_error(self, col, val): + try: + script.parse_command(val) + except exceptions.AddonError as e: + return str(e) + + +class HostPatternEditor(base.GridEditor): + title = "Editing host patterns" + columns = [ + col_text.Column("Regex (matched on hostname:port / ip:port)") + ] + + def is_error(self, col, val): + try: + re.compile(val, re.IGNORECASE) + except re.error as e: + return "Invalid regex: %s" % str(e) + + def data_in(self, data): + return [[i] for i in data] + + def data_out(self, data): + return [i[0] for i in data] + + +class CookieEditor(base.GridEditor): + title = "Editing request Cookie header" + columns = [ + col_text.Column("Name"), + col_text.Column("Value"), + ] + + +class CookieAttributeEditor(base.GridEditor): + title = "Editing Set-Cookie attributes" + columns = [ + col_text.Column("Name"), + col_text.Column("Value"), + ] + + def data_in(self, data): + return [(k, v or "") for k, v in data] + + def data_out(self, data): + ret = [] + for i in data: + if not i[1]: + ret.append([i[0], None]) + else: + ret.append(i) + return ret + + +class SetCookieEditor(base.GridEditor): + title = "Editing response SetCookie header" + columns = [ + col_text.Column("Name"), + col_text.Column("Value"), + col_subgrid.Column("Attributes", CookieAttributeEditor), + ] + + def data_in(self, data): + flattened = [] + for key, (value, attrs) in data: + flattened.append([key, value, attrs.items(multi=True)]) + return flattened + + def data_out(self, data): + vals = [] + for key, value, attrs in data: + vals.append( + [ + key, + (value, attrs) + ] + ) + return vals |