From 24641d8561e6765b2aafada24fbefec2407eeeb3 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Sun, 17 Jan 2016 20:18:53 +0100 Subject: cleanup code --- libmproxy/protocol/http.py | 106 ++++++++++++++++++++++++--------------------- 1 file changed, 56 insertions(+), 50 deletions(-) (limited to 'libmproxy') diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py index 75658661..74bb0c19 100644 --- a/libmproxy/protocol/http.py +++ b/libmproxy/protocol/http.py @@ -239,6 +239,57 @@ class Http2Layer(Layer): # CONNECT for proxying? raise NotImplementedError() + def _handle_event(self, event, source_conn, other_conn, is_server): + is_server = (conn == self.server_conn.connection) + if hasattr(event, 'stream_id'): + if is_server: + eid = self.server_to_client_stream_ids[event.stream_id] + else: + eid = event.stream_id + + if isinstance(event, RequestReceived): + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) + self.streams[eid].timestamp_start = time.time() + self.streams[eid].start() + elif isinstance(event, ResponseReceived): + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + self.streams[eid].timestamp_start = time.time() + self.streams[eid].response_headers = headers + self.streams[eid].response_arrived.set() + elif isinstance(event, DataReceived): + self.streams[eid].data_queue.put(event.data) + source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data)) + elif isinstance(event, StreamEnded): + self.streams[eid].timestamp_end = time.time() + self.streams[eid].data_finished.set() + elif isinstance(event, StreamReset): + self.streams[eid].zombie = time.time() + if eid in self.streams and event.error_code == 0x8: + if is_server: + other_stream_id = self.streams[eid].client_stream_id + else: + other_stream_id = self.streams[eid].server_stream_id + other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) + elif isinstance(event, RemoteSettingsChanged): + source_conn.h2.safe_acknowledge_settings(event) + new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()]) + other_conn.h2.safe_update_settings(new_settings) + elif isinstance(event, ConnectionTerminated): + other_conn.h2.safe_close_connection(event.error_code) + return + elif isinstance(event, TrailersReceived): + raise NotImplementedError() + elif isinstance(event, PushedStreamReceived): + raise NotImplementedError() + + def _cleanup_streams(self): + death_time = time.time() - 10 + for stream_id in self.streams.keys(): + zombie = self.streams[stream_id].zombie + if zombie and zombie <= death_time: + self.streams.pop(stream_id, None) + def __call__(self): if self.server_conn: self._initiate_server_conn() @@ -254,10 +305,9 @@ class Http2Layer(Layer): for conn in r: source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn - is_server = (conn == self.server_conn.connection) - fields = struct.unpack("!HB", source_conn.rfile.peek(3)) - length = (fields[0] << 8) + fields[1] + field = source_conn.rfile.peek(3) + length = (field[0] << 16) + (field[1] << 8) + field[2] raw_frame = source_conn.rfile.safe_read(9 + length) with source_conn.h2.lock: @@ -265,53 +315,9 @@ class Http2Layer(Layer): source_conn.send(source_conn.h2.data_to_send()) for event in events: - if hasattr(event, 'stream_id'): - if is_server: - eid = self.server_to_client_stream_ids[event.stream_id] - else: - eid = event.stream_id - - if isinstance(event, RequestReceived): - headers = Headers([[str(k), str(v)] for k, v in event.headers]) - self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) - self.streams[eid].timestamp_start = time.time() - self.streams[eid].start() - elif isinstance(event, ResponseReceived): - headers = Headers([[str(k), str(v)] for k, v in event.headers]) - self.streams[eid].timestamp_start = time.time() - self.streams[eid].response_headers = headers - self.streams[eid].response_arrived.set() - elif isinstance(event, DataReceived): - self.streams[eid].data_queue.put(event.data) - source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data)) - elif isinstance(event, StreamEnded): - self.streams[eid].timestamp_end = time.time() - self.streams[eid].data_finished.set() - elif isinstance(event, StreamReset): - self.streams[eid].zombie = time.time() - if eid in self.streams and event.error_code == 0x8: - if is_server: - other_stream_id = self.streams[eid].client_stream_id - else: - other_stream_id = self.streams[eid].server_stream_id - other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) - elif isinstance(event, RemoteSettingsChanged): - source_conn.h2.safe_acknowledge_settings(event) - new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()]) - other_conn.h2.safe_update_settings(new_settings) - elif isinstance(event, ConnectionTerminated): - other_conn.h2.safe_close_connection(event.error_code) - return - elif isinstance(event, TrailersReceived): - raise NotImplementedError() - elif isinstance(event, PushedStreamReceived): - raise NotImplementedError() - - death_time = time.time() - 10 - for stream_id in self.streams.keys(): - zombie = self.streams[stream_id].zombie - if zombie and zombie <= death_time: - self.streams.pop(stream_id, None) + self.handle_event(event, source_conn, other_conn) + + self.cleanup_streams() class Http2SingleStreamLayer(_HttpLayer, threading.Thread): -- cgit v1.2.3