From 936422cd735ac7d2bf19633c24dfeb1175cb3377 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Wed, 20 Jan 2016 19:58:24 +0100 Subject: split files into http, http1, and http2 --- libmproxy/protocol/http2.py | 365 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 365 insertions(+) create mode 100644 libmproxy/protocol/http2.py (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py new file mode 100644 index 00000000..a6d1b73f --- /dev/null +++ b/libmproxy/protocol/http2.py @@ -0,0 +1,365 @@ +from __future__ import (absolute_import, print_function, division) + +import struct +import threading +import time +import Queue + +from netlib.tcp import ssl_read_select +from netlib.exceptions import HttpException +from netlib.http import Headers + +import h2 +from h2.connection import H2Connection +from h2.events import * + +from .base import Layer +from .http import _HttpTransmissionLayer, HttpLayer +from .. import utils +from ..models import HTTPRequest, HTTPResponse + + +class SafeH2Connection(H2Connection): + def __init__(self, conn, *args, **kwargs): + super(SafeH2Connection, self).__init__(*args, **kwargs) + self.conn = conn + self.lock = threading.RLock() + + def safe_close_connection(self, error_code): + with self.lock: + self.close_connection(error_code) + self.conn.send(self.data_to_send()) + + def safe_increment_flow_control(self, stream_id, length): + if length == 0: + return + + with self.lock: + self.increment_flow_control_window(length) + self.conn.send(self.data_to_send()) + with self.lock: + if stream_id in self.streams and not self.streams[stream_id].closed: + self.increment_flow_control_window(length, stream_id=stream_id) + self.conn.send(self.data_to_send()) + + def safe_reset_stream(self, stream_id, error_code): + with self.lock: + try: + self.reset_stream(stream_id, error_code) + except h2.exceptions.StreamClosedError: + # stream is already closed - good + pass + self.conn.send(self.data_to_send()) + + def safe_update_settings(self, new_settings): + with self.lock: + self.update_settings(new_settings) + self.conn.send(self.data_to_send()) + + def safe_send_headers(self, is_zombie, stream_id, headers): + with self.lock: + if is_zombie(self, stream_id): + return + self.send_headers(stream_id, headers) + self.conn.send(self.data_to_send()) + + def safe_send_body(self, is_zombie, stream_id, chunks): + for chunk in chunks: + position = 0 + while position < len(chunk): + self.lock.acquire() + if is_zombie(self, stream_id): + return + max_outbound_frame_size = self.max_outbound_frame_size + frame_chunk = chunk[position:position+max_outbound_frame_size] + if self.local_flow_control_window(stream_id) < len(frame_chunk): + self.lock.release() + time.sleep(0) + continue + self.send_data(stream_id, frame_chunk) + self.conn.send(self.data_to_send()) + self.lock.release() + position += max_outbound_frame_size + with self.lock: + if is_zombie(self, stream_id): + return + self.end_stream(stream_id) + self.conn.send(self.data_to_send()) + + +class Http2Layer(Layer): + def __init__(self, ctx, mode): + super(Http2Layer, self).__init__(ctx) + self.mode = mode + self.streams = dict() + self.server_to_client_stream_ids = dict([(0, 0)]) + self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False) + + # make sure that we only pass actual SSL.Connection objects in here, + # because otherwise ssl_read_select fails! + self.active_conns = [self.client_conn.connection] + + def _initiate_server_conn(self): + self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True) + self.server_conn.h2.initiate_connection() + self.server_conn.send(self.server_conn.h2.data_to_send()) + self.active_conns.append(self.server_conn.connection) + + def connect(self): + self.ctx.connect() + self.server_conn.connect() + self._initiate_server_conn() + + def set_server(self): + raise NotImplementedError("Cannot change server for HTTP2 connections.") + + def disconnect(self): + raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.") + + def next_layer(self): + # WebSockets over HTTP/2? + # CONNECT for proxying? + raise NotImplementedError() + + def _handle_event(self, event, source_conn, other_conn, is_server): + if hasattr(event, 'stream_id'): + if is_server and event.stream_id % 2 == 1: + eid = self.server_to_client_stream_ids[event.stream_id] + else: + eid = event.stream_id + + if isinstance(event, RequestReceived): + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + self.streams[eid] = Http2SingleStreamLayer(self, eid, headers) + self.streams[eid].timestamp_start = time.time() + self.streams[eid].start() + elif isinstance(event, ResponseReceived): + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + self.streams[eid].queued_data_length = 0 + self.streams[eid].timestamp_start = time.time() + self.streams[eid].response_headers = headers + self.streams[eid].response_arrived.set() + elif isinstance(event, DataReceived): + if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit: + raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit)) + self.streams[eid].data_queue.put(event.data) + self.streams[eid].queued_data_length += len(event.data) + source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data)) + elif isinstance(event, StreamEnded): + self.streams[eid].timestamp_end = time.time() + self.streams[eid].data_finished.set() + elif isinstance(event, StreamReset): + self.streams[eid].zombie = time.time() + if eid in self.streams and event.error_code == 0x8: + if is_server: + other_stream_id = self.streams[eid].client_stream_id + else: + other_stream_id = self.streams[eid].server_stream_id + other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) + elif isinstance(event, RemoteSettingsChanged): + new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()]) + other_conn.h2.safe_update_settings(new_settings) + elif isinstance(event, ConnectionTerminated): + other_conn.h2.safe_close_connection(event.error_code) + return False + elif isinstance(event, PushedStreamReceived): + # pushed stream ids should be uniq and not dependent on race conditions + # only the parent stream id must be looked up first + parent_eid = self.server_to_client_stream_ids[event.parent_stream_id] + with self.client_conn.h2.lock: + self.client_conn.h2.push_stream(parent_eid, event.pushed_stream_id, event.headers) + + headers = Headers([[str(k), str(v)] for k, v in event.headers]) + headers['x-mitmproxy-pushed'] = 'true' + self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers) + self.streams[event.pushed_stream_id].timestamp_start = time.time() + self.streams[event.pushed_stream_id].pushed = True + self.streams[event.pushed_stream_id].parent_stream_id = parent_eid + self.streams[event.pushed_stream_id].timestamp_end = time.time() + self.streams[event.pushed_stream_id].data_finished.set() + self.streams[event.pushed_stream_id].start() + elif isinstance(event, TrailersReceived): + raise NotImplementedError() + + return True + + def _cleanup_streams(self): + death_time = time.time() - 10 + for stream_id in self.streams.keys(): + zombie = self.streams[stream_id].zombie + if zombie and zombie <= death_time: + self.streams.pop(stream_id, None) + + def __call__(self): + if self.server_conn: + self._initiate_server_conn() + + preamble = self.client_conn.rfile.read(24) + self.client_conn.h2.initiate_connection() + self.client_conn.h2.receive_data(preamble) + self.client_conn.send(self.client_conn.h2.data_to_send()) + + while True: + r = ssl_read_select(self.active_conns, 1) + for conn in r: + source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn + other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn + is_server = (conn == self.server_conn.connection) + + field = source_conn.rfile.peek(3) + length = int(field.encode('hex'), 16) + raw_frame = source_conn.rfile.safe_read(9 + length) + + with source_conn.h2.lock: + events = source_conn.h2.receive_data(raw_frame) + source_conn.send(source_conn.h2.data_to_send()) + + for event in events: + if not self._handle_event(event, source_conn, other_conn, is_server): + return + + self._cleanup_streams() + + +class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): + def __init__(self, ctx, stream_id, request_headers): + super(Http2SingleStreamLayer, self).__init__(ctx) + self.zombie = None + self.client_stream_id = stream_id + self.server_stream_id = None + self.request_headers = request_headers + self.response_headers = None + self.data_queue = Queue.Queue() + self.queued_data_length = 0 + + self.response_arrived = threading.Event() + self.data_finished = threading.Event() + + def is_zombie(self, h2_conn, stream_id): + if self.zombie: + return True + return False + + def read_request(self): + self.data_finished.wait() + self.data_finished.clear() + + authority = self.request_headers.get(':authority', '') + method = self.request_headers.get(':method', 'GET') + scheme = self.request_headers.get(':scheme', 'https') + path = self.request_headers.get(':path', '/') + host = None + port = None + + if path == '*' or path.startswith("/"): + form_in = "relative" + elif method == 'CONNECT': + form_in = "authority" + if ":" in authority: + host, port = authority.split(":", 1) + else: + host = authority + else: + form_in = "absolute" + # FIXME: verify if path or :host contains what we need + scheme, host, port, _ = utils.parse_url(path) + + if host is None: + host = 'localhost' + if port is None: + port = 80 if scheme == 'http' else 443 + port = int(port) + + data = [] + while self.data_queue.qsize() > 0: + data.append(self.data_queue.get()) + data = b"".join(data) + + return HTTPRequest( + form_in, + method, + scheme, + host, + port, + path, + (2, 0), + self.request_headers, + data, + timestamp_start=self.timestamp_start, + timestamp_end=self.timestamp_end, + ) + + def send_request(self, message): + with self.server_conn.h2.lock: + self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() + self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id + + self.server_conn.h2.safe_send_headers( + self.is_zombie, + self.server_stream_id, + message.headers + ) + self.server_conn.h2.safe_send_body( + self.is_zombie, + self.server_stream_id, + message.body + ) + + def read_response_headers(self): + self.response_arrived.wait() + + status_code = int(self.response_headers.get(':status', 502)) + + return HTTPResponse( + http_version=(2, 0), + status_code=status_code, + reason='', + headers=self.response_headers, + content=None, + timestamp_start=self.timestamp_start, + timestamp_end=self.timestamp_end, + ) + + def read_response_body(self, request, response): + while True: + try: + yield self.data_queue.get(timeout=1) + except Queue.Empty: + pass + if self.data_finished.is_set(): + while self.data_queue.qsize() > 0: + yield self.data_queue.get() + return + if self.zombie: + return + + def send_response_headers(self, response): + self.client_conn.h2.safe_send_headers( + self.is_zombie, + self.client_stream_id, + response.headers + ) + + def send_response_body(self, _response, chunks): + self.client_conn.h2.safe_send_body( + self.is_zombie, + self.client_stream_id, + chunks + ) + + def check_close_connection(self, flow): + # This layer only handles a single stream. + # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream. + return True + + def connect(self): + raise ValueError("CONNECT inside an HTTP2 stream is not supported.") + + def set_server(self, *args, **kwargs): + # do not mess with the server connection - all streams share it. + pass + + def run(self): + layer = HttpLayer(self, self.mode) + layer() + self.zombie = time.time() -- cgit v1.2.3 From a99ef584ad184658199d6ba83c73ed173f8ac0d0 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Sun, 24 Jan 2016 23:16:50 +0100 Subject: reuse frame reading snippet --- libmproxy/protocol/http2.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index a6d1b73f..ff1726ae 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -206,12 +206,8 @@ class Http2Layer(Layer): other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn is_server = (conn == self.server_conn.connection) - field = source_conn.rfile.peek(3) - length = int(field.encode('hex'), 16) - raw_frame = source_conn.rfile.safe_read(9 + length) - with source_conn.h2.lock: - events = source_conn.h2.receive_data(raw_frame) + events = source_conn.h2.receive_data(utils.http2_read_frame(source_conn.rfile)) source_conn.send(source_conn.h2.data_to_send()) for event in events: -- cgit v1.2.3 From f49c1cd1c5771aea8ab64bd93619e3b8f0729253 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Sun, 24 Jan 2016 23:53:23 +0100 Subject: improve http2 header parsing --- libmproxy/protocol/http2.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index ff1726ae..03408142 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -260,6 +260,9 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): # FIXME: verify if path or :host contains what we need scheme, host, port, _ = utils.parse_url(path) + if authority: + host, port = authority.split(':') + if host is None: host = 'localhost' if port is None: -- cgit v1.2.3 From 735c79a2edb3b31a35ed3e484744807bb626ab77 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Mon, 25 Jan 2016 19:41:22 +0100 Subject: increase coverage --- libmproxy/protocol/http2.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 03408142..54e7572e 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -249,12 +249,13 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): if path == '*' or path.startswith("/"): form_in = "relative" - elif method == 'CONNECT': - form_in = "authority" - if ":" in authority: - host, port = authority.split(":", 1) - else: - host = authority + elif method == 'CONNECT': # pragma: no cover + # form_in = "authority" + # if ":" in authority: + # host, port = authority.split(":", 1) + # else: + # host = authority + raise NotImplementedError("CONNECT over HTTP/2 is not implemented.") else: form_in = "absolute" # FIXME: verify if path or :host contains what we need -- cgit v1.2.3 From 41f4197a0dd73a2b00ea8485608ba9b05a605dd4 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Mon, 25 Jan 2016 21:14:58 +0100 Subject: test PushPromise support --- libmproxy/protocol/http2.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 54e7572e..71423bf7 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -17,7 +17,7 @@ from .base import Layer from .http import _HttpTransmissionLayer, HttpLayer from .. import utils from ..models import HTTPRequest, HTTPResponse - +from ..exceptions import HttpProtocolException, ProtocolException class SafeH2Connection(H2Connection): def __init__(self, conn, *args, **kwargs): @@ -207,7 +207,14 @@ class Http2Layer(Layer): is_server = (conn == self.server_conn.connection) with source_conn.h2.lock: - events = source_conn.h2.receive_data(utils.http2_read_frame(source_conn.rfile)) + try: + raw_frame = utils.http2_read_frame(source_conn.rfile) + except: + for stream in self.streams.values(): + stream.zombie = time.time() + return + + events = source_conn.h2.receive_data(raw_frame) source_conn.send(source_conn.h2.data_to_send()) for event in events: -- cgit v1.2.3 From cd2b4ea058e82ef9e2dc824a379309d68f08cec4 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Tue, 26 Jan 2016 19:38:17 +0100 Subject: bump h2 dependency and use latest API --- libmproxy/protocol/http2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 71423bf7..23004497 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -144,7 +144,7 @@ class Http2Layer(Layer): raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit)) self.streams[eid].data_queue.put(event.data) self.streams[eid].queued_data_length += len(event.data) - source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data)) + source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length) elif isinstance(event, StreamEnded): self.streams[eid].timestamp_end = time.time() self.streams[eid].data_finished.set() -- cgit v1.2.3 From 44f83b594701f9756418cf8208c30a9ba5ac4aad Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Tue, 26 Jan 2016 20:44:53 +0100 Subject: add more tests, improve coverage --- libmproxy/protocol/http2.py | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 23004497..20321d53 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -105,18 +105,19 @@ class Http2Layer(Layer): self.server_conn.send(self.server_conn.h2.data_to_send()) self.active_conns.append(self.server_conn.connection) - def connect(self): - self.ctx.connect() - self.server_conn.connect() - self._initiate_server_conn() + def connect(self): # pragma: no cover + raise ValueError("CONNECT inside an HTTP2 stream is not supported.") + # self.ctx.connect() + # self.server_conn.connect() + # self._initiate_server_conn() - def set_server(self): + def set_server(self): # pragma: no cover raise NotImplementedError("Cannot change server for HTTP2 connections.") - def disconnect(self): + def disconnect(self): # pragma: no cover raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.") - def next_layer(self): + def next_layer(self): # pragma: no cover # WebSockets over HTTP/2? # CONNECT for proxying? raise NotImplementedError() @@ -257,13 +258,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): if path == '*' or path.startswith("/"): form_in = "relative" elif method == 'CONNECT': # pragma: no cover - # form_in = "authority" - # if ":" in authority: - # host, port = authority.split(":", 1) - # else: - # host = authority raise NotImplementedError("CONNECT over HTTP/2 is not implemented.") - else: + else: # pragma: no cover form_in = "absolute" # FIXME: verify if path or :host contains what we need scheme, host, port, _ = utils.parse_url(path) @@ -359,10 +355,10 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream. return True - def connect(self): + def connect(self): # pragma: no cover raise ValueError("CONNECT inside an HTTP2 stream is not supported.") - def set_server(self, *args, **kwargs): + def set_server(self, *args, **kwargs): # pragma: no cover # do not mess with the server connection - all streams share it. pass -- cgit v1.2.3 From 64978968f2383988511dc7021ecd9892f098f723 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Tue, 26 Jan 2016 21:04:40 +0100 Subject: fix authority handling --- libmproxy/protocol/http2.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 20321d53..423c5caa 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -265,12 +265,12 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): scheme, host, port, _ = utils.parse_url(path) if authority: - host, port = authority.split(':') + host, _, port = authority.partition(':') - if host is None: + if not host: host = 'localhost' - if port is None: - port = 80 if scheme == 'http' else 443 + if not port: + port = 443 if scheme == 'https' else 80 port = int(port) data = [] -- cgit v1.2.3 From 6d3b3994e27b3cd97c74612bc64c7e0457aeb448 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Wed, 27 Jan 2016 12:52:18 +0100 Subject: code formatting --- libmproxy/protocol/http2.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 423c5caa..5c4586de 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -1,6 +1,5 @@ from __future__ import (absolute_import, print_function, division) -import struct import threading import time import Queue @@ -17,9 +16,12 @@ from .base import Layer from .http import _HttpTransmissionLayer, HttpLayer from .. import utils from ..models import HTTPRequest, HTTPResponse -from ..exceptions import HttpProtocolException, ProtocolException +from ..exceptions import HttpProtocolException +from ..exceptions import ProtocolException + class SafeH2Connection(H2Connection): + def __init__(self, conn, *args, **kwargs): super(SafeH2Connection, self).__init__(*args, **kwargs) self.conn = conn @@ -71,7 +73,7 @@ class SafeH2Connection(H2Connection): if is_zombie(self, stream_id): return max_outbound_frame_size = self.max_outbound_frame_size - frame_chunk = chunk[position:position+max_outbound_frame_size] + frame_chunk = chunk[position:position + max_outbound_frame_size] if self.local_flow_control_window(stream_id) < len(frame_chunk): self.lock.release() time.sleep(0) @@ -88,6 +90,7 @@ class SafeH2Connection(H2Connection): class Http2Layer(Layer): + def __init__(self, ctx, mode): super(Http2Layer, self).__init__(ctx) self.mode = mode @@ -226,6 +229,7 @@ class Http2Layer(Layer): class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): + def __init__(self, ctx, stream_id, request_headers): super(Http2SingleStreamLayer, self).__init__(ctx) self.zombie = None -- cgit v1.2.3 From d8ae7c3e29bcf117eb051f9e74b76fea733c1c64 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Mon, 1 Feb 2016 23:03:15 +0100 Subject: fix tests and use netlib utils --- libmproxy/protocol/http2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 5c4586de..4b582f51 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -7,6 +7,7 @@ import Queue from netlib.tcp import ssl_read_select from netlib.exceptions import HttpException from netlib.http import Headers +from netlib.utils import http2_read_raw_frame import h2 from h2.connection import H2Connection @@ -212,7 +213,7 @@ class Http2Layer(Layer): with source_conn.h2.lock: try: - raw_frame = utils.http2_read_frame(source_conn.rfile) + raw_frame = b''.join(http2_read_raw_frame(source_conn.rfile)) except: for stream in self.streams.values(): stream.zombie = time.time() -- cgit v1.2.3 From b007ff3f9b00165bd87c18292d8b23d4acc83ab9 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Mon, 1 Feb 2016 23:27:37 +0100 Subject: fix locking issues --- libmproxy/protocol/http2.py | 1 + 1 file changed, 1 insertion(+) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 4b582f51..4b3ef0ed 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -72,6 +72,7 @@ class SafeH2Connection(H2Connection): while position < len(chunk): self.lock.acquire() if is_zombie(self, stream_id): + self.lock.release() return max_outbound_frame_size = self.max_outbound_frame_size frame_chunk = chunk[position:position + max_outbound_frame_size] -- cgit v1.2.3 From 68bcc82b8e4c219b024bff0081741c799b9cbd74 Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Tue, 2 Feb 2016 15:49:21 +0100 Subject: do not send RST if there is not upstream stream openend yet --- libmproxy/protocol/http2.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index 4b3ef0ed..fe9f8695 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -161,7 +161,8 @@ class Http2Layer(Layer): other_stream_id = self.streams[eid].client_stream_id else: other_stream_id = self.streams[eid].server_stream_id - other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) + if other_stream_id is not None: + other_conn.h2.safe_reset_stream(other_stream_id, event.error_code) elif isinstance(event, RemoteSettingsChanged): new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()]) other_conn.h2.safe_update_settings(new_settings) -- cgit v1.2.3 From ca5cc34d0b70f3306f62004be7ceb3f0c2053da7 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Tue, 2 Feb 2016 17:48:09 +0100 Subject: cleanup --- libmproxy/protocol/http2.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index fe9f8695..e617f77c 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -61,7 +61,7 @@ class SafeH2Connection(H2Connection): def safe_send_headers(self, is_zombie, stream_id, headers): with self.lock: - if is_zombie(self, stream_id): + if is_zombie(): return self.send_headers(stream_id, headers) self.conn.send(self.data_to_send()) @@ -71,7 +71,7 @@ class SafeH2Connection(H2Connection): position = 0 while position < len(chunk): self.lock.acquire() - if is_zombie(self, stream_id): + if is_zombie(): self.lock.release() return max_outbound_frame_size = self.max_outbound_frame_size @@ -85,7 +85,7 @@ class SafeH2Connection(H2Connection): self.lock.release() position += max_outbound_frame_size with self.lock: - if is_zombie(self, stream_id): + if is_zombie(): return self.end_stream(stream_id) self.conn.send(self.data_to_send()) @@ -246,10 +246,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): self.response_arrived = threading.Event() self.data_finished = threading.Event() - def is_zombie(self, h2_conn, stream_id): - if self.zombie: - return True - return False + def is_zombie(self): + return self.zombie is not None def read_request(self): self.data_finished.wait() @@ -300,6 +298,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): ) def send_request(self, message): + if self.zombie: + return with self.server_conn.h2.lock: self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id -- cgit v1.2.3 From cf8c063773b70ad37ab0a2125f5ed03c35e17336 Mon Sep 17 00:00:00 2001 From: Thomas Kriechbaumer Date: Tue, 2 Feb 2016 23:54:35 +0100 Subject: fix http2 race condition --- libmproxy/protocol/http2.py | 67 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 53 insertions(+), 14 deletions(-) (limited to 'libmproxy/protocol/http2.py') diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py index e617f77c..de068836 100644 --- a/libmproxy/protocol/http2.py +++ b/libmproxy/protocol/http2.py @@ -167,7 +167,8 @@ class Http2Layer(Layer): new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()]) other_conn.h2.safe_update_settings(new_settings) elif isinstance(event, ConnectionTerminated): - other_conn.h2.safe_close_connection(event.error_code) + # Do not immediately terminate the other connection. + # Some streams might be still sending data to the client. return False elif isinstance(event, PushedStreamReceived): # pushed stream ids should be uniq and not dependent on race conditions @@ -183,7 +184,7 @@ class Http2Layer(Layer): self.streams[event.pushed_stream_id].pushed = True self.streams[event.pushed_stream_id].parent_stream_id = parent_eid self.streams[event.pushed_stream_id].timestamp_end = time.time() - self.streams[event.pushed_stream_id].data_finished.set() + self.streams[event.pushed_stream_id].request_data_finished.set() self.streams[event.pushed_stream_id].start() elif isinstance(event, TrailersReceived): raise NotImplementedError() @@ -240,18 +241,50 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): self.server_stream_id = None self.request_headers = request_headers self.response_headers = None - self.data_queue = Queue.Queue() - self.queued_data_length = 0 + self.pushed = False + + self.request_data_queue = Queue.Queue() + self.request_queued_data_length = 0 + self.request_data_finished = threading.Event() self.response_arrived = threading.Event() - self.data_finished = threading.Event() + self.response_data_queue = Queue.Queue() + self.response_queued_data_length = 0 + self.response_data_finished = threading.Event() + + @property + def data_queue(self): + if self.response_arrived.is_set(): + return self.response_data_queue + else: + return self.request_data_queue + + @property + def queued_data_length(self): + if self.response_arrived.is_set(): + return self.response_queued_data_length + else: + return self.request_queued_data_length + + @property + def data_finished(self): + if self.response_arrived.is_set(): + return self.response_data_finished + else: + return self.request_data_finished + + @queued_data_length.setter + def queued_data_length(self, v): + if self.response_arrived.is_set(): + return self.response_queued_data_length + else: + return self.request_queued_data_length def is_zombie(self): return self.zombie is not None def read_request(self): - self.data_finished.wait() - self.data_finished.clear() + self.request_data_finished.wait() authority = self.request_headers.get(':authority', '') method = self.request_headers.get(':method', 'GET') @@ -279,8 +312,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): port = int(port) data = [] - while self.data_queue.qsize() > 0: - data.append(self.data_queue.get()) + while self.request_data_queue.qsize() > 0: + data.append(self.request_data_queue.get()) data = b"".join(data) return HTTPRequest( @@ -298,9 +331,15 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): ) def send_request(self, message): - if self.zombie: + if self.pushed: + # nothing to do here return + with self.server_conn.h2.lock: + # We must not assign a stream id if we are already a zombie. + if self.zombie: + return + self.server_stream_id = self.server_conn.h2.get_next_available_stream_id() self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id @@ -333,12 +372,12 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread): def read_response_body(self, request, response): while True: try: - yield self.data_queue.get(timeout=1) + yield self.response_data_queue.get(timeout=1) except Queue.Empty: pass - if self.data_finished.is_set(): - while self.data_queue.qsize() > 0: - yield self.data_queue.get() + if self.response_data_finished.is_set(): + while self.response_data_queue.qsize() > 0: + yield self.response_data_queue.get() return if self.zombie: return -- cgit v1.2.3