from __future__ import absolute_import import os, sys, copy import urwid from . import common, grideditor, contentview, signals from .. import utils, flow, controller from ..protocol.http import HTTPRequest, HTTPResponse, CONTENT_MISSING, decoded class SearchError(Exception): pass def _mkhelp(): text = [] keys = [ ("A", "accept all intercepted flows"), ("a", "accept this intercepted flow"), ("b", "save request/response body"), ("d", "delete flow"), ("D", "duplicate flow"), ("e", "edit request/response"), ("f", "load full body data"), ("g", "copy response(content/headers) to clipboard"), ("m", "change body display mode for this entity"), (None, common.highlight_key("automatic", "a") + [("text", ": automatic detection")] ), (None, common.highlight_key("hex", "e") + [("text", ": Hex")] ), (None, common.highlight_key("html", "h") + [("text", ": HTML")] ), (None, common.highlight_key("image", "i") + [("text", ": Image")] ), (None, common.highlight_key("javascript", "j") + [("text", ": JavaScript")] ), (None, common.highlight_key("json", "s") + [("text", ": JSON")] ), (None, common.highlight_key("urlencoded", "u") + [("text", ": URL-encoded data")] ), (None, common.highlight_key("raw", "r") + [("text", ": raw data")] ), (None, common.highlight_key("xml", "x") + [("text", ": XML")] ), ("M", "change default body display mode"), ("p", "previous flow"), ("r", "replay request"), ("V", "revert changes to request"), ("v", "view body in external viewer"), ("w", "save all flows matching current limit"), ("W", "save this flow"), ("x", "delete body"), ("X", "view flow details"), ("z", "encode/decode a request/response"), ("tab", "toggle request/response view"), ("space", "next flow"), ("|", "run script on this flow"), ("/", "search in response body (case sensitive)"), ("n", "repeat search forward"), ("N", "repeat search backwards"), ] text.extend(common.format_keyvals(keys, key="key", val="text", indent=4)) return text help_context = _mkhelp() footer = [ ('heading_key', "?"), ":help ", ('heading_key', "q"), ":back ", ] class FlowViewHeader(urwid.WidgetWrap): def __init__(self, master, f): self.master, self.flow = master, f self._w = common.format_flow(f, False, extended=True, padding=0, hostheader=self.master.showhost) signals.flow_change.connect(self.sig_flow_change) def sig_flow_change(self, sender, flow): if flow == self.flow: self._w = common.format_flow( flow, False, extended=True, padding=0, hostheader=self.master.showhost ) class CallbackCache: @utils.LRUCache(200) def _callback(self, method, *args, **kwargs): return getattr(self.obj, method)(*args, **kwargs) def callback(self, obj, method, *args, **kwargs): # obj varies! self.obj = obj return self._callback(method, *args, **kwargs) cache = CallbackCache() class FlowView(urwid.WidgetWrap): REQ = 0 RESP = 1 highlight_color = "focusfield" def __init__(self, master, state, flow): self.master, self.state, self.flow = master, state, flow self.last_displayed_body = None if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: self.view_response() else: self.view_request() signals.flow_change.connect(self.sig_flow_change) def sig_flow_change(self, sender, flow): if flow == self.flow: if self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE and self.flow.response: self.view_response() else: self.view_request() def _cached_content_view(self, viewmode, hdrItems, content, limit, is_request): return contentview.get_content_view(viewmode, hdrItems, content, limit, self.master.add_event, is_request) def content_view(self, viewmode, conn): full = self.state.get_flow_setting( self.flow, (self.state.view_flow_mode, "fullcontents"), False ) if full: limit = sys.maxint else: limit = contentview.VIEW_CUTOFF description, text_objects = cache.callback( self, "_cached_content_view", viewmode, tuple(tuple(i) for i in conn.headers.lst), conn.content, limit, isinstance(conn, HTTPRequest) ) return (description, text_objects) def cont_view_handle_missing(self, conn, viewmode): if conn.content == CONTENT_MISSING: msg, body = "", [urwid.Text([("error", "[content missing]")])] else: msg, body = self.content_view(viewmode, conn) return (msg, body) def viewmode_get(self, override): return self.state.default_body_view if override is None else override def override_get(self): return self.state.get_flow_setting(self.flow, (self.state.view_flow_mode, "prettyview")) def conn_text_raw(self, conn): """ Based on a request/response, conn, returns the elements for display. """ headers = common.format_keyvals( [(h+":", v) for (h, v) in conn.headers.lst], key = "header", val = "text" ) override = self.override_get() viewmode = self.viewmode_get(override) msg, body = self.cont_view_handle_missing(conn, viewmode) return headers, msg, body def conn_text_merge(self, headers, msg, body): """ Grabs what is returned by conn_text_raw and merges them all toghether, mainly used by conn_text and search """ override = self.override_get() viewmode = self.viewmode_get(override) cols = [urwid.Text( [ ("heading", msg), ] ) ] if override is not None: cols.append(urwid.Text([ " ", ('heading', "["), ('heading_key', "m"), ('heading', (":%s]", ], align="right" ) ) title = urwid.AttrWrap(urwid.Columns(cols), "heading") headers.append(title) headers.extend(body) return headers def conn_text(self, conn): """ Same as conn_text_raw, but returns result wrapped in a listbox ready for usage. """ headers, msg, body = self.conn_text_raw(conn) merged = self.conn_text_merge(headers, msg, body) return urwid.ListBox(merged) def _tab(self, content, attr): p = urwid.Text(content) p = urwid.Padding(p, align="left", width=("relative", 100)) p = urwid.AttrWrap(p, attr) return p def wrap_body(self, active, body): parts = [] if self.flow.intercepted and not self.flow.reply.acked and not self.flow.response: qt = "Request intercepted" else: qt = "Request" if active == common.VIEW_FLOW_REQUEST: parts.append(self._tab(qt, "heading")) else: parts.append(self._tab(qt, "heading_inactive")) if self.flow.intercepted and not self.flow.reply.acked and self.flow.response: st = "Response intercepted" else: st = "Response" if active == common.VIEW_FLOW_RESPONSE: parts.append(self._tab(st, "heading")) else: parts.append(self._tab(st, "heading_inactive")) h = urwid.Columns(parts) f = urwid.Frame( body, header=h ) return f def search_wrapped_around(self, last_find_line, last_search_index, backwards): """ returns true if search wrapped around the bottom. """ current_find_line = self.state.get_flow_setting(self.flow, "last_find_line") current_search_index = self.state.get_flow_setting(self.flow, "last_search_index") if not backwards: message = "search hit BOTTOM, continuing at TOP" if current_find_line <= last_find_line: return True, message elif current_find_line == last_find_line: if current_search_index <= last_search_index: return True, message else: message = "search hit TOP, continuing at BOTTOM" if current_find_line >= last_find_line: return True, message elif current_find_line == last_find_line: if current_search_index >= last_search_index: return True, message return False, "" def search_again(self, backwards=False): """ runs the previous search again, forwards or backwards. """ last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") if last_search_string: message =, backwards) if message: signals.status_message.send(message=message) else: message = "no previous searches have been made" signals.status_message.send(message=message) return message def search(self, search_string, backwards=False): """ similar to view_response or view_request, but instead of just displaying the conn, it highlights a word that the user is searching for and handles all the logic surrounding that. """ if not search_string: search_string = self.state.get_flow_setting(self.flow, "last_search_string") if not search_string: return if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: text = self.flow.request const = common.VIEW_FLOW_REQUEST else: text = self.flow.response const = common.VIEW_FLOW_RESPONSE if not self.flow.response: return "no response to search in" last_find_line = self.state.get_flow_setting(self.flow, "last_find_line") last_search_index = self.state.get_flow_setting(self.flow, "last_search_index") # generate the body, highlight the words and get focus headers, msg, body = self.conn_text_raw(text) try: body, focus_position = self.search_highlight_text(body, search_string, backwards=backwards) except SearchError: return "Search not supported in this view." if focus_position == None: # no results found. return "no matches for '%s'" % search_string # UI stuff. merged = self.conn_text_merge(headers, msg, body) list_box = urwid.ListBox(merged) list_box.set_focus(focus_position + 2) self._w = self.wrap_body(const, list_box) signals.update_settings.send(self) self.last_displayed_body = list_box wrapped, wrapped_message = self.search_wrapped_around(last_find_line, last_search_index, backwards) if wrapped: return wrapped_message def search_get_start(self, search_string): start_line = 0 start_index = 0 last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") if search_string == last_search_string: start_line = self.state.get_flow_setting(self.flow, "last_find_line") start_index = self.state.get_flow_setting(self.flow, "last_search_index") if start_index == None: start_index = 0 else: start_index += len(search_string) if start_line == None: start_line = 0 else: self.state.add_flow_setting(self.flow, "last_search_string", search_string) return (start_line, start_index) def search_get_range(self, len_text_objects, start_line, backwards): if not backwards: loop_range = xrange(start_line, len_text_objects) else: loop_range = xrange(start_line, -1, -1) return loop_range def search_find(self, text, search_string, start_index, backwards): if backwards == False: find_index = text.find(search_string, start_index) else: if start_index != 0: start_index -= len(search_string) else: start_index = None find_index = text.rfind(search_string, 0, start_index) return find_index def search_highlight_text(self, text_objects, search_string, looping = False, backwards = False): start_line, start_index = self.search_get_start(search_string) i = start_line found = False text_objects = copy.deepcopy(text_objects) loop_range = self.search_get_range(len(text_objects), start_line, backwards) for i in loop_range: text_object = text_objects[i] try: text, style = text_object.get_text() except AttributeError: raise SearchError() if i != start_line: start_index = 0 find_index = self.search_find(text, search_string, start_index, backwards) if find_index != -1: new_text = self.search_highlight_object(text, find_index, search_string) text_objects[i] = new_text found = True self.state.add_flow_setting(self.flow, "last_search_index", find_index) self.state.add_flow_setting(self.flow, "last_find_line", i) break # handle search WRAP if found: focus_pos = i else : if looping: focus_pos = None else: if not backwards: self.state.add_flow_setting(self.flow, "last_search_index", 0) self.state.add_flow_setting(self.flow, "last_find_line", 0) else: self.state.add_flow_setting(self.flow, "last_search_index", None) self.state.add_flow_setting(self.flow, "last_find_line", len(text_objects) - 1) text_objects, focus_pos = self.search_highlight_text(text_objects, search_string, looping=True, backwards=backwards) return text_objects, focus_pos def search_highlight_object(self, text_object, find_index, search_string): """ just a little abstraction """ before = text_object[:find_index] after = text_object[find_index+len(search_string):] new_text = urwid.Text( [ before, (self.highlight_color, search_string), after, ] ) return new_text def view_request(self): self.state.view_flow_mode = common.VIEW_FLOW_REQUEST body = self.conn_text(self.flow.request) self._w = self.wrap_body(common.VIEW_FLOW_REQUEST, body) def view_response(self): self.state.view_flow_mode = common.VIEW_FLOW_RESPONSE if self.flow.response: body = self.conn_text(self.flow.response) else: body = urwid.ListBox( [ urwid.Text(""), urwid.Text( [ ("highlight", "No response. Press "), ("key", "e"), ("highlight", " and edit any aspect to add one."), ] ) ] ) self._w = self.wrap_body(common.VIEW_FLOW_RESPONSE, body) def set_method_raw(self, m): if m: self.flow.request.method = m signals.flow_change.send(self, flow = self.flow) def edit_method(self, m): if m == "e": signals.status_prompt.send( prompt = "Method", text = self.flow.request.method, callback = self.set_method_raw ) else: for i in common.METHOD_OPTIONS: if i[1] == m: self.flow.request.method = i[0].upper() signals.flow_change.send(self, flow = self.flow) def set_url(self, url): request = self.flow.request try: request.url = str(url) except ValueError: return "Invalid URL." signals.flow_change.send(self, flow = self.flow) def set_resp_code(self, code): response = self.flow.response try: response.code = int(code) except ValueError: return None import BaseHTTPServer if BaseHTTPServer.BaseHTTPRequestHandler.responses.has_key(int(code)): response.msg = BaseHTTPServer.BaseHTTPRequestHandler.responses[int(code)][0] signals.flow_change.send(self, flow = self.flow) def set_resp_msg(self, msg): response = self.flow.response response.msg = msg signals.flow_change.send(self, flow = self.flow) def set_headers(self, lst, conn): conn.headers = flow.ODictCaseless(lst) def set_query(self, lst, conn): conn.set_query(flow.ODict(lst)) def set_path_components(self, lst, conn): conn.set_path_components([i[0] for i in lst]) def set_form(self, lst, conn): conn.set_form_urlencoded(flow.ODict(lst)) def edit_form(self, conn): self.master.view_grideditor( grideditor.URLEncodedFormEditor(self.master, conn.get_form_urlencoded().lst, self.set_form, conn) ) def edit_form_confirm(self, key, conn): if key == "y": self.edit_form(conn) def edit(self, part): if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: message = self.flow.request else: if not self.flow.response: self.flow.response = HTTPResponse( self.flow.request.httpversion, 200, "OK", flow.ODictCaseless(), "" ) self.flow.response.reply = controller.DummyReply() message = self.flow.response self.flow.backup() if part == "r": with decoded(message): # Fix an issue caused by some editors when editing a request/response body. # Many editors make it hard to save a file without a terminating newline on the last # line. When editing message bodies, this can cause problems. For now, I just # strip the newlines off the end of the body when we return from an editor. c = self.master.spawn_editor(message.content or "") message.content = c.rstrip("\n") elif part == "f": if not message.get_form_urlencoded() and message.content: signals.status_prompt_onekey.send( prompt = "Existing body is not a URL-encoded form. Clear and edit?", keys = [ ("yes", "y"), ("no", "n"), ], callback = self.edit_form_confirm, args = (message,) ) else: self.edit_form(message) elif part == "h": self.master.view_grideditor(grideditor.HeaderEditor(self.master, message.headers.lst, self.set_headers, message)) elif part == "p": p = message.get_path_components() p = [[i] for i in p] self.master.view_grideditor(grideditor.PathEditor(self.master, p, self.set_path_components, message)) elif part == "q": self.master.view_grideditor(grideditor.QueryEditor(self.master, message.get_query().lst, self.set_query, message)) elif part == "u" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: signals.status_prompt.send( prompt = "URL", text = message.url, callback = self.set_url ) elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: signals.status_prompt_onekey.send( prompt = "Method", keys = common.METHOD_OPTIONS, callback = self.edit_method ) elif part == "c" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: signals.status_prompt.send( prompt = "Code", text = str(message.code), callback = self.set_resp_code ) elif part == "m" and self.state.view_flow_mode == common.VIEW_FLOW_RESPONSE: signals.status_prompt.send( prompt = "Message", text = message.msg, callback = self.set_resp_msg ) signals.flow_change.send(self, flow = self.flow) def _view_nextprev_flow(self, np, flow): try: idx = self.state.view.index(flow) except IndexError: return if np == "next": new_flow, new_idx = self.state.get_next(idx) else: new_flow, new_idx = self.state.get_prev(idx) if new_flow is None: signals.status_message.send(message="No more flows!") return self.master.view_flow(new_flow) def view_next_flow(self, flow): return self._view_nextprev_flow("next", flow) def view_prev_flow(self, flow): return self._view_nextprev_flow("prev", flow) def change_this_display_mode(self, t): self.state.add_flow_setting( self.flow, (self.state.view_flow_mode, "prettyview"), contentview.get_by_shortcut(t) ) signals.flow_change.send(self, flow = self.flow) def delete_body(self, t): if t == "m": val = CONTENT_MISSING else: val = None if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: self.flow.request.content = val else: self.flow.response.content = val signals.flow_change.send(self, flow = self.flow) def keypress(self, size, key): if key == " ": self.view_next_flow(self.flow) return key = common.shortcuts(key) if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: conn = self.flow.request else: conn = self.flow.response if key == "q": self.master.view_flowlist() key = None elif key == "tab": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: self.view_response() else: self.view_request() elif key in ("up", "down", "page up", "page down"): # Why doesn't this just work?? self._w.keypress(size, key) elif key == "a": self.flow.accept_intercept(self.master) self.master.view_flow(self.flow) elif key == "A": self.master.accept_all() self.master.view_flow(self.flow) elif key == "b": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: common.ask_save_body("q", self.master, self.state, self.flow) else: common.ask_save_body("s", self.master, self.state, self.flow) elif key == "d": if self.state.flow_count() == 1: self.master.view_flowlist() elif self.state.view.index(self.flow) == len(self.state.view)-1: self.view_prev_flow(self.flow) else: self.view_next_flow(self.flow) f = self.flow f.kill(self.master) self.state.delete_flow(f) elif key == "D": f = self.master.duplicate_flow(self.flow) self.master.view_flow(f) signals.status_message.send(message="Duplicated.") elif key == "e": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: signals.status_prompt_onekey.send( prompt = "Edit request", keys = ( ("query", "q"), ("path", "p"), ("url", "u"), ("header", "h"), ("form", "f"), ("raw body", "r"), ("method", "m"), ), callback = self.edit ) else: signals.status_prompt_onekey.send( prompt = "Edit response", keys = ( ("code", "c"), ("message", "m"), ("header", "h"), ("raw body", "r"), ), callback = self.edit ) key = None elif key == "f": signals.status_message.send(message="Loading all body data...") self.state.add_flow_setting( self.flow, (self.state.view_flow_mode, "fullcontents"), True ) signals.flow_change.send(self, flow = self.flow) signals.status_message.send(message="") elif key == "g": if self.state.view_flow_mode == common.VIEW_FLOW_REQUEST: scope = "q" else: scope = "s" common.ask_copy_part(scope, self.flow, self.master, self.state) elif key == "m": p = list(contentview.view_prompts) p.insert(0, ("Clear", "C")) signals.status_prompt_onekey.send( self, prompt = "Display mode", keys = p, callback = self.change_this_display_mode ) key = None elif key == "p": self.view_prev_flow(self.flow) elif key == "r": r = self.master.replay_request(self.flow) if r: signals.status_message.send(message=r) signals.flow_change.send(self, flow = self.flow) elif key == "V": if not self.flow.modified(): signals.status_message.send(message="Flow not modified.") return self.state.revert(self.flow) signals.flow_change.send(self, flow = self.flow) signals.status_message.send(message="Reverted.") elif key == "W": signals.status_prompt_path.send( prompt = "Save this flow", callback = self.master.save_one_flow, args = (self.flow,) ) elif key == "v": if conn and conn.content: t = conn.headers["content-type"] or [None] t = t[0] if os.environ.has_key("EDITOR") or os.environ.has_key("PAGER"): self.master.spawn_external_viewer(conn.content, t) else: signals.status_message.send(message="Error! Set $EDITOR or $PAGER.") elif key == "|": signals.status_prompt_path.send( prompt = "Send flow to script", callback = self.master.run_script_once, args = (self.flow,) ) elif key == "x": signals.status_prompt_onekey.send( prompt = "Delete body", keys = ( ("completely", "c"), ("mark as missing", "m"), ), callback = self.delete_body ) key = None elif key == "X": self.master.view_flowdetails(self.flow) elif key == "z": if conn: self.flow.backup() e = conn.headers.get_first("content-encoding", "identity") if e != "identity": if not conn.decode(): signals.status_message.send(message="Could not decode - invalid data?") else: signals.status_prompt_onekey.send( prompt = "Select encoding: ", keys = ( ("gzip", "z"), ("deflate", "d"), ), callback = self.encode_callback, args = (conn,) ) signals.flow_change.send(self, flow = self.flow) elif key == "/": last_search_string = self.state.get_flow_setting(self.flow, "last_search_string") search_prompt = "Search body ["+last_search_string+"]" if last_search_string else "Search body" signals.status_prompt.send( prompt = search_prompt, text = "", callback = ) elif key == "n": self.search_again(backwards=False) elif key == "N": self.search_again(backwards=True) else: return key def encode_callback(self, key, conn): encoding_map = { "z": "gzip", "d": "deflate", } conn.encode(encoding_map[key]) signals.flow_change.send(self, flow = self.flow)