aboutsummaryrefslogtreecommitdiffstats
path: root/mitmproxy/protocol/http2.py
diff options
context:
space:
mode:
Diffstat (limited to 'mitmproxy/protocol/http2.py')
-rw-r--r--mitmproxy/protocol/http2.py115
1 files changed, 59 insertions, 56 deletions
diff --git a/mitmproxy/protocol/http2.py b/mitmproxy/protocol/http2.py
index 03d4aefc..1cc12792 100644
--- a/mitmproxy/protocol/http2.py
+++ b/mitmproxy/protocol/http2.py
@@ -4,18 +4,20 @@ import threading
import time
from six.moves import queue
-import h2
+import traceback
import six
from h2.connection import H2Connection
+from h2.exceptions import StreamClosedError
+from h2 import events
from netlib.tcp import ssl_read_select
from netlib.exceptions import HttpException
from netlib.http import Headers
-from netlib.utils import http2_read_raw_frame
+from netlib.utils import http2_read_raw_frame, parse_url
from .base import Layer
from .http import _HttpTransmissionLayer, HttpLayer
-from .. import utils
+from ..exceptions import ProtocolException, Http2ProtocolException
from ..models import HTTPRequest, HTTPResponse
@@ -26,11 +28,6 @@ class SafeH2Connection(H2Connection):
self.conn = conn
self.lock = threading.RLock()
- def safe_close_connection(self, error_code):
- with self.lock:
- self.close_connection(error_code)
- self.conn.send(self.data_to_send())
-
def safe_increment_flow_control(self, stream_id, length):
if length == 0:
return
@@ -47,7 +44,7 @@ class SafeH2Connection(H2Connection):
with self.lock:
try:
self.reset_stream(stream_id, error_code)
- except h2.exceptions.StreamClosedError:
+ except StreamClosedError: # pragma: no cover
# stream is already closed - good
pass
self.conn.send(self.data_to_send())
@@ -59,9 +56,9 @@ class SafeH2Connection(H2Connection):
def safe_send_headers(self, is_zombie, stream_id, headers):
with self.lock:
- if is_zombie():
- return
- self.send_headers(stream_id, headers)
+ if is_zombie(): # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
+ self.send_headers(stream_id, headers.fields)
self.conn.send(self.data_to_send())
def safe_send_body(self, is_zombie, stream_id, chunks):
@@ -69,9 +66,9 @@ class SafeH2Connection(H2Connection):
position = 0
while position < len(chunk):
self.lock.acquire()
- if is_zombie():
+ if is_zombie(): # pragma: no cover
self.lock.release()
- return
+ raise Http2ProtocolException("Zombie Stream")
max_outbound_frame_size = self.max_outbound_frame_size
frame_chunk = chunk[position:position + max_outbound_frame_size]
if self.local_flow_control_window(stream_id) < len(frame_chunk):
@@ -83,8 +80,8 @@ class SafeH2Connection(H2Connection):
self.lock.release()
position += max_outbound_frame_size
with self.lock:
- if is_zombie():
- return
+ if is_zombie(): # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
self.end_stream(stream_id)
self.conn.send(self.data_to_send())
@@ -95,32 +92,27 @@ class Http2Layer(Layer):
super(Http2Layer, self).__init__(ctx)
self.mode = mode
self.streams = dict()
- self.client_reset_streams = []
- self.server_reset_streams = []
self.server_to_client_stream_ids = dict([(0, 0)])
- self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)
+ self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False, header_encoding=False)
# make sure that we only pass actual SSL.Connection objects in here,
# because otherwise ssl_read_select fails!
self.active_conns = [self.client_conn.connection]
def _initiate_server_conn(self):
- self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True)
+ self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True, header_encoding=False)
self.server_conn.h2.initiate_connection()
self.server_conn.send(self.server_conn.h2.data_to_send())
self.active_conns.append(self.server_conn.connection)
def connect(self): # pragma: no cover
- raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
- # self.ctx.connect()
- # self.server_conn.connect()
- # self._initiate_server_conn()
+ raise Http2ProtocolException("HTTP2 layer should already have a connection.")
def set_server(self): # pragma: no cover
- raise NotImplementedError("Cannot change server for HTTP2 connections.")
+ raise Http2ProtocolException("Cannot change server for HTTP2 connections.")
def disconnect(self): # pragma: no cover
- raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.")
+ raise Http2ProtocolException("Cannot dis- or reconnect in HTTP2 connections.")
def next_layer(self): # pragma: no cover
# WebSockets over HTTP/2?
@@ -140,31 +132,28 @@ class Http2Layer(Layer):
else:
eid = event.stream_id
- if isinstance(event, h2.events.RequestReceived):
- headers = Headers([[str(k), str(v)] for k, v in event.headers])
+ if isinstance(event, events.RequestReceived):
+ headers = Headers([[k, v] for k, v in event.headers])
self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
self.streams[eid].timestamp_start = time.time()
self.streams[eid].start()
- elif isinstance(event, h2.events.ResponseReceived):
- headers = Headers([[str(k), str(v)] for k, v in event.headers])
+ elif isinstance(event, events.ResponseReceived):
+ headers = Headers([[k, v] for k, v in event.headers])
self.streams[eid].queued_data_length = 0
self.streams[eid].timestamp_start = time.time()
self.streams[eid].response_headers = headers
self.streams[eid].response_arrived.set()
- elif isinstance(event, h2.events.DataReceived):
+ elif isinstance(event, events.DataReceived):
if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit:
raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit))
self.streams[eid].data_queue.put(event.data)
self.streams[eid].queued_data_length += len(event.data)
source_conn.h2.safe_increment_flow_control(event.stream_id, event.flow_controlled_length)
- elif isinstance(event, h2.events.StreamEnded):
+ elif isinstance(event, events.StreamEnded):
self.streams[eid].timestamp_end = time.time()
self.streams[eid].data_finished.set()
- elif isinstance(event, h2.events.StreamReset):
+ elif isinstance(event, events.StreamReset):
self.streams[eid].zombie = time.time()
- self.client_reset_streams.append(self.streams[eid].client_stream_id)
- if self.streams[eid].server_stream_id:
- self.server_reset_streams.append(self.streams[eid].server_stream_id)
if eid in self.streams and event.error_code == 0x8:
if is_server:
other_stream_id = self.streams[eid].client_stream_id
@@ -172,14 +161,14 @@ class Http2Layer(Layer):
other_stream_id = self.streams[eid].server_stream_id
if other_stream_id is not None:
other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
- elif isinstance(event, h2.events.RemoteSettingsChanged):
+ elif isinstance(event, events.RemoteSettingsChanged):
new_settings = dict([(id, cs.new_value) for (id, cs) in six.iteritems(event.changed_settings)])
other_conn.h2.safe_update_settings(new_settings)
- elif isinstance(event, h2.events.ConnectionTerminated):
+ elif isinstance(event, events.ConnectionTerminated):
# Do not immediately terminate the other connection.
# Some streams might be still sending data to the client.
return False
- elif isinstance(event, h2.events.PushedStreamReceived):
+ elif isinstance(event, events.PushedStreamReceived):
# pushed stream ids should be uniq and not dependent on race conditions
# only the parent stream id must be looked up first
parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
@@ -195,7 +184,7 @@ class Http2Layer(Layer):
self.streams[event.pushed_stream_id].timestamp_end = time.time()
self.streams[event.pushed_stream_id].request_data_finished.set()
self.streams[event.pushed_stream_id].start()
- elif isinstance(event, h2.events.TrailersReceived):
+ elif isinstance(event, events.TrailersReceived):
raise NotImplementedError()
return True
@@ -227,14 +216,16 @@ class Http2Layer(Layer):
try:
raw_frame = b''.join(http2_read_raw_frame(source_conn.rfile))
except:
+ # read frame failed: connection closed
+ # kill all streams
for stream in self.streams.values():
stream.zombie = time.time()
return
- events = source_conn.h2.receive_data(raw_frame)
+ incoming_events = source_conn.h2.receive_data(raw_frame)
source_conn.send(source_conn.h2.data_to_send())
- for event in events:
+ for event in incoming_events:
if not self._handle_event(event, source_conn, other_conn, is_server):
return
@@ -244,7 +235,7 @@ class Http2Layer(Layer):
class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
def __init__(self, ctx, stream_id, request_headers):
- super(Http2SingleStreamLayer, self).__init__(ctx)
+ super(Http2SingleStreamLayer, self).__init__(ctx, name="Thread-Http2SingleStreamLayer-{}".format(stream_id))
self.zombie = None
self.client_stream_id = stream_id
self.server_stream_id = None
@@ -284,10 +275,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
@queued_data_length.setter
def queued_data_length(self, v):
- if self.response_arrived.is_set():
- return self.response_queued_data_length
- else:
- return self.request_queued_data_length
+ self.request_queued_data_length = v
def is_zombie(self):
return self.zombie is not None
@@ -309,7 +297,7 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
else: # pragma: no cover
first_line_format = "absolute"
# FIXME: verify if path or :host contains what we need
- scheme, host, port, _ = utils.parse_url(path)
+ scheme, host, port, _ = parse_url(path)
if authority:
host, _, port = authority.partition(':')
@@ -339,6 +327,9 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
timestamp_end=self.timestamp_end,
)
+ def read_request_body(self, request): # pragma: no cover
+ raise NotImplementedError()
+
def send_request(self, message):
if self.pushed:
# nothing to do here
@@ -346,8 +337,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
with self.server_conn.h2.lock:
# We must not assign a stream id if we are already a zombie.
- if self.zombie:
- return
+ if self.zombie: # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
@@ -362,6 +353,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
self.server_stream_id,
message.body
)
+ if self.zombie: # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
def read_response_headers(self):
self.response_arrived.wait()
@@ -388,8 +381,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
while self.response_data_queue.qsize() > 0:
yield self.response_data_queue.get()
return
- if self.zombie:
- return
+ if self.zombie: # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
def send_response_headers(self, response):
self.client_conn.h2.safe_send_headers(
@@ -397,6 +390,8 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
self.client_stream_id,
response.headers
)
+ if self.zombie: # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
def send_response_body(self, _response, chunks):
self.client_conn.h2.safe_send_body(
@@ -404,20 +399,28 @@ class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
self.client_stream_id,
chunks
)
+ if self.zombie: # pragma: no cover
+ raise Http2ProtocolException("Zombie Stream")
def check_close_connection(self, flow):
# This layer only handles a single stream.
# RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
return True
- def connect(self): # pragma: no cover
- raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
-
def set_server(self, *args, **kwargs): # pragma: no cover
# do not mess with the server connection - all streams share it.
pass
def run(self):
+ self()
+
+ def __call__(self):
layer = HttpLayer(self, self.mode)
- layer()
+
+ try:
+ layer()
+ except ProtocolException as e:
+ self.log(repr(e), "info")
+ self.log(traceback.format_exc(), "debug")
+
self.zombie = time.time()