diff options
Diffstat (limited to 'mitmproxy/protocol/http2.py')
-rw-r--r-- | mitmproxy/protocol/http2.py | 164 |
1 files changed, 93 insertions, 71 deletions
diff --git a/mitmproxy/protocol/http2.py b/mitmproxy/protocol/http2.py index b9a30c7e..27c2a664 100644 --- a/mitmproxy/protocol/http2.py +++ b/mitmproxy/protocol/http2.py @@ -5,7 +5,6 @@ import time import traceback import h2.exceptions -import hyperframe import six from h2 import connection from h2 import events @@ -55,12 +54,12 @@ class SafeH2Connection(connection.H2Connection): self.update_settings(new_settings) self.conn.send(self.data_to_send()) - def safe_send_headers(self, is_zombie, stream_id, headers): - # make sure to have a lock - if is_zombie(): # pragma: no cover - raise exceptions.Http2ProtocolException("Zombie Stream") - self.send_headers(stream_id, headers.fields) - self.conn.send(self.data_to_send()) + def safe_send_headers(self, is_zombie, stream_id, headers, **kwargs): + with self.lock: + if is_zombie(): # pragma: no cover + raise exceptions.Http2ProtocolException("Zombie Stream") + self.send_headers(stream_id, headers.fields, **kwargs) + self.conn.send(self.data_to_send()) def safe_send_body(self, is_zombie, stream_id, chunks): for chunk in chunks: @@ -79,7 +78,7 @@ class SafeH2Connection(connection.H2Connection): self.send_data(stream_id, frame_chunk) try: self.conn.send(self.data_to_send()) - except Exception as e: + except Exception as e: # pragma: no cover raise e finally: self.lock.release() @@ -141,6 +140,12 @@ class Http2Layer(base.Layer): headers = netlib.http.Headers([[k, v] for k, v in event.headers]) self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) self.streams[eid].timestamp_start = time.time() + self.streams[eid].no_body = (event.stream_ended is not None) + if event.priority_updated is not None: + self.streams[eid].priority_exclusive = event.priority_updated.exclusive + self.streams[eid].priority_depends_on = event.priority_updated.depends_on + self.streams[eid].priority_weight = event.priority_updated.weight + self.streams[eid].handled_priority_event = event.priority_updated self.streams[eid].start() elif isinstance(event, events.ResponseReceived): headers = netlib.http.Headers([[k, v] for k, v in event.headers]) @@ -150,10 +155,13 @@ class Http2Layer(base.Layer): self.streams[eid].response_arrived.set() elif isinstance(event, events.DataReceived): if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit: - raise netlib.exceptions.HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit)) - self.streams[eid].data_queue.put(event.data) - self.streams[eid].queued_data_length += len(event.data) - source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length) + self.streams[eid].zombie = time.time() + source_conn.h2.safe_reset_stream(event.stream_id, 0x7) + self.log("HTTP body too large. Limit is {}.".format(self.config.body_size_limit), "info") + else: + self.streams[eid].data_queue.put(event.data) + self.streams[eid].queued_data_length += len(event.data) + source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length) elif isinstance(event, events.StreamEnded): self.streams[eid].timestamp_end = time.time() self.streams[eid].data_finished.set() @@ -184,7 +192,6 @@ class Http2Layer(base.Layer): self.client_conn.send(self.client_conn.h2.data_to_send()) self._kill_all_streams() return False - elif isinstance(event, events.PushedStreamReceived): # pushed stream ids should be unique and not dependent on race conditions # only the parent stream id must be looked up first @@ -193,7 +200,7 @@ class Http2Layer(base.Layer): self.client_conn.h2.push_stream(parent_eid, event.pushed_stream_id, event.headers) self.client_conn.send(self.client_conn.h2.data_to_send()) - headers = netlib.http.Headers([[str(k), str(v)] for k, v in event.headers]) + headers = netlib.http.Headers([[k, v] for k, v in event.headers]) self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers) self.streams[event.pushed_stream_id].timestamp_start = time.time() self.streams[event.pushed_stream_id].pushed = True @@ -202,28 +209,55 @@ class Http2Layer(base.Layer): self.streams[event.pushed_stream_id].request_data_finished.set() self.streams[event.pushed_stream_id].start() elif isinstance(event, events.PriorityUpdated): - stream_id = event.stream_id - if stream_id in self.streams.keys() and self.streams[stream_id].server_stream_id: - stream_id = self.streams[stream_id].server_stream_id - - depends_on = event.depends_on - if depends_on in self.streams.keys() and self.streams[depends_on].server_stream_id: - depends_on = self.streams[depends_on].server_stream_id - - # weight is between 1 and 256 (inclusive), but represented as uint8 (0 to 255) - frame = hyperframe.frame.PriorityFrame(stream_id, depends_on, event.weight - 1, event.exclusive) - self.server_conn.send(frame.serialize()) + if eid in self.streams and self.streams[eid].handled_priority_event is event: + # this event was already handled during stream creation + # HeadersFrame + Priority information as RequestReceived + return True + + mapped_stream_id = event.stream_id + if mapped_stream_id in self.streams and self.streams[mapped_stream_id].server_stream_id: + # if the stream is already up and running and was sent to the server + # use the mapped server stream id to update priority information + mapped_stream_id = self.streams[mapped_stream_id].server_stream_id + + if eid in self.streams: + self.streams[eid].priority_exclusive = event.exclusive + self.streams[eid].priority_depends_on = event.depends_on + self.streams[eid].priority_weight = event.weight + + with self.server_conn.h2.lock: + self.server_conn.h2.prioritize( + mapped_stream_id, + weight=event.weight, + depends_on=self._map_depends_on_stream_id(mapped_stream_id, event.depends_on), + exclusive=event.exclusive + ) + self.server_conn.send(self.server_conn.h2.data_to_send()) elif isinstance(event, events.TrailersReceived): - raise NotImplementedError() + raise NotImplementedError("TrailersReceived not implemented") return True + def _map_depends_on_stream_id(self, stream_id, depends_on): + mapped_depends_on = depends_on + if mapped_depends_on in self.streams and self.streams[mapped_depends_on].server_stream_id: + # if the depends-on-stream is already up and running and was sent to the server + # use the mapped server stream id to update priority information + mapped_depends_on = self.streams[mapped_depends_on].server_stream_id + if stream_id == mapped_depends_on: + # looks like one of the streams wasn't opened yet + # prevent self-dependent streams which result in ProtocolError + mapped_depends_on += 2 + return mapped_depends_on + def _cleanup_streams(self): death_time = time.time() - 10 - for stream_id in self.streams.keys(): - zombie = self.streams[stream_id].zombie - if zombie and zombie <= death_time: - self.streams.pop(stream_id, None) + + zombie_streams = [(stream_id, stream) for stream_id, stream in list(self.streams.items()) if stream.zombie] + outdated_streams = [stream_id for stream_id, stream in zombie_streams if stream.zombie <= death_time] + + for stream_id in outdated_streams: # pragma: no cover + self.streams.pop(stream_id, None) def _kill_all_streams(self): for stream in self.streams.values(): @@ -267,8 +301,8 @@ class Http2Layer(base.Layer): self._kill_all_streams() return - self._cleanup_streams() - except Exception as e: + self._cleanup_streams() + except Exception as e: # pragma: no cover self.log(repr(e), "info") self.log(traceback.format_exc(), "debug") self._kill_all_streams() @@ -296,6 +330,13 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread) self.response_queued_data_length = 0 self.response_data_finished = threading.Event() + self.no_body = False + + self.priority_exclusive = None + self.priority_depends_on = None + self.priority_weight = None + self.handled_priority_event = None + @property def data_queue(self): if self.response_arrived.is_set(): @@ -330,39 +371,13 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread) if self.zombie: # pragma: no cover raise exceptions.Http2ProtocolException("Zombie Stream") - authority = self.request_headers.get(':authority', '') - method = self.request_headers.get(':method', 'GET') - scheme = self.request_headers.get(':scheme', 'https') - path = self.request_headers.get(':path', '/') - self.request_headers.clear(":method") - self.request_headers.clear(":scheme") - self.request_headers.clear(":path") - host = None - port = None - - if path == '*' or path.startswith("/"): - first_line_format = "relative" - elif method == 'CONNECT': # pragma: no cover - raise NotImplementedError("CONNECT over HTTP/2 is not implemented.") - else: # pragma: no cover - first_line_format = "absolute" - # FIXME: verify if path or :host contains what we need - scheme, host, port, _ = netlib.http.url.parse(path) - - if authority: - host, _, port = authority.partition(':') - - if not host: - host = 'localhost' - if not port: - port = 443 if scheme == 'https' else 80 - port = int(port) - data = [] while self.request_data_queue.qsize() > 0: data.append(self.request_data_queue.get()) data = b"".join(data) + first_line_format, method, scheme, host, port, path = http2.parse_headers(self.request_headers) + return models.HTTPRequest( first_line_format, method, @@ -412,25 +427,29 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread) headers.insert(0, ":path", message.path) headers.insert(0, ":method", message.method) headers.insert(0, ":scheme", message.scheme) - self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() - self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id try: self.server_conn.h2.safe_send_headers( self.is_zombie, self.server_stream_id, headers, + end_stream=self.no_body, + priority_exclusive=self.priority_exclusive, + priority_depends_on=self._map_depends_on_stream_id(self.server_stream_id, self.priority_depends_on), + priority_weight=self.priority_weight, ) - except Exception as e: + except Exception as e: # pragma: no cover raise e finally: self.server_conn.h2.lock.release() - self.server_conn.h2.safe_send_body( - self.is_zombie, - self.server_stream_id, - message.body - ) + if not self.no_body: + self.server_conn.h2.safe_send_body( + self.is_zombie, + self.server_stream_id, + [message.body] + ) + if self.zombie: # pragma: no cover raise exceptions.Http2ProtocolException("Zombie Stream") @@ -442,12 +461,12 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread) status_code = int(self.response_headers.get(':status', 502)) headers = self.response_headers.copy() - headers.clear(":status") + headers.pop(":status", None) return models.HTTPResponse( http_version=b"HTTP/2.0", status_code=status_code, - reason='', + reason=b'', headers=headers, content=None, timestamp_start=self.timestamp_start, @@ -472,6 +491,9 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread) def send_response_headers(self, response): headers = response.headers.copy() headers.insert(0, ":status", str(response.status_code)) + for forbidden_header in h2.utilities.CONNECTION_HEADERS: + if forbidden_header in headers: + del headers[forbidden_header] with self.client_conn.h2.lock: self.client_conn.h2.safe_send_headers( self.is_zombie, @@ -507,7 +529,7 @@ class Http2SingleStreamLayer(http._HttpTransmissionLayer, basethread.BaseThread) try: layer() - except exceptions.ProtocolException as e: + except exceptions.ProtocolException as e: # pragma: no cover self.log(repr(e), "info") self.log(traceback.format_exc(), "debug") |