import hashlib import urllib import typing from mitmproxy import ctx from mitmproxy import flow from mitmproxy import exceptions from mitmproxy import io from mitmproxy import command import mitmproxy.types class ServerPlayback: def __init__(self): self.flowmap = {} self.stop = False self.final_flow = None self.configured = False def load(self, loader): loader.add_option( "server_replay_kill_extra", bool, False, "Kill extra requests during replay." ) loader.add_option( "server_replay_nopop", bool, False, """ Don't remove flows from server replay state after use. This makes it possible to replay same response multiple times. """ ) loader.add_option( "server_replay_refresh", bool, True, """ Refresh server replay responses by adjusting date, expires and last-modified headers, as well as adjusting cookie expiration. """ ) loader.add_option( "server_replay_use_headers", typing.Sequence[str], [], "Request headers to be considered during replay." ) loader.add_option( "server_replay", typing.Sequence[str], [], "Replay server responses from a saved file." ) loader.add_option( "server_replay_ignore_content", bool, False, "Ignore request's content while searching for a saved flow to replay." ) loader.add_option( "server_replay_ignore_params", typing.Sequence[str], [], """ Request's parameters to be ignored while searching for a saved flow to replay. """ ) loader.add_option( "server_replay_ignore_payload_params", typing.Sequence[str], [], """ Request's payload parameters (application/x-www-form-urlencoded or multipart/form-data) to be ignored while searching for a saved flow to replay. """ ) loader.add_option( "server_replay_ignore_host", bool, False, """ Ignore request's destination host while searching for a saved flow to replay. """ ) @command.command("replay.server") def load_flows(self, flows: typing.Sequence[flow.Flow]) -> None: """ Replay server responses from flows. """ self.flowmap = {} for i in flows: if i.response: # type: ignore l = self.flowmap.setdefault(self._hash(i), []) l.append(i) ctx.master.addons.trigger("update", []) @command.command("replay.server.file") def load_file(self, path: mitmproxy.types.Path) -> None: try: flows = io.read_flows_from_paths([path]) except exceptions.FlowReadException as e: raise exceptions.CommandError(str(e)) self.load_flows(flows) @command.command("replay.server.stop") def clear(self) -> None: """ Stop server replay. """ self.flowmap = {} ctx.master.addons.trigger("update", []) def count(self): return sum([len(i) for i in self.flowmap.values()]) def _hash(self, flow): """ Calculates a loose hash of the flow request. """ r = flow.request _, _, path, _, query, _ = urllib.parse.urlparse(r.url) queriesArray = urllib.parse.parse_qsl(query, keep_blank_values=True) key = [str(r.port), str(r.scheme), str(r.method), str(path)] # type: List[Any] if not ctx.options.server_replay_ignore_content: if ctx.options.server_replay_ignore_payload_params and r.multipart_form: key.extend( (k, v) for k, v in r.multipart_form.items(multi=True) if k.decode(errors="replace") not in ctx.options.server_replay_ignore_payload_params ) elif ctx.options.server_replay_ignore_payload_params and r.urlencoded_form: key.extend( (k, v) for k, v in r.urlencoded_form.items(multi=True) if k not in ctx.options.server_replay_ignore_payload_params ) else: key.append(str(r.raw_content)) if not ctx.options.server_replay_ignore_host: key.append(r.host) filtered = [] ignore_params = ctx.options.server_replay_ignore_params or [] for p in queriesArray: if p[0] not in ignore_params: filtered.append(p) for p in filtered: key.append(p[0]) key.append(p[1]) if ctx.options.server_replay_use_headers: headers = [] for i in ctx.options.server_replay_use_headers: v = r.headers.get(i) headers.append((i, v)) key.append(headers) return hashlib.sha256( repr(key).encode("utf8", "surrogateescape") ).digest() def next_flow(self, request): """ Returns the next flow object, or None if no matching flow was found. """ hsh = self._hash(request) if hsh in self.flowmap: if ctx.options.server_replay_nopop: return self.flowmap[hsh][0] else: ret = self.flowmap[hsh].pop(0) if not self.flowmap[hsh]: del self.flowmap[hsh] return ret def configure(self, updated): if not self.configured and ctx.options.server_replay: self.configured = True try: flows = io.read_flows_from_paths(ctx.options.server_replay) except exceptions.FlowReadException as e: raise exceptions.OptionsError(str(e)) self.load_flows(flows) def tick(self): if self.stop and not self.final_flow.live: ctx.master.addons.trigger("processing_complete") def request(self, f): if self.flowmap: rflow = self.next_flow(f) if rflow: response = rflow.response.copy() response.is_replay = True if ctx.options.server_replay_refresh: response.refresh() f.response = response if not self.flowmap: self.final_flow = f self.stop = True elif ctx.options.server_replay_kill_extra: ctx.log.warn( "server_playback: killed non-replay request {}".format( f.request.url ) ) f.reply.kill()