author     Thomas Kriechbaumer <thomas@kriechbaumer.name>    2016-01-20 19:58:24 +0100
committer  Thomas Kriechbaumer <thomas@kriechbaumer.name>    2016-02-04 09:52:03 +0100
commit     936422cd735ac7d2bf19633c24dfeb1175cb3377 (patch)
tree       f5779b04c4917ca8325c7a43cc4acd0ad9d6c98e /libmproxy
parent     a05a961e7fa44259fe2768f43e53dde8888860ba (diff)
split files into http, http1, and http2
Diffstat (limited to 'libmproxy')
-rw-r--r--  libmproxy/protocol/__init__.py  |   8
-rw-r--r--  libmproxy/protocol/http.py      | 425
-rw-r--r--  libmproxy/protocol/http1.py     |  75
-rw-r--r--  libmproxy/protocol/http2.py     | 365
4 files changed, 452 insertions(+), 421 deletions(-)
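For orientation, a minimal usage sketch (not part of this commit) of how the layers are reachable after the split, based on the updated __init__.py below. The calling convention shown in the comments is an assumption drawn from the __call__ implementations in the diff:

# Hypothetical sketch, assuming the package layout introduced by this commit.
from libmproxy.protocol import Http1Layer, Http2Layer, UpstreamConnectLayer

# Http1Layer now lives in libmproxy/protocol/http1.py and drives the
# protocol-agnostic HttpLayer (still in libmproxy/protocol/http.py) directly;
# Http2Layer (libmproxy/protocol/http2.py) instead spawns one
# Http2SingleStreamLayer thread per stream, each of which runs HttpLayer.
#
#     layer = Http1Layer(ctx, mode)   # ctx and mode come from the proxy's layer stack (assumed)
#     layer()                         # internally runs HttpLayer(self, self.mode)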
diff --git a/libmproxy/protocol/__init__.py b/libmproxy/protocol/__init__.py
index d46f16f5..ea958d06 100644
--- a/libmproxy/protocol/__init__.py
+++ b/libmproxy/protocol/__init__.py
@@ -27,15 +27,19 @@ as late as possible; this makes server replay without any outgoing connections p
from __future__ import (absolute_import, print_function, division)
from .base import Layer, ServerConnectionMixin, Kill
-from .http import Http1Layer, UpstreamConnectLayer, Http2Layer
from .tls import TlsLayer
from .tls import is_tls_record_magic
from .tls import TlsClientHello
+from .http import UpstreamConnectLayer
+from .http1 import Http1Layer
+from .http2 import Http2Layer
from .rawtcp import RawTCPLayer
__all__ = [
"Layer", "ServerConnectionMixin", "Kill",
- "Http1Layer", "UpstreamConnectLayer", "Http2Layer",
"TlsLayer", "is_tls_record_magic", "TlsClientHello",
+ "UpstreamConnectLayer",
+ "Http1Layer",
+ "Http2Layer",
"RawTCPLayer",
]
diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py
index a8626af8..ee9d2d46 100644
--- a/libmproxy/protocol/http.py
+++ b/libmproxy/protocol/http.py
@@ -3,26 +3,17 @@ from __future__ import (absolute_import, print_function, division)
import sys
import traceback
import six
-import struct
-import threading
-import time
-import Queue
from netlib import tcp
from netlib.exceptions import HttpException, HttpReadDisconnect, NetlibException
-from netlib.http import http1, Headers, CONTENT_MISSING
-from netlib.tcp import Address, ssl_read_select
+from netlib.http import Headers, CONTENT_MISSING
-import h2
-from h2.connection import H2Connection
-from h2.events import *
+from h2.exceptions import H2Error
-from .base import Layer, Kill
from .. import utils
from ..exceptions import HttpProtocolException, ProtocolException
from ..models import (
HTTPFlow,
- HTTPRequest,
HTTPResponse,
make_error_response,
make_connect_response,
@@ -30,8 +21,9 @@ from ..models import (
expect_continue_response
)
+from .base import Layer, Kill
-class _HttpLayer(Layer):
+class _HttpTransmissionLayer(Layer):
def read_request(self):
raise NotImplementedError()
@@ -71,411 +63,6 @@ class _HttpLayer(Layer):
raise NotImplementedError()
-class Http1Layer(_HttpLayer):
- def __init__(self, ctx, mode):
- super(Http1Layer, self).__init__(ctx)
- self.mode = mode
-
- def read_request(self):
- req = http1.read_request(self.client_conn.rfile, body_size_limit=self.config.body_size_limit)
- return HTTPRequest.wrap(req)
-
- def read_request_body(self, request):
- expected_size = http1.expected_http_body_size(request)
- return http1.read_body(self.client_conn.rfile, expected_size, self.config.body_size_limit)
-
- def send_request(self, request):
- self.server_conn.wfile.write(http1.assemble_request(request))
- self.server_conn.wfile.flush()
-
- def read_response_headers(self):
- resp = http1.read_response_head(self.server_conn.rfile)
- return HTTPResponse.wrap(resp)
-
- def read_response_body(self, request, response):
- expected_size = http1.expected_http_body_size(request, response)
- return http1.read_body(self.server_conn.rfile, expected_size, self.config.body_size_limit)
-
- def send_response_headers(self, response):
- raw = http1.assemble_response_head(response)
- self.client_conn.wfile.write(raw)
- self.client_conn.wfile.flush()
-
- def send_response_body(self, response, chunks):
- for chunk in http1.assemble_body(response.headers, chunks):
- self.client_conn.wfile.write(chunk)
- self.client_conn.wfile.flush()
-
- def check_close_connection(self, flow):
- request_close = http1.connection_close(
- flow.request.http_version,
- flow.request.headers
- )
- response_close = http1.connection_close(
- flow.response.http_version,
- flow.response.headers
- )
- read_until_eof = http1.expected_http_body_size(flow.request, flow.response) == -1
- close_connection = request_close or response_close or read_until_eof
- if flow.request.form_in == "authority" and flow.response.status_code == 200:
- # Workaround for https://github.com/mitmproxy/mitmproxy/issues/313:
- # Charles Proxy sends a CONNECT response with HTTP/1.0
- # and no Content-Length header
-
- return False
- return close_connection
-
- def __call__(self):
- layer = HttpLayer(self, self.mode)
- layer()
-
-
-class SafeH2Connection(H2Connection):
- def __init__(self, conn, *args, **kwargs):
- super(SafeH2Connection, self).__init__(*args, **kwargs)
- self.conn = conn
- self.lock = threading.RLock()
-
- def safe_close_connection(self, error_code):
- with self.lock:
- self.close_connection(error_code)
- self.conn.send(self.data_to_send())
-
- def safe_increment_flow_control(self, stream_id, length):
- if length == 0:
- return
-
- with self.lock:
- self.increment_flow_control_window(length)
- self.conn.send(self.data_to_send())
- with self.lock:
- if stream_id in self.streams and not self.streams[stream_id].closed:
- self.increment_flow_control_window(length, stream_id=stream_id)
- self.conn.send(self.data_to_send())
-
- def safe_reset_stream(self, stream_id, error_code):
- with self.lock:
- try:
- self.reset_stream(stream_id, error_code)
- except h2.exceptions.StreamClosedError:
- # stream is already closed - good
- pass
- self.conn.send(self.data_to_send())
-
- def safe_update_settings(self, new_settings):
- with self.lock:
- self.update_settings(new_settings)
- self.conn.send(self.data_to_send())
-
- def safe_send_headers(self, is_zombie, stream_id, headers):
- with self.lock:
- if is_zombie(self, stream_id):
- return
- self.send_headers(stream_id, headers)
- self.conn.send(self.data_to_send())
-
- def safe_send_body(self, is_zombie, stream_id, chunks):
- for chunk in chunks:
- position = 0
- while position < len(chunk):
- self.lock.acquire()
- if is_zombie(self, stream_id):
- return
- max_outbound_frame_size = self.max_outbound_frame_size
- frame_chunk = chunk[position:position+max_outbound_frame_size]
- if self.local_flow_control_window(stream_id) < len(frame_chunk):
- self.lock.release()
- time.sleep(0)
- continue
- self.send_data(stream_id, frame_chunk)
- self.conn.send(self.data_to_send())
- self.lock.release()
- position += max_outbound_frame_size
- with self.lock:
- if is_zombie(self, stream_id):
- return
- self.end_stream(stream_id)
- self.conn.send(self.data_to_send())
-
-
-class Http2Layer(Layer):
- def __init__(self, ctx, mode):
- super(Http2Layer, self).__init__(ctx)
- self.mode = mode
- self.streams = dict()
- self.server_to_client_stream_ids = dict([(0, 0)])
- self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)
-
- # make sure that we only pass actual SSL.Connection objects in here,
- # because otherwise ssl_read_select fails!
- self.active_conns = [self.client_conn.connection]
-
- def _initiate_server_conn(self):
- self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True)
- self.server_conn.h2.initiate_connection()
- self.server_conn.send(self.server_conn.h2.data_to_send())
- self.active_conns.append(self.server_conn.connection)
-
- def connect(self):
- self.ctx.connect()
- self.server_conn.connect()
- self._initiate_server_conn()
-
- def set_server(self):
- raise NotImplementedError("Cannot change server for HTTP2 connections.")
-
- def disconnect(self):
- raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.")
-
- def next_layer(self):
- # WebSockets over HTTP/2?
- # CONNECT for proxying?
- raise NotImplementedError()
-
- def _handle_event(self, event, source_conn, other_conn, is_server):
- if hasattr(event, 'stream_id'):
- if is_server and event.stream_id % 2 == 1:
- eid = self.server_to_client_stream_ids[event.stream_id]
- else:
- eid = event.stream_id
-
- if isinstance(event, RequestReceived):
- headers = Headers([[str(k), str(v)] for k, v in event.headers])
- self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
- self.streams[eid].timestamp_start = time.time()
- self.streams[eid].start()
- elif isinstance(event, ResponseReceived):
- headers = Headers([[str(k), str(v)] for k, v in event.headers])
- self.streams[eid].queued_data_length = 0
- self.streams[eid].timestamp_start = time.time()
- self.streams[eid].response_headers = headers
- self.streams[eid].response_arrived.set()
- elif isinstance(event, DataReceived):
- if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit:
- raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit))
- self.streams[eid].data_queue.put(event.data)
- self.streams[eid].queued_data_length += len(event.data)
- source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data))
- elif isinstance(event, StreamEnded):
- self.streams[eid].timestamp_end = time.time()
- self.streams[eid].data_finished.set()
- elif isinstance(event, StreamReset):
- self.streams[eid].zombie = time.time()
- if eid in self.streams and event.error_code == 0x8:
- if is_server:
- other_stream_id = self.streams[eid].client_stream_id
- else:
- other_stream_id = self.streams[eid].server_stream_id
- other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
- elif isinstance(event, RemoteSettingsChanged):
- new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()])
- other_conn.h2.safe_update_settings(new_settings)
- elif isinstance(event, ConnectionTerminated):
- other_conn.h2.safe_close_connection(event.error_code)
- return False
- elif isinstance(event, PushedStreamReceived):
- # pushed stream ids should be uniq and not dependent on race conditions
- # only the parent stream id must be looked up first
- parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
- with self.client_conn.h2.lock:
- self.client_conn.h2.push_stream(parent_eid, event.pushed_stream_id, event.headers)
-
- headers = Headers([[str(k), str(v)] for k, v in event.headers])
- headers['x-mitmproxy-pushed'] = 'true'
- self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers)
- self.streams[event.pushed_stream_id].timestamp_start = time.time()
- self.streams[event.pushed_stream_id].pushed = True
- self.streams[event.pushed_stream_id].parent_stream_id = parent_eid
- self.streams[event.pushed_stream_id].timestamp_end = time.time()
- self.streams[event.pushed_stream_id].data_finished.set()
- self.streams[event.pushed_stream_id].start()
- elif isinstance(event, TrailersReceived):
- raise NotImplementedError()
-
- return True
-
- def _cleanup_streams(self):
- death_time = time.time() - 10
- for stream_id in self.streams.keys():
- zombie = self.streams[stream_id].zombie
- if zombie and zombie <= death_time:
- self.streams.pop(stream_id, None)
-
- def __call__(self):
- if self.server_conn:
- self._initiate_server_conn()
-
- preamble = self.client_conn.rfile.read(24)
- self.client_conn.h2.initiate_connection()
- self.client_conn.h2.receive_data(preamble)
- self.client_conn.send(self.client_conn.h2.data_to_send())
-
- while True:
- r = ssl_read_select(self.active_conns, 1)
- for conn in r:
- source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
- other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
- is_server = (conn == self.server_conn.connection)
-
- field = source_conn.rfile.peek(3)
- length = int(field.encode('hex'), 16)
- raw_frame = source_conn.rfile.safe_read(9 + length)
-
- with source_conn.h2.lock:
- events = source_conn.h2.receive_data(raw_frame)
- source_conn.send(source_conn.h2.data_to_send())
-
- for event in events:
- if not self._handle_event(event, source_conn, other_conn, is_server):
- return
-
- self._cleanup_streams()
-
-
-class Http2SingleStreamLayer(_HttpLayer, threading.Thread):
- def __init__(self, ctx, stream_id, request_headers):
- super(Http2SingleStreamLayer, self).__init__(ctx)
- self.zombie = None
- self.client_stream_id = stream_id
- self.server_stream_id = None
- self.request_headers = request_headers
- self.response_headers = None
- self.data_queue = Queue.Queue()
- self.queued_data_length = 0
-
- self.response_arrived = threading.Event()
- self.data_finished = threading.Event()
-
- def is_zombie(self, h2_conn, stream_id):
- if self.zombie:
- return True
- return False
-
- def read_request(self):
- self.data_finished.wait()
- self.data_finished.clear()
-
- authority = self.request_headers.get(':authority', '')
- method = self.request_headers.get(':method', 'GET')
- scheme = self.request_headers.get(':scheme', 'https')
- path = self.request_headers.get(':path', '/')
- host = None
- port = None
-
- if path == '*' or path.startswith("/"):
- form_in = "relative"
- elif method == 'CONNECT':
- form_in = "authority"
- if ":" in authority:
- host, port = authority.split(":", 1)
- else:
- host = authority
- else:
- form_in = "absolute"
- # FIXME: verify if path or :host contains what we need
- scheme, host, port, _ = utils.parse_url(path)
-
- if host is None:
- host = 'localhost'
- if port is None:
- port = 80 if scheme == 'http' else 443
- port = int(port)
-
- data = []
- while self.data_queue.qsize() > 0:
- data.append(self.data_queue.get())
- data = b"".join(data)
-
- return HTTPRequest(
- form_in,
- method,
- scheme,
- host,
- port,
- path,
- (2, 0),
- self.request_headers,
- data,
- timestamp_start=self.timestamp_start,
- timestamp_end=self.timestamp_end,
- )
-
- def send_request(self, message):
- with self.server_conn.h2.lock:
- self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
- self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
-
- self.server_conn.h2.safe_send_headers(
- self.is_zombie,
- self.server_stream_id,
- message.headers
- )
- self.server_conn.h2.safe_send_body(
- self.is_zombie,
- self.server_stream_id,
- message.body
- )
-
- def read_response_headers(self):
- self.response_arrived.wait()
-
- status_code = int(self.response_headers.get(':status', 502))
-
- return HTTPResponse(
- http_version=(2, 0),
- status_code=status_code,
- reason='',
- headers=self.response_headers,
- content=None,
- timestamp_start=self.timestamp_start,
- timestamp_end=self.timestamp_end,
- )
-
- def read_response_body(self, request, response):
- while True:
- try:
- yield self.data_queue.get(timeout=1)
- except Queue.Empty:
- pass
- if self.data_finished.is_set():
- while self.data_queue.qsize() > 0:
- yield self.data_queue.get()
- return
- if self.zombie:
- return
-
- def send_response_headers(self, response):
- self.client_conn.h2.safe_send_headers(
- self.is_zombie,
- self.client_stream_id,
- response.headers
- )
-
- def send_response_body(self, _response, chunks):
- self.client_conn.h2.safe_send_body(
- self.is_zombie,
- self.client_stream_id,
- chunks
- )
-
- def check_close_connection(self, flow):
- # This layer only handles a single stream.
- # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
- return True
-
- def connect(self):
- raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
-
- def set_server(self, *args, **kwargs):
- # do not mess with the server connection - all streams share it.
- pass
-
- def run(self):
- layer = HttpLayer(self, self.mode)
- layer()
- self.zombie = time.time()
-
-
class ConnectServerConnection(object):
"""
@@ -531,7 +118,7 @@ class UpstreamConnectLayer(Layer):
def set_server(self, address, server_tls=None, sni=None):
if self.ctx.server_conn:
self.ctx.disconnect()
- address = Address.wrap(address)
+ address = tcp.Address.wrap(address)
self.connect_request.host = address.host
self.connect_request.port = address.port
self.server_conn.address = address
@@ -646,7 +233,7 @@ class HttpLayer(Layer):
try:
response = make_error_response(code, message)
self.send_response(response)
- except (NetlibException, h2.exceptions.H2Error):
+ except (NetlibException, H2Error):
pass
def change_upstream_proxy_server(self, address):
diff --git a/libmproxy/protocol/http1.py b/libmproxy/protocol/http1.py
new file mode 100644
index 00000000..0d6095df
--- /dev/null
+++ b/libmproxy/protocol/http1.py
@@ -0,0 +1,75 @@
+from __future__ import (absolute_import, print_function, division)
+
+import sys
+import traceback
+import six
+import struct
+import threading
+import time
+import Queue
+
+from netlib import tcp
+from netlib.http import http1
+
+from .http import _HttpTransmissionLayer, HttpLayer
+from .. import utils
+from ..models import HTTPRequest, HTTPResponse
+
+
+class Http1Layer(_HttpTransmissionLayer):
+ def __init__(self, ctx, mode):
+ super(Http1Layer, self).__init__(ctx)
+ self.mode = mode
+
+ def read_request(self):
+ req = http1.read_request(self.client_conn.rfile, body_size_limit=self.config.body_size_limit)
+ return HTTPRequest.wrap(req)
+
+ def read_request_body(self, request):
+ expected_size = http1.expected_http_body_size(request)
+ return http1.read_body(self.client_conn.rfile, expected_size, self.config.body_size_limit)
+
+ def send_request(self, request):
+ self.server_conn.wfile.write(http1.assemble_request(request))
+ self.server_conn.wfile.flush()
+
+ def read_response_headers(self):
+ resp = http1.read_response_head(self.server_conn.rfile)
+ return HTTPResponse.wrap(resp)
+
+ def read_response_body(self, request, response):
+ expected_size = http1.expected_http_body_size(request, response)
+ return http1.read_body(self.server_conn.rfile, expected_size, self.config.body_size_limit)
+
+ def send_response_headers(self, response):
+ raw = http1.assemble_response_head(response)
+ self.client_conn.wfile.write(raw)
+ self.client_conn.wfile.flush()
+
+ def send_response_body(self, response, chunks):
+ for chunk in http1.assemble_body(response.headers, chunks):
+ self.client_conn.wfile.write(chunk)
+ self.client_conn.wfile.flush()
+
+ def check_close_connection(self, flow):
+ request_close = http1.connection_close(
+ flow.request.http_version,
+ flow.request.headers
+ )
+ response_close = http1.connection_close(
+ flow.response.http_version,
+ flow.response.headers
+ )
+ read_until_eof = http1.expected_http_body_size(flow.request, flow.response) == -1
+ close_connection = request_close or response_close or read_until_eof
+ if flow.request.form_in == "authority" and flow.response.status_code == 200:
+ # Workaround for https://github.com/mitmproxy/mitmproxy/issues/313:
+ # Charles Proxy sends a CONNECT response with HTTP/1.0
+ # and no Content-Length header
+
+ return False
+ return close_connection
+
+ def __call__(self):
+ layer = HttpLayer(self, self.mode)
+ layer()
diff --git a/libmproxy/protocol/http2.py b/libmproxy/protocol/http2.py
new file mode 100644
index 00000000..a6d1b73f
--- /dev/null
+++ b/libmproxy/protocol/http2.py
@@ -0,0 +1,365 @@
+from __future__ import (absolute_import, print_function, division)
+
+import struct
+import threading
+import time
+import Queue
+
+from netlib.tcp import ssl_read_select
+from netlib.exceptions import HttpException
+from netlib.http import Headers
+
+import h2
+from h2.connection import H2Connection
+from h2.events import *
+
+from .base import Layer
+from .http import _HttpTransmissionLayer, HttpLayer
+from .. import utils
+from ..models import HTTPRequest, HTTPResponse
+
+
+class SafeH2Connection(H2Connection):
+ def __init__(self, conn, *args, **kwargs):
+ super(SafeH2Connection, self).__init__(*args, **kwargs)
+ self.conn = conn
+ self.lock = threading.RLock()
+
+ def safe_close_connection(self, error_code):
+ with self.lock:
+ self.close_connection(error_code)
+ self.conn.send(self.data_to_send())
+
+ def safe_increment_flow_control(self, stream_id, length):
+ if length == 0:
+ return
+
+ with self.lock:
+ self.increment_flow_control_window(length)
+ self.conn.send(self.data_to_send())
+ with self.lock:
+ if stream_id in self.streams and not self.streams[stream_id].closed:
+ self.increment_flow_control_window(length, stream_id=stream_id)
+ self.conn.send(self.data_to_send())
+
+ def safe_reset_stream(self, stream_id, error_code):
+ with self.lock:
+ try:
+ self.reset_stream(stream_id, error_code)
+ except h2.exceptions.StreamClosedError:
+ # stream is already closed - good
+ pass
+ self.conn.send(self.data_to_send())
+
+ def safe_update_settings(self, new_settings):
+ with self.lock:
+ self.update_settings(new_settings)
+ self.conn.send(self.data_to_send())
+
+ def safe_send_headers(self, is_zombie, stream_id, headers):
+ with self.lock:
+ if is_zombie(self, stream_id):
+ return
+ self.send_headers(stream_id, headers)
+ self.conn.send(self.data_to_send())
+
+ def safe_send_body(self, is_zombie, stream_id, chunks):
+ for chunk in chunks:
+ position = 0
+ while position < len(chunk):
+ self.lock.acquire()
+ if is_zombie(self, stream_id):
+ return
+ max_outbound_frame_size = self.max_outbound_frame_size
+ frame_chunk = chunk[position:position+max_outbound_frame_size]
+ if self.local_flow_control_window(stream_id) < len(frame_chunk):
+ self.lock.release()
+ time.sleep(0)
+ continue
+ self.send_data(stream_id, frame_chunk)
+ self.conn.send(self.data_to_send())
+ self.lock.release()
+ position += max_outbound_frame_size
+ with self.lock:
+ if is_zombie(self, stream_id):
+ return
+ self.end_stream(stream_id)
+ self.conn.send(self.data_to_send())
+
+
+class Http2Layer(Layer):
+ def __init__(self, ctx, mode):
+ super(Http2Layer, self).__init__(ctx)
+ self.mode = mode
+ self.streams = dict()
+ self.server_to_client_stream_ids = dict([(0, 0)])
+ self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)
+
+ # make sure that we only pass actual SSL.Connection objects in here,
+ # because otherwise ssl_read_select fails!
+ self.active_conns = [self.client_conn.connection]
+
+ def _initiate_server_conn(self):
+ self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True)
+ self.server_conn.h2.initiate_connection()
+ self.server_conn.send(self.server_conn.h2.data_to_send())
+ self.active_conns.append(self.server_conn.connection)
+
+ def connect(self):
+ self.ctx.connect()
+ self.server_conn.connect()
+ self._initiate_server_conn()
+
+ def set_server(self):
+ raise NotImplementedError("Cannot change server for HTTP2 connections.")
+
+ def disconnect(self):
+ raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.")
+
+ def next_layer(self):
+ # WebSockets over HTTP/2?
+ # CONNECT for proxying?
+ raise NotImplementedError()
+
+ def _handle_event(self, event, source_conn, other_conn, is_server):
+ if hasattr(event, 'stream_id'):
+ if is_server and event.stream_id % 2 == 1:
+ eid = self.server_to_client_stream_ids[event.stream_id]
+ else:
+ eid = event.stream_id
+
+ if isinstance(event, RequestReceived):
+ headers = Headers([[str(k), str(v)] for k, v in event.headers])
+ self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
+ self.streams[eid].timestamp_start = time.time()
+ self.streams[eid].start()
+ elif isinstance(event, ResponseReceived):
+ headers = Headers([[str(k), str(v)] for k, v in event.headers])
+ self.streams[eid].queued_data_length = 0
+ self.streams[eid].timestamp_start = time.time()
+ self.streams[eid].response_headers = headers
+ self.streams[eid].response_arrived.set()
+ elif isinstance(event, DataReceived):
+ if self.config.body_size_limit and self.streams[eid].queued_data_length > self.config.body_size_limit:
+ raise HttpException("HTTP body too large. Limit is {}.".format(self.config.body_size_limit))
+ self.streams[eid].data_queue.put(event.data)
+ self.streams[eid].queued_data_length += len(event.data)
+ source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data))
+ elif isinstance(event, StreamEnded):
+ self.streams[eid].timestamp_end = time.time()
+ self.streams[eid].data_finished.set()
+ elif isinstance(event, StreamReset):
+ self.streams[eid].zombie = time.time()
+ if eid in self.streams and event.error_code == 0x8:
+ if is_server:
+ other_stream_id = self.streams[eid].client_stream_id
+ else:
+ other_stream_id = self.streams[eid].server_stream_id
+ other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
+ elif isinstance(event, RemoteSettingsChanged):
+ new_settings = dict([(id, cs.new_value) for (id, cs) in event.changed_settings.iteritems()])
+ other_conn.h2.safe_update_settings(new_settings)
+ elif isinstance(event, ConnectionTerminated):
+ other_conn.h2.safe_close_connection(event.error_code)
+ return False
+ elif isinstance(event, PushedStreamReceived):
+ # pushed stream ids should be uniq and not dependent on race conditions
+ # only the parent stream id must be looked up first
+ parent_eid = self.server_to_client_stream_ids[event.parent_stream_id]
+ with self.client_conn.h2.lock:
+ self.client_conn.h2.push_stream(parent_eid, event.pushed_stream_id, event.headers)
+
+ headers = Headers([[str(k), str(v)] for k, v in event.headers])
+ headers['x-mitmproxy-pushed'] = 'true'
+ self.streams[event.pushed_stream_id] = Http2SingleStreamLayer(self, event.pushed_stream_id, headers)
+ self.streams[event.pushed_stream_id].timestamp_start = time.time()
+ self.streams[event.pushed_stream_id].pushed = True
+ self.streams[event.pushed_stream_id].parent_stream_id = parent_eid
+ self.streams[event.pushed_stream_id].timestamp_end = time.time()
+ self.streams[event.pushed_stream_id].data_finished.set()
+ self.streams[event.pushed_stream_id].start()
+ elif isinstance(event, TrailersReceived):
+ raise NotImplementedError()
+
+ return True
+
+ def _cleanup_streams(self):
+ death_time = time.time() - 10
+ for stream_id in self.streams.keys():
+ zombie = self.streams[stream_id].zombie
+ if zombie and zombie <= death_time:
+ self.streams.pop(stream_id, None)
+
+ def __call__(self):
+ if self.server_conn:
+ self._initiate_server_conn()
+
+ preamble = self.client_conn.rfile.read(24)
+ self.client_conn.h2.initiate_connection()
+ self.client_conn.h2.receive_data(preamble)
+ self.client_conn.send(self.client_conn.h2.data_to_send())
+
+ while True:
+ r = ssl_read_select(self.active_conns, 1)
+ for conn in r:
+ source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
+ other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
+ is_server = (conn == self.server_conn.connection)
+
+ field = source_conn.rfile.peek(3)
+ length = int(field.encode('hex'), 16)
+ raw_frame = source_conn.rfile.safe_read(9 + length)
+
+ with source_conn.h2.lock:
+ events = source_conn.h2.receive_data(raw_frame)
+ source_conn.send(source_conn.h2.data_to_send())
+
+ for event in events:
+ if not self._handle_event(event, source_conn, other_conn, is_server):
+ return
+
+ self._cleanup_streams()
+
+
+class Http2SingleStreamLayer(_HttpTransmissionLayer, threading.Thread):
+ def __init__(self, ctx, stream_id, request_headers):
+ super(Http2SingleStreamLayer, self).__init__(ctx)
+ self.zombie = None
+ self.client_stream_id = stream_id
+ self.server_stream_id = None
+ self.request_headers = request_headers
+ self.response_headers = None
+ self.data_queue = Queue.Queue()
+ self.queued_data_length = 0
+
+ self.response_arrived = threading.Event()
+ self.data_finished = threading.Event()
+
+ def is_zombie(self, h2_conn, stream_id):
+ if self.zombie:
+ return True
+ return False
+
+ def read_request(self):
+ self.data_finished.wait()
+ self.data_finished.clear()
+
+ authority = self.request_headers.get(':authority', '')
+ method = self.request_headers.get(':method', 'GET')
+ scheme = self.request_headers.get(':scheme', 'https')
+ path = self.request_headers.get(':path', '/')
+ host = None
+ port = None
+
+ if path == '*' or path.startswith("/"):
+ form_in = "relative"
+ elif method == 'CONNECT':
+ form_in = "authority"
+ if ":" in authority:
+ host, port = authority.split(":", 1)
+ else:
+ host = authority
+ else:
+ form_in = "absolute"
+ # FIXME: verify if path or :host contains what we need
+ scheme, host, port, _ = utils.parse_url(path)
+
+ if host is None:
+ host = 'localhost'
+ if port is None:
+ port = 80 if scheme == 'http' else 443
+ port = int(port)
+
+ data = []
+ while self.data_queue.qsize() > 0:
+ data.append(self.data_queue.get())
+ data = b"".join(data)
+
+ return HTTPRequest(
+ form_in,
+ method,
+ scheme,
+ host,
+ port,
+ path,
+ (2, 0),
+ self.request_headers,
+ data,
+ timestamp_start=self.timestamp_start,
+ timestamp_end=self.timestamp_end,
+ )
+
+ def send_request(self, message):
+ with self.server_conn.h2.lock:
+ self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
+ self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
+
+ self.server_conn.h2.safe_send_headers(
+ self.is_zombie,
+ self.server_stream_id,
+ message.headers
+ )
+ self.server_conn.h2.safe_send_body(
+ self.is_zombie,
+ self.server_stream_id,
+ message.body
+ )
+
+ def read_response_headers(self):
+ self.response_arrived.wait()
+
+ status_code = int(self.response_headers.get(':status', 502))
+
+ return HTTPResponse(
+ http_version=(2, 0),
+ status_code=status_code,
+ reason='',
+ headers=self.response_headers,
+ content=None,
+ timestamp_start=self.timestamp_start,
+ timestamp_end=self.timestamp_end,
+ )
+
+ def read_response_body(self, request, response):
+ while True:
+ try:
+ yield self.data_queue.get(timeout=1)
+ except Queue.Empty:
+ pass
+ if self.data_finished.is_set():
+ while self.data_queue.qsize() > 0:
+ yield self.data_queue.get()
+ return
+ if self.zombie:
+ return
+
+ def send_response_headers(self, response):
+ self.client_conn.h2.safe_send_headers(
+ self.is_zombie,
+ self.client_stream_id,
+ response.headers
+ )
+
+ def send_response_body(self, _response, chunks):
+ self.client_conn.h2.safe_send_body(
+ self.is_zombie,
+ self.client_stream_id,
+ chunks
+ )
+
+ def check_close_connection(self, flow):
+ # This layer only handles a single stream.
+ # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
+ return True
+
+ def connect(self):
+ raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
+
+ def set_server(self, *args, **kwargs):
+ # do not mess with the server connection - all streams share it.
+ pass
+
+ def run(self):
+ layer = HttpLayer(self, self.mode)
+ layer()
+ self.zombie = time.time()