about summary refs log tree commit diff stats
path: root/mitmproxy/libmproxy/protocol/http2.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/libmproxy/protocol/http2.py')
-rw-r--r--mitmproxy/libmproxy/protocol/http2.py437
1 files changed, 437 insertions, 0 deletions
diff --git a/mitmproxy/libmproxy/protocol/http2.py b/mitmproxy/libmproxy/protocol/http2.py
new file mode 100644
index 00000000..c121637c
--- /dev/null
+++ b/mitmproxy/libmproxy/protocol/http2.py
@@ -0,0 +1,437 @@
+from __future__ import (absolute_import, print_function, division)
+
+import threading
+import time
+import Queue
+
+from netlib.tcp import ssl_read_select
+from netlib.exceptions import HttpException
+from netlib.http import Headers
+from netlib.utils import http2_read_raw_frame
+
+import hyperframe
+import h2
+from h2.connection import H2Connection
+from h2.events import *
+
+from .base import Layer
+from .http import _HttpTransmissionLayer, HttpLayer
+from .. import utils
+from ..models import HTTPRequest, HTTPResponse
+
+
class SafeH2Connection(H2Connection):
    """Thread-safe wrapper around an h2 ``H2Connection``.

    Every mutating h2 call is guarded by an RLock and immediately flushes
    the pending frames to the underlying TCP connection, so multiple
    per-stream threads can safely share a single HTTP/2 connection.
    """

    def __init__(self, conn, *args, **kwargs):
        super(SafeH2Connection, self).__init__(*args, **kwargs)
        self.conn = conn  # netlib TCP connection used to send raw frame bytes
        self.lock = threading.RLock()

    def safe_close_connection(self, error_code):
        """Send a GOAWAY frame with *error_code* and flush it."""
        with self.lock:
            self.close_connection(error_code)
            self.conn.send(self.data_to_send())

    def safe_increment_flow_control(self, stream_id, length):
        """Replenish the connection- and stream-level flow-control windows.

        Connection- and stream-level updates are done in two separate
        critical sections; the stream may legitimately have been closed
        in between, which is why membership and ``closed`` are re-checked.
        """
        if length == 0:
            return

        with self.lock:
            self.increment_flow_control_window(length)
            self.conn.send(self.data_to_send())
        with self.lock:
            if stream_id in self.streams and not self.streams[stream_id].closed:
                self.increment_flow_control_window(length, stream_id=stream_id)
                self.conn.send(self.data_to_send())

    def safe_reset_stream(self, stream_id, error_code):
        """Send RST_STREAM, tolerating a stream that is already closed."""
        with self.lock:
            try:
                self.reset_stream(stream_id, error_code)
            except h2.exceptions.StreamClosedError:
                # stream is already closed - good
                pass
            self.conn.send(self.data_to_send())

    def safe_update_settings(self, new_settings):
        """Apply a SETTINGS update and flush the resulting frame."""
        with self.lock:
            self.update_settings(new_settings)
            self.conn.send(self.data_to_send())

    def safe_send_headers(self, is_zombie, stream_id, headers):
        """Send HEADERS unless the owning stream has become a zombie."""
        with self.lock:
            if is_zombie():
                return
            self.send_headers(stream_id, headers)
            self.conn.send(self.data_to_send())

    def safe_send_body(self, is_zombie, stream_id, chunks):
        """Send body *chunks* as DATA frames, respecting flow control.

        Busy-waits (yielding the thread via ``time.sleep(0)``) while the
        peer's flow-control window is exhausted.

        Fix: the original used manual ``lock.acquire()``/``release()``; an
        exception raised between the two (e.g. from ``send_data`` or the
        socket send) left the lock held forever, deadlocking all other
        stream threads.  ``with self.lock`` guarantees release.
        """
        for chunk in chunks:
            position = 0
            while position < len(chunk):
                with self.lock:
                    if is_zombie():
                        return
                    max_outbound_frame_size = self.max_outbound_frame_size
                    frame_chunk = chunk[position:position + max_outbound_frame_size]
                    window_exhausted = (
                        self.local_flow_control_window(stream_id) < len(frame_chunk)
                    )
                    if not window_exhausted:
                        self.send_data(stream_id, frame_chunk)
                        self.conn.send(self.data_to_send())
                if window_exhausted:
                    # Window closed: yield to other threads, then retry
                    # the same position.
                    time.sleep(0)
                    continue
                position += max_outbound_frame_size
        with self.lock:
            if is_zombie():
                return
            self.end_stream(stream_id)
            self.conn.send(self.data_to_send())
+
+
class Http2Layer(Layer):
    """Proxy layer multiplexing HTTP/2 streams between client and server.

    Reads raw frames from whichever connection is readable, feeds them
    into the per-connection h2 state machines, and dispatches the
    resulting events.  Each request stream is handled by its own
    Http2SingleStreamLayer thread.
    """

    def __init__(self, ctx, mode):
        super(Http2Layer, self).__init__(ctx)
        self.mode = mode
        # client-side stream id -> Http2SingleStreamLayer
        self.streams = dict()
        # stream ids that were reset, per side; further frames for these
        # are dropped (but HEADERS still go through hpack, see __call__)
        self.client_reset_streams = []
        self.server_reset_streams = []
        # server-side stream id -> client-side stream id
        self.server_to_client_stream_ids = dict([(0, 0)])
        self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)

        # make sure that we only pass actual SSL.Connection objects in here,
        # because otherwise ssl_read_select fails!
        self.active_conns = [self.client_conn.connection]

    def _initiate_server_conn(self):
        """Attach an h2 state machine to the server connection and send the preface."""
        self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True)
        self.server_conn.h2.initiate_connection()
        self.server_conn.send(self.server_conn.h2.data_to_send())
        self.active_conns.append(self.server_conn.connection)

    def connect(self):  # pragma: no cover
        raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
        # self.ctx.connect()
        # self.server_conn.connect()
        # self._initiate_server_conn()

    def set_server(self):  # pragma: no cover
        raise NotImplementedError("Cannot change server for HTTP2 connections.")

    def disconnect(self):  # pragma: no cover
        raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.")

    def next_layer(self):  # pragma: no cover
        # WebSockets over HTTP/2?
        # CONNECT for proxying?
        raise NotImplementedError()

    def _handle_event(self, event, source_conn, other_conn, is_server):
        """Dispatch a single h2 event.

        Returns False when the connection should be torn down
        (ConnectionTerminated), True otherwise.
        """
        self.log(
            "HTTP2 Event from {}".format("server" if is_server else "client"),
            "debug",
            [repr(event)]
        )

        # Translate server-side stream ids back to the client-side id the
        # stream was registered under.
        if hasattr(event, 'stream_id'):
            if is_server and event.stream_id % 2 == 1:
                eid = self.server_to_client_stream_ids[event.stream_id]
            else:
                eid = event.stream_id

        if isinstance(event, RequestReceived):
            headers = Headers([[str(k), str(v)] for k, v in event.headers])
            self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
            self.streams[eid].timestamp_start = time.time()
            self.streams[eid].start()
        elif isinstance(event, ResponseReceived):
            headers = Headers([[str(k), str(v)] for k, v in event.headers])
            self.streams[eid].queued_data_length = 0
            self.streams[eid].timestamp_start = time.time()
            self.streams[eid].response_headers = headers
            self.streams[eid].response_arrived.set()
        elif isinstance(event, DataReceived):
            if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit:
                raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit))
            self.streams[eid].data_queue.put(event.data)
            self.streams[eid].queued_data_length += len(event.data)
            # Re-open the flow-control window for the data just consumed.
            source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length)
        elif isinstance(event, StreamEnded):
            self.streams[eid].timestamp_end = time.time()
            self.streams[eid].data_finished.set()
        elif isinstance(event, StreamReset):
            self.streams[eid].zombie = time.time()
            self.client_reset_streams.append(self.streams[eid].client_stream_id)
            if self.streams[eid].server_stream_id:
                self.server_reset_streams.append(self.streams[eid].server_stream_id)
            # 0x8 == CANCEL: propagate the reset to the other peer.
            if eid in self.streams and event.error_code == 0x8:
                if is_server:
                    other_stream_id = self.streams[eid].client_stream_id
                else:
                    other_stream_id = self.streams[eid].server_stream_id
                if other_stream_id is not None:
                    other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
        elif isinstance(event, RemoteSettingsChanged):
            new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()])
            other_conn.h2.safe_update_settings(new_settings)
        elif isinstance(event, ConnectionTerminated):
            # Do not immediately terminate the other connection.
            # Some streams might be still sending data to the client.
            return False
        elif isinstance(event, PushedStreamReceived):
            # pushed stream ids should be uniq and not dependent on race conditions
            # only the parent stream id must be looked up first
            parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
            with self.client_conn.h2.lock:
                self.client_conn.h2.push_stream(parent_eid, event.pushed_stream_id, event.headers)

            headers = Headers([[str(k), str(v)] for k, v in event.headers])
            headers['x-mitmproxy-pushed'] = 'true'
            self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers)
            self.streams[event.pushed_stream_id].timestamp_start = time.time()
            self.streams[event.pushed_stream_id].pushed = True
            self.streams[event.pushed_stream_id].parent_stream_id = parent_eid
            self.streams[event.pushed_stream_id].timestamp_end = time.time()
            self.streams[event.pushed_stream_id].request_data_finished.set()
            self.streams[event.pushed_stream_id].start()
        elif isinstance(event, TrailersReceived):
            raise NotImplementedError()

        return True

    def _cleanup_streams(self):
        """Drop stream layers that have been zombies for more than 10 seconds."""
        death_time = time.time() - 10
        # Snapshot the keys: we pop from self.streams while iterating.
        for stream_id in list(self.streams.keys()):
            zombie = self.streams[stream_id].zombie
            if zombie and zombie <= death_time:
                self.streams.pop(stream_id, None)

    def __call__(self):
        if self.server_conn:
            self._initiate_server_conn()

        # Consume the 24-byte client connection preface.
        preamble = self.client_conn.rfile.read(24)
        self.client_conn.h2.initiate_connection()
        self.client_conn.h2.receive_data(preamble)
        self.client_conn.send(self.client_conn.h2.data_to_send())

        while True:
            r = ssl_read_select(self.active_conns, 1)
            for conn in r:
                source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
                other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
                is_server = (conn == self.server_conn.connection)

                with source_conn.h2.lock:
                    try:
                        raw_frame = b''.join(http2_read_raw_frame(source_conn.rfile))
                    except:
                        # Deliberate best-effort: any read failure means the
                        # connection is gone - mark all streams as zombies
                        # and shut this layer down.
                        for stream in self.streams.values():
                            stream.zombie = time.time()
                        return

                    frame, _ = hyperframe.frame.Frame.parse_frame_header(raw_frame[:9])

                    # Was `list` in the original, shadowing the builtin.
                    if is_server:
                        reset_streams = self.server_reset_streams
                    else:
                        reset_streams = self.client_reset_streams
                    if frame.stream_id in reset_streams:
                        # this frame belongs to a reset stream - just ignore it
                        if isinstance(frame, hyperframe.frame.HeadersFrame) or isinstance(frame, hyperframe.frame.ContinuationFrame):
                            # we need to keep the hpack-decoder happy too
                            source_conn.h2.decoder.decode(raw_frame[9:])
                        continue

                    events = source_conn.h2.receive_data(raw_frame)
                    source_conn.send(source_conn.h2.data_to_send())

                    for event in events:
                        if not self._handle_event(event, source_conn, other_conn, is_server):
                            return

            self._cleanup_streams()
+
+
class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
    """Handles a single HTTP/2 stream in its own thread.

    Data arriving on the shared connection is pushed into this layer's
    queues by Http2Layer._handle_event; this thread drains them and runs
    a regular HttpLayer on top.
    """

    def __init__(self, ctx, stream_id, request_headers):
        super(Http2SingleStreamLayer, self).__init__(ctx)
        # Timestamp at which the stream became a zombie, or None if alive.
        self.zombie = None
        self.client_stream_id = stream_id
        self.server_stream_id = None
        self.request_headers = request_headers
        self.response_headers = None
        self.pushed = False

        self.request_data_queue = Queue.Queue()
        self.request_queued_data_length = 0
        self.request_data_finished = threading.Event()

        self.response_arrived = threading.Event()
        self.response_data_queue = Queue.Queue()
        self.response_queued_data_length = 0
        self.response_data_finished = threading.Event()

    # The three properties below select the request- or response-side
    # object depending on whether response headers have arrived yet.

    @property
    def data_queue(self):
        if self.response_arrived.is_set():
            return self.response_data_queue
        else:
            return self.request_data_queue

    @property
    def queued_data_length(self):
        if self.response_arrived.is_set():
            return self.response_queued_data_length
        else:
            return self.request_queued_data_length

    @property
    def data_finished(self):
        if self.response_arrived.is_set():
            return self.response_data_finished
        else:
            return self.request_data_finished

    @queued_data_length.setter
    def queued_data_length(self, v):
        # Fix: the original returned the current value instead of
        # assigning, so body-size accounting in Http2Layer never updated.
        if self.response_arrived.is_set():
            self.response_queued_data_length = v
        else:
            self.request_queued_data_length = v

    def is_zombie(self):
        """True once the stream has been reset or finished."""
        return self.zombie is not None

    def read_request(self):
        """Block until the request is complete, then build an HTTPRequest."""
        self.request_data_finished.wait()

        authority = self.request_headers.get(':authority', '')
        method = self.request_headers.get(':method', 'GET')
        scheme = self.request_headers.get(':scheme', 'https')
        path = self.request_headers.get(':path', '/')
        host = None
        port = None

        if path == '*' or path.startswith("/"):
            form_in = "relative"
        elif method == 'CONNECT':  # pragma: no cover
            raise NotImplementedError("CONNECT over HTTP/2 is not implemented.")
        else:  # pragma: no cover
            form_in = "absolute"
            # FIXME: verify if path or :host contains what we need
            scheme, host, port, _ = utils.parse_url(path)

        # :authority overrides whatever parse_url produced.
        if authority:
            host, _, port = authority.partition(':')

        if not host:
            host = 'localhost'
        if not port:
            port = 443 if scheme == 'https' else 80
        port = int(port)

        # Drain everything queued so far into a single body.
        data = []
        while self.request_data_queue.qsize() > 0:
            data.append(self.request_data_queue.get())
        data = b"".join(data)

        return HTTPRequest(
            form_in,
            method,
            scheme,
            host,
            port,
            path,
            b"HTTP/2.0",
            self.request_headers,
            data,
            timestamp_start=self.timestamp_start,
            timestamp_end=self.timestamp_end,
        )

    def send_request(self, message):
        """Forward the request on a freshly allocated server-side stream."""
        if self.pushed:
            # nothing to do here
            return

        with self.server_conn.h2.lock:
            # We must not assign a stream id if we are already a zombie.
            if self.zombie:
                return

            self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
            self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id

            self.server_conn.h2.safe_send_headers(
                self.is_zombie,
                self.server_stream_id,
                message.headers
            )
        self.server_conn.h2.safe_send_body(
            self.is_zombie,
            self.server_stream_id,
            message.body
        )

    def read_response_headers(self):
        """Block until response headers arrive, then build an HTTPResponse shell."""
        self.response_arrived.wait()

        status_code = int(self.response_headers.get(':status', 502))

        return HTTPResponse(
            http_version=b"HTTP/2.0",
            status_code=status_code,
            reason='',
            headers=self.response_headers,
            content=None,
            timestamp_start=self.timestamp_start,
            timestamp_end=self.timestamp_end,
        )

    def read_response_body(self, request, response):
        """Yield response body chunks until the stream ends or dies."""
        while True:
            try:
                yield self.response_data_queue.get(timeout=1)
            except Queue.Empty:
                pass
            if self.response_data_finished.is_set():
                # Drain whatever is still queued, then stop.
                while self.response_data_queue.qsize() > 0:
                    yield self.response_data_queue.get()
                return
            if self.zombie:
                return

    def send_response_headers(self, response):
        self.client_conn.h2.safe_send_headers(
            self.is_zombie,
            self.client_stream_id,
            response.headers
        )

    def send_response_body(self, _response, chunks):
        self.client_conn.h2.safe_send_body(
            self.is_zombie,
            self.client_stream_id,
            chunks
        )

    def check_close_connection(self, flow):
        # This layer only handles a single stream.
        # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
        return True

    def connect(self):  # pragma: no cover
        raise ValueError("CONNECT inside an HTTP2 stream is not supported.")

    def set_server(self, *args, **kwargs):  # pragma: no cover
        # do not mess with the server connection - all streams share it.
        pass

    def run(self):
        layer = HttpLayer(self, self.mode)
        layer()
        # Mark ourselves for cleanup by Http2Layer._cleanup_streams.
        self.zombie = time.time()