From d2f5db1f37313abe27d3267a98b9bb6d073707a5 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Tue, 21 Feb 2012 12:42:43 +1300 Subject: connection -> flow in libmitmproxy/console "Flow" is the correct term here - every connection can have multiple flows. --- libmproxy/console/__init__.py | 62 ++--- libmproxy/console/connlist.py | 197 --------------- libmproxy/console/connview.py | 575 ------------------------------------------ libmproxy/console/flowlist.py | 197 +++++++++++++++ libmproxy/console/flowview.py | 575 ++++++++++++++++++++++++++++++++++++++++++ libmproxy/console/help.py | 2 +- 6 files changed, 804 insertions(+), 804 deletions(-) delete mode 100644 libmproxy/console/connlist.py delete mode 100644 libmproxy/console/connview.py create mode 100644 libmproxy/console/flowlist.py create mode 100644 libmproxy/console/flowview.py diff --git a/libmproxy/console/__init__.py b/libmproxy/console/__init__.py index 2130e59f..b07a6fe1 100644 --- a/libmproxy/console/__init__.py +++ b/libmproxy/console/__init__.py @@ -17,7 +17,7 @@ import mailcap, mimetypes, tempfile, os, subprocess, glob, time import os.path, sys import urwid from .. import controller, utils, flow -import connlist, connview, help, common, kveditor, palettes +import flowlist, flowview, help, common, kveditor, palettes EVENTLOG_SIZE = 500 @@ -317,7 +317,7 @@ class ConsoleMaster(flow.FlowMaster): footer_text_help = [ ('heading_key', "q"), ":back", ] - footer_text_connview = [ + footer_text_flowview = [ ('heading_key', "tab"), ":toggle view ", ('heading_key', "?"), ":help ", ('heading_key', "q"), ":back ", @@ -327,7 +327,7 @@ class ConsoleMaster(flow.FlowMaster): self.looptime = 0 self.options = options - self.conn_list_view = None + self.flow_list_view = None self.set_palette() r = self.set_intercept(options.intercept) @@ -383,7 +383,7 @@ class ConsoleMaster(flow.FlowMaster): if f.error: s.run("error", f) s.run("done") - self.refresh_connection(f) + self.refresh_flow(f) self.state.last_script = path def set_script(self, path): @@ -396,7 +396,7 @@ class ConsoleMaster(flow.FlowMaster): def toggle_eventlog(self): self.eventlog = not self.eventlog - self.view_connlist() + self.view_flowlist() def _readflow(self, path): path = os.path.expanduser(path) @@ -481,7 +481,7 @@ class ConsoleMaster(flow.FlowMaster): self.ui = urwid.raw_display.Screen() self.ui.set_terminal_properties(256) self.ui.register_palette(self.palette) - self.conn_list_view = connlist.ConnectionListView(self, self.state) + self.flow_list_view = flowlist.ConnectionListView(self, self.state) self.view = None self.statusbar = None @@ -492,7 +492,7 @@ class ConsoleMaster(flow.FlowMaster): self.prompting = False self.onekey = False - self.view_connlist() + self.view_flowlist() if self.server: slave = controller.Slave(self.masterq, self.server) @@ -506,7 +506,7 @@ class ConsoleMaster(flow.FlowMaster): sys.exit(1) self.ui.run_wrapper(self.loop) - # If True, quit just pops out to connection list view. + # If True, quit just pops out to flow list view. print >> sys.stderr, "Shutting down..." sys.stderr.flush() self.shutdown() @@ -514,7 +514,7 @@ class ConsoleMaster(flow.FlowMaster): def focus_current(self): if self.currentflow: try: - self.conn_list_view.set_focus(self.state.index(self.currentflow)) + self.flow_list_view.set_focus(self.state.index(self.currentflow)) except (IndexError, ValueError): pass @@ -540,29 +540,29 @@ class ConsoleMaster(flow.FlowMaster): self.statusbar = StatusBar(self, self.footer_text_help) self.make_view() - def view_connlist(self): + def view_flowlist(self): if self.ui.started: self.ui.clear() self.focus_current() if self.eventlog: - self.body = connlist.BodyPile(self) + self.body = flowlist.BodyPile(self) else: - self.body = connlist.ConnectionListBox(self) + self.body = flowlist.ConnectionListBox(self) self.statusbar = StatusBar(self, self.footer_text_default) self.header = None self.currentflow = None self.make_view() - self.help_context = connlist.help_context + self.help_context = flowlist.help_context def view_flow(self, flow): - self.body = connview.ConnectionView(self, self.state, flow) - self.header = connview.ConnectionViewHeader(self, flow) - self.statusbar = StatusBar(self, self.footer_text_connview) + self.body = flowview.ConnectionView(self, self.state, flow) + self.header = flowview.ConnectionViewHeader(self, flow) + self.statusbar = StatusBar(self, self.footer_text_flowview) self.currentflow = flow self.make_view() - self.help_context = connview.help_context + self.help_context = flowview.help_context def _write_flows(self, path, flows): self.state.last_saveload = path @@ -603,7 +603,7 @@ class ConsoleMaster(flow.FlowMaster): except flow.FlowReadError, v: return v.strerror f.close() - if self.conn_list_view: + if self.flow_list_view: self.sync_list_view() self.focus_current() @@ -681,7 +681,7 @@ class ConsoleMaster(flow.FlowMaster): self.state.view_body_mode = common.VIEW_BODY_HEX elif v == "p": self.state.view_body_mode = common.VIEW_BODY_PRETTY - self.refresh_connection(self.currentflow) + self.refresh_flow(self.currentflow) def drawscreen(self): size = self.ui.get_cols_rows() @@ -693,7 +693,7 @@ class ConsoleMaster(flow.FlowMaster): if self.currentflow: self.view_flow(self.currentflow) else: - self.view_connlist() + self.view_flowlist() def loop(self): changed = True @@ -851,23 +851,23 @@ class ConsoleMaster(flow.FlowMaster): controller.Master.shutdown(self) def sync_list_view(self): - self.conn_list_view._modified() + self.flow_list_view._modified() - def clear_connections(self): + def clear_flows(self): self.state.clear() self.sync_list_view() - def delete_connection(self, f): + def delete_flow(self, f): self.state.delete_flow(f) self.sync_list_view() - def refresh_connection(self, c): - if hasattr(self.header, "refresh_connection"): - self.header.refresh_connection(c) - if hasattr(self.body, "refresh_connection"): - self.body.refresh_connection(c) - if hasattr(self.statusbar, "refresh_connection"): - self.statusbar.refresh_connection(c) + def refresh_flow(self, c): + if hasattr(self.header, "refresh_flow"): + self.header.refresh_flow(c) + if hasattr(self.body, "refresh_flow"): + self.body.refresh_flow(c) + if hasattr(self.statusbar, "refresh_flow"): + self.statusbar.refresh_flow(c) def process_flow(self, f, r): if self.state.intercept and f.match(self.state.intercept) and not f.request.is_replay(): @@ -875,7 +875,7 @@ class ConsoleMaster(flow.FlowMaster): else: r._ack() self.sync_list_view() - self.refresh_connection(f) + self.refresh_flow(f) def clear_events(self): self.eventlist[:] = [] diff --git a/libmproxy/console/connlist.py b/libmproxy/console/connlist.py deleted file mode 100644 index 6504573a..00000000 --- a/libmproxy/console/connlist.py +++ /dev/null @@ -1,197 +0,0 @@ -import urwid -import common - -def _mkhelp(): - text = [] - keys = [ - ("A", "accept all intercepted connections"), - ("a", "accept this intercepted connection"), - ("C", "clear connection list or eventlog"), - ("d", "delete flow"), - ("D", "duplicate flow"), - ("e", "toggle eventlog"), - ("l", "set limit filter pattern"), - ("L", "load saved flows"), - ("r", "replay request"), - ("V", "revert changes to request"), - ("w", "save all flows matching current limit"), - ("W", "save this flow"), - ("X", "kill and delete connection, even if it's mid-intercept"), - ("tab", "tab between eventlog and connection list"), - ("enter", "view connection"), - ("|", "run script on this flow"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - - -class EventListBox(urwid.ListBox): - def __init__(self, master): - self.master = master - urwid.ListBox.__init__(self, master.eventlist) - - def keypress(self, size, key): - key = common.shortcuts(key) - if key == "C": - self.master.clear_events() - key = None - return urwid.ListBox.keypress(self, size, key) - - -class BodyPile(urwid.Pile): - def __init__(self, master): - h = urwid.Text("Event log") - h = urwid.Padding(h, align="left", width=("relative", 100)) - - self.inactive_header = urwid.AttrWrap(h, "heading_inactive") - self.active_header = urwid.AttrWrap(h, "heading") - - urwid.Pile.__init__( - self, - [ - ConnectionListBox(master), - urwid.Frame(EventListBox(master), header = self.inactive_header) - ] - ) - self.master = master - self.focus = 0 - - def keypress(self, size, key): - if key == "tab": - self.focus = (self.focus + 1)%len(self.widget_list) - self.set_focus(self.focus) - if self.focus == 1: - self.widget_list[1].header = self.active_header - else: - self.widget_list[1].header = self.inactive_header - key = None - elif key == "v": - self.master.toggle_eventlog() - key = None - - # This is essentially a copypasta from urwid.Pile's keypress handler. - # So much for "closed for modification, but open for extension". - item_rows = None - if len(size)==2: - item_rows = self.get_item_rows( size, focus=True ) - i = self.widget_list.index(self.focus_item) - tsize = self.get_item_size(size,i,True,item_rows) - return self.focus_item.keypress( tsize, key ) - - -class ConnectionItem(common.WWrap): - def __init__(self, master, state, flow, focus): - self.master, self.state, self.flow = master, state, flow - self.focus = focus - w = self.get_text() - common.WWrap.__init__(self, w) - - def get_text(self): - return common.format_flow(self.flow, self.focus) - - def selectable(self): - return True - - def keypress(self, (maxcol,), key): - key = common.shortcuts(key) - if key == "a": - self.flow.accept_intercept() - self.master.sync_list_view() - elif key == "d": - self.flow.kill(self.master) - self.state.delete_flow(self.flow) - self.master.sync_list_view() - elif key == "D": - f = self.master.duplicate_flow(self.flow) - self.master.currentflow = f - self.master.focus_current() - elif key == "r": - r = self.master.replay_request(self.flow) - if r: - self.master.statusbar.message(r) - self.master.sync_list_view() - elif key == "V": - self.state.revert(self.flow) - self.master.sync_list_view() - elif key == "w": - self.master.path_prompt( - "Save flows: ", - self.state.last_saveload, - self.master.save_flows - ) - elif key == "W": - self.master.path_prompt( - "Save this flow: ", - self.state.last_saveload, - self.master.save_one_flow, - self.flow - ) - elif key == "X": - self.flow.kill(self.master) - elif key == "enter": - if self.flow.request: - self.master.view_flow(self.flow) - elif key == "|": - self.master.path_prompt( - "Send flow to script: ", - self.state.last_script, - self.master.run_script_once, - self.flow - ) - else: - return key - - -class ConnectionListView(urwid.ListWalker): - def __init__(self, master, state): - self.master, self.state = master, state - if self.state.flow_count(): - self.set_focus(0) - - def get_focus(self): - f, i = self.state.get_focus() - f = ConnectionItem(self.master, self.state, f, True) if f else None - return f, i - - def set_focus(self, focus): - ret = self.state.set_focus(focus) - self._modified() - return ret - - def get_next(self, pos): - f, i = self.state.get_next(pos) - f = ConnectionItem(self.master, self.state, f, False) if f else None - return f, i - - def get_prev(self, pos): - f, i = self.state.get_prev(pos) - f = ConnectionItem(self.master, self.state, f, False) if f else None - return f, i - - -class ConnectionListBox(urwid.ListBox): - def __init__(self, master): - self.master = master - urwid.ListBox.__init__(self, master.conn_list_view) - - def keypress(self, size, key): - key = common.shortcuts(key) - if key == "A": - self.master.accept_all() - self.master.sync_list_view() - elif key == "C": - self.master.clear_connections() - elif key == "e": - self.master.toggle_eventlog() - elif key == "l": - self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit) - self.master.sync_list_view() - elif key == "L": - self.master.path_prompt( - "Load flows: ", - self.master.state.last_saveload, - self.master.load_flows_callback - ) - else: - return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/connview.py b/libmproxy/console/connview.py deleted file mode 100644 index 2bf6a4af..00000000 --- a/libmproxy/console/connview.py +++ /dev/null @@ -1,575 +0,0 @@ -import os, re -import urwid -import common -from .. import utils, encoding, flow - -def _mkhelp(): - text = [] - keys = [ - ("A", "accept all intercepted connections"), - ("a", "accept this intercepted connection"), - ("b", "save request/response body"), - ("d", "delete flow"), - ("D", "duplicate flow"), - ("e", "edit request/response"), - ("m", "change body display mode"), - (None, - common.highlight_key("raw", "r") + - [("text", ": raw data")] - ), - (None, - common.highlight_key("pretty", "p") + - [("text", ": pretty-print XML, HTML and JSON")] - ), - (None, - common.highlight_key("hex", "h") + - [("text", ": hex dump")] - ), - ("p", "previous flow"), - ("r", "replay request"), - ("V", "revert changes to request"), - ("v", "view body in external viewer"), - ("w", "save all flows matching current limit"), - ("W", "save this flow"), - ("z", "encode/decode a request/response"), - ("tab", "toggle request/response view"), - ("space", "next flow"), - ("|", "run script on this flow"), - ] - text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) - return text -help_context = _mkhelp() - - -VIEW_CUTOFF = 1024*100 - -class ConnectionViewHeader(common.WWrap): - def __init__(self, master, f): - self.master, self.flow = master, f - self.w = common.format_flow(f, False, extended=True, padding=0) - - def refresh_connection(self, f): - if f == self.flow: - self.w = common.format_flow(f, False, extended=True, padding=0) - - -class CallbackCache: - @utils.LRUCache(20) - def callback(self, obj, method, *args, **kwargs): - return getattr(obj, method)(*args, **kwargs) -cache = CallbackCache() - - -class ConnectionView(common.WWrap): - REQ = 0 - RESP = 1 - method_options = [ - ("get", "g"), - ("post", "p"), - ("put", "u"), - ("head", "h"), - ("trace", "t"), - ("delete", "d"), - ("options", "o"), - ("edit raw", "e"), - ] - def __init__(self, master, state, flow): - self.master, self.state, self.flow = master, state, flow - if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and flow.response: - self.view_response() - else: - self.view_request() - - def _trailer(self, clen, txt): - rem = clen - VIEW_CUTOFF - if rem > 0: - txt.append(urwid.Text("")) - txt.append( - urwid.Text( - [ - ("highlight", "... %s of data not shown"%utils.pretty_size(rem)) - ] - ) - ) - - def _view_conn_raw(self, content): - txt = [] - for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): - txt.append( - urwid.Text(("text", i)) - ) - self._trailer(len(content), txt) - return txt - - def _view_conn_binary(self, content): - txt = [] - for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): - txt.append(urwid.Text([ - ("offset", offset), - " ", - ("text", hexa), - " ", - ("text", s), - ])) - self._trailer(len(content), txt) - return txt - - def _view_conn_xmlish(self, content): - txt = [] - for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): - txt.append( - urwid.Text(("text", i)), - ) - self._trailer(len(content), txt) - return txt - - def _view_conn_json(self, lines): - txt = [] - sofar = 0 - for i in lines: - sofar += len(i) - txt.append( - urwid.Text(("text", i)), - ) - if sofar > VIEW_CUTOFF: - break - self._trailer(sum(len(i) for i in lines), txt) - return txt - - def _view_conn_formdata(self, content, boundary): - rx = re.compile(r'\bname="([^"]+)"') - keys = [] - vals = [] - - for i in content.split("--" + boundary): - parts = i.splitlines() - if len(parts) > 1 and parts[0][0:2] != "--": - match = rx.search(parts[1]) - if match: - keys.append(match.group(1) + ":") - vals.append(utils.cleanBin( - "\n".join(parts[3+parts[2:].index(""):]) - )) - r = [ - urwid.Text(("highlight", "Form data:\n")), - ] - r.extend(common.format_keyvals( - zip(keys, vals), - key = "header", - val = "text" - )) - return r - - def _view_conn_urlencoded(self, lines): - return common.format_keyvals( - [(k+":", v) for (k, v) in lines], - key = "header", - val = "text" - ) - - - def _find_pretty_view(self, content, hdrItems): - ctype = None - for i in hdrItems: - if i[0].lower() == "content-type": - ctype = i[1] - break - if ctype and flow.HDR_FORM_URLENCODED in ctype: - data = utils.urldecode(content) - if data: - return "URLEncoded form", self._view_conn_urlencoded(data) - if utils.isXML(content): - return "Indented XML-ish", self._view_conn_xmlish(content) - elif ctype and "application/json" in ctype: - lines = utils.pretty_json(content) - if lines: - return "JSON", self._view_conn_json(lines) - elif ctype and "multipart/form-data" in ctype: - boundary = ctype.split('boundary=') - if len(boundary) > 1: - return "Form data", self._view_conn_formdata(content, boundary[1].split(';')[0]) - return "", self._view_conn_raw(content) - - def _cached_conn_text(self, e, content, hdrItems, viewmode): - txt = common.format_keyvals( - [(h+":", v) for (h, v) in hdrItems], - key = "header", - val = "text" - ) - if content: - msg = "" - if viewmode == common.VIEW_BODY_HEX: - body = self._view_conn_binary(content) - elif viewmode == common.VIEW_BODY_PRETTY: - emsg = "" - if e: - decoded = encoding.decode(e, content) - if decoded: - content = decoded - if e and e != "identity": - emsg = "[decoded %s]"%e - msg, body = self._find_pretty_view(content, hdrItems) - if emsg: - msg = emsg + " " + msg - else: - body = self._view_conn_raw(content) - - title = urwid.AttrWrap(urwid.Columns([ - urwid.Text( - [ - ("heading", msg), - ] - ), - urwid.Text( - [ - " ", - ('heading', "["), - ('heading_key', "m"), - ('heading', (":%s]"%common.BODY_VIEWS[self.master.state.view_body_mode])), - ], - align="right" - ), - ]), "heading") - txt.append(title) - txt.extend(body) - return urwid.ListBox(txt) - - def _tab(self, content, attr): - p = urwid.Text(content) - p = urwid.Padding(p, align="left", width=("relative", 100)) - p = urwid.AttrWrap(p, attr) - return p - - def wrap_body(self, active, body): - parts = [] - - if self.flow.intercepting and not self.flow.request.acked: - qt = "Request intercepted" - else: - qt = "Request" - if active == common.VIEW_FLOW_REQUEST: - parts.append(self._tab(qt, "heading")) - else: - parts.append(self._tab(qt, "heading_inactive")) - - if self.flow.intercepting and self.flow.response and not self.flow.response.acked: - st = "Response intercepted" - else: - st = "Response" - if active == common.VIEW_FLOW_RESPONSE: - parts.append(self._tab(st, "heading")) - else: - parts.append(self._tab(st, "heading_inactive")) - - h = urwid.Columns(parts) - f = urwid.Frame( - body, - header=h - ) - return f - - def _conn_text(self, conn, viewmode): - e = conn.headers["content-encoding"] - e = e[0] if e else None - return cache.callback( - self, "_cached_conn_text", - e, - conn.content, - tuple(tuple(i) for i in conn.headers.lst), - viewmode - ) - - def view_request(self): - self.state.view_flow_mode = common.VIEW_FLOW_REQUEST - body = self._conn_text( - self.flow.request, - self.state.view_body_mode - ) - self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) - self.master.statusbar.redraw() - - def view_response(self): - self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE - if self.flow.response: - body = self._conn_text( - self.flow.response, - self.state.view_body_mode - ) - else: - body = urwid.ListBox( - [ - urwid.Text(""), - urwid.Text( - [ - ("highlight", "No response. Press "), - ("key", "e"), - ("highlight", " and edit any aspect to add one."), - ] - ) - ] - ) - self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) - self.master.statusbar.redraw() - - def refresh_connection(self, c=None): - if c == self.flow: - if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: - self.view_response() - else: - self.view_request() - - def set_method_raw(self, m): - if m: - self.flow.request.method = m - self.master.refresh_connection(self.flow) - - def edit_method(self, m): - if m == "e": - self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw) - else: - for i in self.method_options: - if i[1] == m: - self.flow.request.method = i[0].upper() - self.master.refresh_connection(self.flow) - - def save_body(self, path): - if not path: - return - self.state.last_saveload = path - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - c = self.flow.request - else: - c = self.flow.response - path = os.path.expanduser(path) - try: - f = file(path, "wb") - f.write(str(c.content)) - f.close() - except IOError, v: - self.master.statusbar.message(v.strerror) - - def set_url(self, url): - request = self.flow.request - if not request.set_url(str(url)): - return "Invalid URL." - self.master.refresh_connection(self.flow) - - def set_resp_code(self, code): - response = self.flow.response - try: - response.code = int(code) - except ValueError: - return None - import BaseHTTPServer - if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): - response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] - self.master.refresh_connection(self.flow) - - def set_resp_msg(self, msg): - response = self.flow.response - response.msg = msg - self.master.refresh_connection(self.flow) - - def set_headers(self, lst, conn): - conn.headers = flow.ODict(lst) - - def set_query(self, lst, conn): - conn.set_query(flow.ODict(lst)) - - def set_form(self, lst, conn): - conn.set_form_urlencoded(flow.ODict(lst)) - - def edit(self, part): - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - conn = self.flow.request - else: - if not self.flow.response: - self.flow.response = flow.Response(self.flow.request, 200, "OK", flow.ODict(), "") - conn = self.flow.response - - self.flow.backup() - if part == "r": - c = self.master.spawn_editor(conn.content or "") - conn.content = c.rstrip("\n") - elif part == "f": - self.master.view_kveditor("Editing form", conn.get_form_urlencoded().lst, self.set_form, conn) - elif part == "h": - self.master.view_kveditor("Editing headers", conn.headers.lst, self.set_headers, conn) - elif part == "q": - self.master.view_kveditor("Editing query", conn.get_query().lst, self.set_query, conn) - elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_edit("URL", conn.get_url(), self.set_url) - elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_onekey("Method", self.method_options, self.edit_method) - elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.master.prompt_edit("Code", str(conn.code), self.set_resp_code) - elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: - self.master.prompt_edit("Message", conn.msg, self.set_resp_msg) - self.master.refresh_connection(self.flow) - - def _view_nextprev_flow(self, np, flow): - try: - idx = self.state.view.index(flow) - except IndexError: - return - if np == "next": - new_flow, new_idx = self.state.get_next(idx) - else: - new_flow, new_idx = self.state.get_prev(idx) - if new_idx is None: - return - self.master.view_flow(new_flow) - - def view_next_flow(self, flow): - return self._view_nextprev_flow("next", flow) - - def view_prev_flow(self, flow): - return self._view_nextprev_flow("prev", flow) - - def keypress(self, size, key): - if key == " ": - self.view_next_flow(self.flow) - return key - - key = common.shortcuts(key) - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - conn = self.flow.request - else: - conn = self.flow.response - - if key == "q": - self.master.view_connlist() - key = None - elif key == "tab": - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.view_response() - else: - self.view_request() - elif key in ("up", "down", "page up", "page down"): - # Why doesn't this just work?? - self.w.keypress(size, key) - elif key == "a": - self.flow.accept_intercept() - self.master.view_flow(self.flow) - elif key == "A": - self.master.accept_all() - self.master.view_flow(self.flow) - elif key == "d": - if self.state.flow_count() == 1: - self.master.view_connlist() - elif self.state.view.index(self.flow) == len(self.state.view)-1: - self.view_prev_flow(self.flow) - else: - self.view_next_flow(self.flow) - f = self.flow - f.kill(self.master) - self.state.delete_flow(f) - elif key == "D": - f = self.master.duplicate_flow(self.flow) - self.master.view_flow(f) - self.master.currentflow = f - self.master.statusbar.message("Duplicated.") - elif key == "e": - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.prompt_onekey( - "Edit request", - ( - ("query", "q"), - ("form", "f"), - ("url", "u"), - ("header", "h"), - ("raw body", "r"), - ("method", "m"), - ), - self.edit - ) - else: - self.master.prompt_onekey( - "Edit response", - ( - ("code", "c"), - ("message", "m"), - ("header", "h"), - ("raw body", "r"), - ), - self.edit - ) - key = None - elif key == "m": - self.master.prompt_onekey( - "View", - ( - ("raw", "r"), - ("pretty", "p"), - ("hex", "h"), - ), - self.master.changeview - ) - key = None - elif key == "p": - self.view_prev_flow(self.flow) - elif key == "r": - r = self.master.replay_request(self.flow) - if r: - self.master.statusbar.message(r) - self.master.refresh_connection(self.flow) - elif key == "V": - self.state.revert(self.flow) - self.master.refresh_connection(self.flow) - elif key == "W": - self.master.path_prompt( - "Save this flow: ", - self.state.last_saveload, - self.master.save_one_flow, - self.flow - ) - elif key == "v": - if conn and conn.content: - t = conn.headers["content-type"] or [None] - t = t[0] - self.master.spawn_external_viewer(conn.content, t) - elif key == "b": - if conn: - if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: - self.master.path_prompt( - "Save request body: ", - self.state.last_saveload, - self.save_body - ) - else: - self.master.path_prompt( - "Save response body: ", - self.state.last_saveload, - self.save_body - ) - elif key == "|": - self.master.path_prompt( - "Send flow to script: ", self.state.last_script, - self.master.run_script_once, self.flow - ) - elif key == "z": - if conn: - e = conn.headers["content-encoding"] or ["identity"] - if e[0] != "identity": - conn.decode() - else: - self.master.prompt_onekey( - "Select encoding: ", - ( - ("gzip", "z"), - ("deflate", "d"), - ), - self.encode_callback, - conn - ) - self.master.refresh_connection(self.flow) - else: - return key - - def encode_callback(self, key, conn): - encoding_map = { - "z": "gzip", - "d": "deflate", - } - conn.encode(encoding_map[key]) - self.master.refresh_connection(self.flow) diff --git a/libmproxy/console/flowlist.py b/libmproxy/console/flowlist.py new file mode 100644 index 00000000..d8fbe613 --- /dev/null +++ b/libmproxy/console/flowlist.py @@ -0,0 +1,197 @@ +import urwid +import common + +def _mkhelp(): + text = [] + keys = [ + ("A", "accept all intercepted flows"), + ("a", "accept this intercepted flows"), + ("C", "clear flow list or eventlog"), + ("d", "delete flow"), + ("D", "duplicate flow"), + ("e", "toggle eventlog"), + ("l", "set limit filter pattern"), + ("L", "load saved flows"), + ("r", "replay request"), + ("V", "revert changes to request"), + ("w", "save all flows matching current limit"), + ("W", "save this flow"), + ("X", "kill and delete flow, even if it's mid-intercept"), + ("tab", "tab between eventlog and flow list"), + ("enter", "view flow"), + ("|", "run script on this flow"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + + +class EventListBox(urwid.ListBox): + def __init__(self, master): + self.master = master + urwid.ListBox.__init__(self, master.eventlist) + + def keypress(self, size, key): + key = common.shortcuts(key) + if key == "C": + self.master.clear_events() + key = None + return urwid.ListBox.keypress(self, size, key) + + +class BodyPile(urwid.Pile): + def __init__(self, master): + h = urwid.Text("Event log") + h = urwid.Padding(h, align="left", width=("relative", 100)) + + self.inactive_header = urwid.AttrWrap(h, "heading_inactive") + self.active_header = urwid.AttrWrap(h, "heading") + + urwid.Pile.__init__( + self, + [ + ConnectionListBox(master), + urwid.Frame(EventListBox(master), header = self.inactive_header) + ] + ) + self.master = master + self.focus = 0 + + def keypress(self, size, key): + if key == "tab": + self.focus = (self.focus + 1)%len(self.widget_list) + self.set_focus(self.focus) + if self.focus == 1: + self.widget_list[1].header = self.active_header + else: + self.widget_list[1].header = self.inactive_header + key = None + elif key == "v": + self.master.toggle_eventlog() + key = None + + # This is essentially a copypasta from urwid.Pile's keypress handler. + # So much for "closed for modification, but open for extension". + item_rows = None + if len(size)==2: + item_rows = self.get_item_rows( size, focus=True ) + i = self.widget_list.index(self.focus_item) + tsize = self.get_item_size(size,i,True,item_rows) + return self.focus_item.keypress( tsize, key ) + + +class ConnectionItem(common.WWrap): + def __init__(self, master, state, flow, focus): + self.master, self.state, self.flow = master, state, flow + self.focus = focus + w = self.get_text() + common.WWrap.__init__(self, w) + + def get_text(self): + return common.format_flow(self.flow, self.focus) + + def selectable(self): + return True + + def keypress(self, (maxcol,), key): + key = common.shortcuts(key) + if key == "a": + self.flow.accept_intercept() + self.master.sync_list_view() + elif key == "d": + self.flow.kill(self.master) + self.state.delete_flow(self.flow) + self.master.sync_list_view() + elif key == "D": + f = self.master.duplicate_flow(self.flow) + self.master.currentflow = f + self.master.focus_current() + elif key == "r": + r = self.master.replay_request(self.flow) + if r: + self.master.statusbar.message(r) + self.master.sync_list_view() + elif key == "V": + self.state.revert(self.flow) + self.master.sync_list_view() + elif key == "w": + self.master.path_prompt( + "Save flows: ", + self.state.last_saveload, + self.master.save_flows + ) + elif key == "W": + self.master.path_prompt( + "Save this flow: ", + self.state.last_saveload, + self.master.save_one_flow, + self.flow + ) + elif key == "X": + self.flow.kill(self.master) + elif key == "enter": + if self.flow.request: + self.master.view_flow(self.flow) + elif key == "|": + self.master.path_prompt( + "Send flow to script: ", + self.state.last_script, + self.master.run_script_once, + self.flow + ) + else: + return key + + +class ConnectionListView(urwid.ListWalker): + def __init__(self, master, state): + self.master, self.state = master, state + if self.state.flow_count(): + self.set_focus(0) + + def get_focus(self): + f, i = self.state.get_focus() + f = ConnectionItem(self.master, self.state, f, True) if f else None + return f, i + + def set_focus(self, focus): + ret = self.state.set_focus(focus) + self._modified() + return ret + + def get_next(self, pos): + f, i = self.state.get_next(pos) + f = ConnectionItem(self.master, self.state, f, False) if f else None + return f, i + + def get_prev(self, pos): + f, i = self.state.get_prev(pos) + f = ConnectionItem(self.master, self.state, f, False) if f else None + return f, i + + +class ConnectionListBox(urwid.ListBox): + def __init__(self, master): + self.master = master + urwid.ListBox.__init__(self, master.flow_list_view) + + def keypress(self, size, key): + key = common.shortcuts(key) + if key == "A": + self.master.accept_all() + self.master.sync_list_view() + elif key == "C": + self.master.clear_flows() + elif key == "e": + self.master.toggle_eventlog() + elif key == "l": + self.master.prompt("Limit: ", self.master.state.limit_txt, self.master.set_limit) + self.master.sync_list_view() + elif key == "L": + self.master.path_prompt( + "Load flows: ", + self.master.state.last_saveload, + self.master.load_flows_callback + ) + else: + return urwid.ListBox.keypress(self, size, key) diff --git a/libmproxy/console/flowview.py b/libmproxy/console/flowview.py new file mode 100644 index 00000000..dc63c067 --- /dev/null +++ b/libmproxy/console/flowview.py @@ -0,0 +1,575 @@ +import os, re +import urwid +import common +from .. import utils, encoding, flow + +def _mkhelp(): + text = [] + keys = [ + ("A", "accept all intercepted flows"), + ("a", "accept this intercepted flow"), + ("b", "save request/response body"), + ("d", "delete flow"), + ("D", "duplicate flow"), + ("e", "edit request/response"), + ("m", "change body display mode"), + (None, + common.highlight_key("raw", "r") + + [("text", ": raw data")] + ), + (None, + common.highlight_key("pretty", "p") + + [("text", ": pretty-print XML, HTML and JSON")] + ), + (None, + common.highlight_key("hex", "h") + + [("text", ": hex dump")] + ), + ("p", "previous flow"), + ("r", "replay request"), + ("V", "revert changes to request"), + ("v", "view body in external viewer"), + ("w", "save all flows matching current limit"), + ("W", "save this flow"), + ("z", "encode/decode a request/response"), + ("tab", "toggle request/response view"), + ("space", "next flow"), + ("|", "run script on this flow"), + ] + text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) + return text +help_context = _mkhelp() + + +VIEW_CUTOFF = 1024*100 + +class ConnectionViewHeader(common.WWrap): + def __init__(self, master, f): + self.master, self.flow = master, f + self.w = common.format_flow(f, False, extended=True, padding=0) + + def refresh_flow(self, f): + if f == self.flow: + self.w = common.format_flow(f, False, extended=True, padding=0) + + +class CallbackCache: + @utils.LRUCache(20) + def callback(self, obj, method, *args, **kwargs): + return getattr(obj, method)(*args, **kwargs) +cache = CallbackCache() + + +class ConnectionView(common.WWrap): + REQ = 0 + RESP = 1 + method_options = [ + ("get", "g"), + ("post", "p"), + ("put", "u"), + ("head", "h"), + ("trace", "t"), + ("delete", "d"), + ("options", "o"), + ("edit raw", "e"), + ] + def __init__(self, master, state, flow): + self.master, self.state, self.flow = master, state, flow + if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and flow.response: + self.view_response() + else: + self.view_request() + + def _trailer(self, clen, txt): + rem = clen - VIEW_CUTOFF + if rem > 0: + txt.append(urwid.Text("")) + txt.append( + urwid.Text( + [ + ("highlight", "... %s of data not shown"%utils.pretty_size(rem)) + ] + ) + ) + + def _view_flow_raw(self, content): + txt = [] + for i in utils.cleanBin(content[:VIEW_CUTOFF]).splitlines(): + txt.append( + urwid.Text(("text", i)) + ) + self._trailer(len(content), txt) + return txt + + def _view_flow_binary(self, content): + txt = [] + for offset, hexa, s in utils.hexdump(content[:VIEW_CUTOFF]): + txt.append(urwid.Text([ + ("offset", offset), + " ", + ("text", hexa), + " ", + ("text", s), + ])) + self._trailer(len(content), txt) + return txt + + def _view_flow_xmlish(self, content): + txt = [] + for i in utils.pretty_xmlish(content[:VIEW_CUTOFF]): + txt.append( + urwid.Text(("text", i)), + ) + self._trailer(len(content), txt) + return txt + + def _view_flow_json(self, lines): + txt = [] + sofar = 0 + for i in lines: + sofar += len(i) + txt.append( + urwid.Text(("text", i)), + ) + if sofar > VIEW_CUTOFF: + break + self._trailer(sum(len(i) for i in lines), txt) + return txt + + def _view_flow_formdata(self, content, boundary): + rx = re.compile(r'\bname="([^"]+)"') + keys = [] + vals = [] + + for i in content.split("--" + boundary): + parts = i.splitlines() + if len(parts) > 1 and parts[0][0:2] != "--": + match = rx.search(parts[1]) + if match: + keys.append(match.group(1) + ":") + vals.append(utils.cleanBin( + "\n".join(parts[3+parts[2:].index(""):]) + )) + r = [ + urwid.Text(("highlight", "Form data:\n")), + ] + r.extend(common.format_keyvals( + zip(keys, vals), + key = "header", + val = "text" + )) + return r + + def _view_flow_urlencoded(self, lines): + return common.format_keyvals( + [(k+":", v) for (k, v) in lines], + key = "header", + val = "text" + ) + + + def _find_pretty_view(self, content, hdrItems): + ctype = None + for i in hdrItems: + if i[0].lower() == "content-type": + ctype = i[1] + break + if ctype and flow.HDR_FORM_URLENCODED in ctype: + data = utils.urldecode(content) + if data: + return "URLEncoded form", self._view_flow_urlencoded(data) + if utils.isXML(content): + return "Indented XML-ish", self._view_flow_xmlish(content) + elif ctype and "application/json" in ctype: + lines = utils.pretty_json(content) + if lines: + return "JSON", self._view_flow_json(lines) + elif ctype and "multipart/form-data" in ctype: + boundary = ctype.split('boundary=') + if len(boundary) > 1: + return "Form data", self._view_flow_formdata(content, boundary[1].split(';')[0]) + return "", self._view_flow_raw(content) + + def _cached_conn_text(self, e, content, hdrItems, viewmode): + txt = common.format_keyvals( + [(h+":", v) for (h, v) in hdrItems], + key = "header", + val = "text" + ) + if content: + msg = "" + if viewmode == common.VIEW_BODY_HEX: + body = self._view_flow_binary(content) + elif viewmode == common.VIEW_BODY_PRETTY: + emsg = "" + if e: + decoded = encoding.decode(e, content) + if decoded: + content = decoded + if e and e != "identity": + emsg = "[decoded %s]"%e + msg, body = self._find_pretty_view(content, hdrItems) + if emsg: + msg = emsg + " " + msg + else: + body = self._view_flow_raw(content) + + title = urwid.AttrWrap(urwid.Columns([ + urwid.Text( + [ + ("heading", msg), + ] + ), + urwid.Text( + [ + " ", + ('heading', "["), + ('heading_key', "m"), + ('heading', (":%s]"%common.BODY_VIEWS[self.master.state.view_body_mode])), + ], + align="right" + ), + ]), "heading") + txt.append(title) + txt.extend(body) + return urwid.ListBox(txt) + + def _tab(self, content, attr): + p = urwid.Text(content) + p = urwid.Padding(p, align="left", width=("relative", 100)) + p = urwid.AttrWrap(p, attr) + return p + + def wrap_body(self, active, body): + parts = [] + + if self.flow.intercepting and not self.flow.request.acked: + qt = "Request intercepted" + else: + qt = "Request" + if active == common.VIEW_FLOW_REQUEST: + parts.append(self._tab(qt, "heading")) + else: + parts.append(self._tab(qt, "heading_inactive")) + + if self.flow.intercepting and self.flow.response and not self.flow.response.acked: + st = "Response intercepted" + else: + st = "Response" + if active == common.VIEW_FLOW_RESPONSE: + parts.append(self._tab(st, "heading")) + else: + parts.append(self._tab(st, "heading_inactive")) + + h = urwid.Columns(parts) + f = urwid.Frame( + body, + header=h + ) + return f + + def _conn_text(self, conn, viewmode): + e = conn.headers["content-encoding"] + e = e[0] if e else None + return cache.callback( + self, "_cached_conn_text", + e, + conn.content, + tuple(tuple(i) for i in conn.headers.lst), + viewmode + ) + + def view_request(self): + self.state.view_flow_mode = common.VIEW_FLOW_REQUEST + body = self._conn_text( + self.flow.request, + self.state.view_body_mode + ) + self.w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) + self.master.statusbar.redraw() + + def view_response(self): + self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE + if self.flow.response: + body = self._conn_text( + self.flow.response, + self.state.view_body_mode + ) + else: + body = urwid.ListBox( + [ + urwid.Text(""), + urwid.Text( + [ + ("highlight", "No response. Press "), + ("key", "e"), + ("highlight", " and edit any aspect to add one."), + ] + ) + ] + ) + self.w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) + self.master.statusbar.redraw() + + def refresh_flow(self, c=None): + if c == self.flow: + if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: + self.view_response() + else: + self.view_request() + + def set_method_raw(self, m): + if m: + self.flow.request.method = m + self.master.refresh_flow(self.flow) + + def edit_method(self, m): + if m == "e": + self.master.prompt_edit("Method", self.flow.request.method, self.set_method_raw) + else: + for i in self.method_options: + if i[1] == m: + self.flow.request.method = i[0].upper() + self.master.refresh_flow(self.flow) + + def save_body(self, path): + if not path: + return + self.state.last_saveload = path + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + c = self.flow.request + else: + c = self.flow.response + path = os.path.expanduser(path) + try: + f = file(path, "wb") + f.write(str(c.content)) + f.close() + except IOError, v: + self.master.statusbar.message(v.strerror) + + def set_url(self, url): + request = self.flow.request + if not request.set_url(str(url)): + return "Invalid URL." + self.master.refresh_flow(self.flow) + + def set_resp_code(self, code): + response = self.flow.response + try: + response.code = int(code) + except ValueError: + return None + import BaseHTTPServer + if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): + response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] + self.master.refresh_flow(self.flow) + + def set_resp_msg(self, msg): + response = self.flow.response + response.msg = msg + self.master.refresh_flow(self.flow) + + def set_headers(self, lst, conn): + conn.headers = flow.ODict(lst) + + def set_query(self, lst, conn): + conn.set_query(flow.ODict(lst)) + + def set_form(self, lst, conn): + conn.set_form_urlencoded(flow.ODict(lst)) + + def edit(self, part): + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + conn = self.flow.request + else: + if not self.flow.response: + self.flow.response = flow.Response(self.flow.request, 200, "OK", flow.ODict(), "") + conn = self.flow.response + + self.flow.backup() + if part == "r": + c = self.master.spawn_editor(conn.content or "") + conn.content = c.rstrip("\n") + elif part == "f": + self.master.view_kveditor("Editing form", conn.get_form_urlencoded().lst, self.set_form, conn) + elif part == "h": + self.master.view_kveditor("Editing headers", conn.headers.lst, self.set_headers, conn) + elif part == "q": + self.master.view_kveditor("Editing query", conn.get_query().lst, self.set_query, conn) + elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.prompt_edit("URL", conn.get_url(), self.set_url) + elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.prompt_onekey("Method", self.method_options, self.edit_method) + elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: + self.master.prompt_edit("Code", str(conn.code), self.set_resp_code) + elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: + self.master.prompt_edit("Message", conn.msg, self.set_resp_msg) + self.master.refresh_flow(self.flow) + + def _view_nextprev_flow(self, np, flow): + try: + idx = self.state.view.index(flow) + except IndexError: + return + if np == "next": + new_flow, new_idx = self.state.get_next(idx) + else: + new_flow, new_idx = self.state.get_prev(idx) + if new_idx is None: + return + self.master.view_flow(new_flow) + + def view_next_flow(self, flow): + return self._view_nextprev_flow("next", flow) + + def view_prev_flow(self, flow): + return self._view_nextprev_flow("prev", flow) + + def keypress(self, size, key): + if key == " ": + self.view_next_flow(self.flow) + return key + + key = common.shortcuts(key) + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + conn = self.flow.request + else: + conn = self.flow.response + + if key == "q": + self.master.view_flowlist() + key = None + elif key == "tab": + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.view_response() + else: + self.view_request() + elif key in ("up", "down", "page up", "page down"): + # Why doesn't this just work?? + self.w.keypress(size, key) + elif key == "a": + self.flow.accept_intercept() + self.master.view_flow(self.flow) + elif key == "A": + self.master.accept_all() + self.master.view_flow(self.flow) + elif key == "d": + if self.state.flow_count() == 1: + self.master.view_flowlist() + elif self.state.view.index(self.flow) == len(self.state.view)-1: + self.view_prev_flow(self.flow) + else: + self.view_next_flow(self.flow) + f = self.flow + f.kill(self.master) + self.state.delete_flow(f) + elif key == "D": + f = self.master.duplicate_flow(self.flow) + self.master.view_flow(f) + self.master.currentflow = f + self.master.statusbar.message("Duplicated.") + elif key == "e": + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.prompt_onekey( + "Edit request", + ( + ("query", "q"), + ("form", "f"), + ("url", "u"), + ("header", "h"), + ("raw body", "r"), + ("method", "m"), + ), + self.edit + ) + else: + self.master.prompt_onekey( + "Edit response", + ( + ("code", "c"), + ("message", "m"), + ("header", "h"), + ("raw body", "r"), + ), + self.edit + ) + key = None + elif key == "m": + self.master.prompt_onekey( + "View", + ( + ("raw", "r"), + ("pretty", "p"), + ("hex", "h"), + ), + self.master.changeview + ) + key = None + elif key == "p": + self.view_prev_flow(self.flow) + elif key == "r": + r = self.master.replay_request(self.flow) + if r: + self.master.statusbar.message(r) + self.master.refresh_flow(self.flow) + elif key == "V": + self.state.revert(self.flow) + self.master.refresh_flow(self.flow) + elif key == "W": + self.master.path_prompt( + "Save this flow: ", + self.state.last_saveload, + self.master.save_one_flow, + self.flow + ) + elif key == "v": + if conn and conn.content: + t = conn.headers["content-type"] or [None] + t = t[0] + self.master.spawn_external_viewer(conn.content, t) + elif key == "b": + if conn: + if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: + self.master.path_prompt( + "Save request body: ", + self.state.last_saveload, + self.save_body + ) + else: + self.master.path_prompt( + "Save response body: ", + self.state.last_saveload, + self.save_body + ) + elif key == "|": + self.master.path_prompt( + "Send flow to script: ", self.state.last_script, + self.master.run_script_once, self.flow + ) + elif key == "z": + if conn: + e = conn.headers["content-encoding"] or ["identity"] + if e[0] != "identity": + conn.decode() + else: + self.master.prompt_onekey( + "Select encoding: ", + ( + ("gzip", "z"), + ("deflate", "d"), + ), + self.encode_callback, + conn + ) + self.master.refresh_flow(self.flow) + else: + return key + + def encode_callback(self, key, conn): + encoding_map = { + "z": "gzip", + "d": "deflate", + } + conn.encode(encoding_map[key]) + self.master.refresh_flow(self.flow) diff --git a/libmproxy/console/help.py b/libmproxy/console/help.py index 8b92369c..224c6ab7 100644 --- a/libmproxy/console/help.py +++ b/libmproxy/console/help.py @@ -59,7 +59,7 @@ class HelpView(urwid.ListBox): [("text", ": disable server replay response refresh")] ), - ("q", "quit / return to connection list"), + ("q", "quit / return to flow list"), ("Q", "quit without confirm prompt"), ("R", "set reverse proxy mode"), ("s", "set/unset script"), -- cgit v1.2.3