aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r-- libmproxy/models/http.py 4
-rw-r--r-- libmproxy/protocol/http.py 420
-rw-r--r-- requirements.txt 2
-rw-r--r-- setup.py 1
4 files changed, 296 insertions, 131 deletions
diff --git a/libmproxy/models/http.py b/libmproxy/models/http.py
index e07dff69..3914440e 100644
--- a/libmproxy/models/http.py
+++ b/libmproxy/models/http.py
@@ -241,8 +241,6 @@ class HTTPRequest(MessageMixin, Request):
timestamp_end=request.timestamp_end,
form_out=(request.form_out if hasattr(request, 'form_out') else None),
)
- if hasattr(request, 'stream_id'):
- req.stream_id = request.stream_id
return req
def __hash__(self):
@@ -347,8 +345,6 @@ class HTTPResponse(MessageMixin, Response):
timestamp_start=response.timestamp_start,
timestamp_end=response.timestamp_end,
)
- if hasattr(response, 'stream_id'):
- resp.stream_id = response.stream_id
return resp
def _refresh_cookie(self, c, delta):
diff --git a/libmproxy/protocol/http.py b/libmproxy/protocol/http.py
index 12d09e71..fc045e0d 100644
--- a/libmproxy/protocol/http.py
+++ b/libmproxy/protocol/http.py
@@ -1,27 +1,37 @@
from __future__ import (absolute_import, print_function, division)
+
import sys
import traceback
-
import six
+import struct
+import threading
+import Queue
from netlib import tcp
from netlib.exceptions import HttpException, HttpReadDisconnect, NetlibException
-from netlib.http import http1, Headers
-from netlib.http import CONTENT_MISSING
-from netlib.tcp import Address
-from netlib.http.http2.connections import HTTP2Protocol
-from netlib.http.http2.frame import GoAwayFrame, PriorityFrame, WindowUpdateFrame
+from netlib.http import http1, Headers, CONTENT_MISSING
+from netlib.tcp import Address, ssl_read_select
+
+import h2
+from h2.connection import H2Connection
+from h2.events import *
+from hyperframe import frame
+
+from .base import Layer, Kill
from .. import utils
from ..exceptions import HttpProtocolException, ProtocolException
from ..models import (
- HTTPFlow, HTTPRequest, HTTPResponse, make_error_response, make_connect_response, Error, expect_continue_response
+ HTTPFlow,
+ HTTPRequest,
+ HTTPResponse,
+ make_error_response,
+ make_connect_response,
+ Error,
+ expect_continue_response
)
-from .base import Layer, Kill
class _HttpLayer(Layer):
- supports_streaming = False
-
def read_request(self):
raise NotImplementedError()
@@ -32,37 +42,18 @@ class _HttpLayer(Layer):
raise NotImplementedError()
def read_response(self, request):
- raise NotImplementedError()
-
- def send_response(self, response):
- raise NotImplementedError()
-
- def check_close_connection(self, flow):
- raise NotImplementedError()
-
-
-class _StreamingHttpLayer(_HttpLayer):
- supports_streaming = True
-
- def read_response_headers(self):
- raise NotImplementedError
-
- def read_response_body(self, request, response):
- raise NotImplementedError()
- yield "this is a generator" # pragma: no cover
-
- def read_response(self, request):
response = self.read_response_headers()
response.data.content = b"".join(
self.read_response_body(request, response)
)
return response
- def send_response_headers(self, response):
- raise NotImplementedError
+ def read_response_headers(self):
+ raise NotImplementedError()
- def send_response_body(self, response, chunks):
+ def read_response_body(self, request, response):
raise NotImplementedError()
+ yield "this is a generator" # pragma: no cover
def send_response(self, response):
if response.content == CONTENT_MISSING:
@@ -70,9 +61,17 @@ class _StreamingHttpLayer(_HttpLayer):
self.send_response_headers(response)
self.send_response_body(response, [response.content])
+ def send_response_headers(self, response):
+ raise NotImplementedError()
+
+ def send_response_body(self, response, chunks):
+ raise NotImplementedError()
+
+ def check_close_connection(self, flow):
+ raise NotImplementedError()
-class Http1Layer(_StreamingHttpLayer):
+class Http1Layer(_HttpLayer):
def __init__(self, ctx, mode):
super(Http1Layer, self).__init__(ctx)
self.mode = mode
@@ -130,104 +129,277 @@ class Http1Layer(_StreamingHttpLayer):
layer = HttpLayer(self, self.mode)
layer()
-
-# TODO: The HTTP2 layer is missing multiplexing, which requires a major rewrite.
-class Http2Layer(_HttpLayer):
-
+class SafeH2Connection(H2Connection):  # H2Connection tied to one socket: each safe_* helper locks, updates h2 state, then flushes to conn
+    def __init__(self, conn, *args, **kwargs):
+        super(SafeH2Connection, self).__init__(*args, **kwargs)
+        self.conn = conn  # object whose send() receives the bytes produced by data_to_send()
+        self.lock = threading.RLock()  # taken by every safe_* helper around state change + flush
+
+    def safe_increment_flow_control(self, stream_id, length):  # widen the connection window, then the stream window if the stream is still open
+        with self.lock:
+            self.increment_flow_control_window(length)
+            self.conn.send(self.data_to_send())
+        with self.lock:
+            if stream_id in self.streams and not self.streams[stream_id].closed:
+                self.increment_flow_control_window(length, stream_id=stream_id)
+                self.conn.send(self.data_to_send())
+
+    def safe_reset_stream(self, stream_id, error_code):
+        with self.lock:
+            self.reset_stream(stream_id, error_code)
+            self.conn.send(self.data_to_send())  # fixed: was self.h2.data_to_send(); this class never sets an h2 attribute
+
+    def safe_acknowledge_settings(self, event):
+        with self.lock:  # was self.conn.h2.lock; conn.h2 is this very object, so use it directly
+            self.acknowledge_settings(event)
+            self.conn.send(self.data_to_send())
+
+    def safe_update_settings(self, new_settings):
+        with self.lock:  # was self.conn.h2.lock; same object, now consistent with the other helpers
+            self.update_settings(new_settings)
+            self.conn.send(self.data_to_send())
+
+    def safe_send_headers(self, stream_id, headers):
+        with self.lock:
+            self.send_headers(stream_id, headers)
+            self.conn.send(self.data_to_send())
+
+    def safe_send_body(self, stream_id, chunks):  # send each chunk in frame-sized slices, then end the stream
+        for chunk in chunks:
+            max_outbound_frame_size = self.max_outbound_frame_size  # re-read for every chunk
+            for i in xrange(0, len(chunk), max_outbound_frame_size):
+                frame_chunk = chunk[i:i+max_outbound_frame_size]
+                with self.lock:  # lock per frame rather than per body
+                    self.send_data(stream_id, frame_chunk)
+                    self.conn.send(self.data_to_send())
+        with self.lock:
+            self.end_stream(stream_id)
+            self.conn.send(self.data_to_send())
+
+class Http2Layer(Layer):
def __init__(self, ctx, mode):
super(Http2Layer, self).__init__(ctx)
self.mode = mode
- self.client_protocol = HTTP2Protocol(self.client_conn, is_server=True,
- unhandled_frame_cb=self.handle_unexpected_frame_from_client)
- self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False,
- unhandled_frame_cb=self.handle_unexpected_frame_from_server)
+ self.streams = dict()
+ self.server_to_client_stream_ids = dict([(0, 0)])
+ self.client_conn.h2 = SafeH2Connection(self.client_conn, client_side=False)
+
+ # make sure that we only pass actual SSL.Connection objects in here,
+ # because otherwise ssl_read_select fails!
+ self.active_conns = [self.client_conn.connection]
+
+ if self.server_conn:
+ self._initiate_server_conn()
+
+ def _initiate_server_conn(self):
+ self.server_conn.h2 = SafeH2Connection(self.server_conn, client_side=True)
+ self.server_conn.h2.initiate_connection()
+ self.server_conn.h2.update_settings({frame.SettingsFrame.ENABLE_PUSH: False})
+ self.server_conn.send(self.server_conn.h2.data_to_send())
+ self.active_conns.append(self.server_conn.connection)
+
+ def connect(self):
+ self.ctx.connect()
+ self.server_conn.connect()
+ self._initiate_server_conn()
+
+ def set_server(self):
+ raise NotImplementedError("Cannot change server for HTTP2 connections.")
+
+ def disconnect(self):
+ raise NotImplementedError("Cannot dis- or reconnect in HTTP2 connections.")
+
+    def __call__(self):  # main loop: read one frame at a time from whichever peer is readable and dispatch the resulting h2 events
+        preamble = self.client_conn.rfile.read(24)  # the HTTP/2 client connection preface is 24 octets (RFC 7540 section 3.5)
+        self.client_conn.h2.initiate_connection()
+        self.client_conn.h2.update_settings({frame.SettingsFrame.ENABLE_PUSH: False})
+        self.client_conn.h2.receive_data(preamble)
+        self.client_conn.send(self.client_conn.h2.data_to_send())
+
+        while True:
+            r = ssl_read_select(self.active_conns, 1)
+            for conn in r:
+                source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
+                other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
+                is_server = (conn == self.server_conn.connection)
+
+                fields = struct.unpack("!HB", source_conn.rfile.peek(3))  # first 3 header bytes hold the 24-bit payload length
+                length = (fields[0] << 8) + fields[1]
+                raw_frame = source_conn.rfile.safe_read(9 + length)  # 9-byte frame header plus payload
+
+                with source_conn.h2.lock:
+                    events = source_conn.h2.receive_data(raw_frame)
+                    source_conn.send(source_conn.h2.data_to_send())
+
+                for event in events:
+                    if hasattr(event, 'stream_id'):  # self.streams is keyed by the client-side stream id
+                        if is_server:
+                            eid = self.server_to_client_stream_ids[event.stream_id]
+                        else:
+                            eid = event.stream_id
+
+                    if isinstance(event, RequestReceived):
+                        headers = Headers([[str(k), str(v)] for k, v in event.headers])
+                        self.streams[eid] = Http2SingleStreamLayer(self, eid, headers)
+                        self.streams[eid].start()
+                    elif isinstance(event, ResponseReceived):
+                        headers = Headers([[str(k), str(v)] for k, v in event.headers])
+                        self.streams[eid].response_headers = headers
+                        self.streams[eid].response_arrived.set()
+                    elif isinstance(event, DataReceived):
+                        self.streams[eid].data_queue.put(event.data)
+                        source_conn.h2.safe_increment_flow_control(event.stream_id, len(event.data))
+                    elif isinstance(event, StreamEnded):
+                        self.streams[eid].data_finished.set()
+                    elif isinstance(event, StreamReset) and eid in self.streams:  # fixed: membership is now tested before the subscript below
+                        self.streams[eid].zombie = True
+                        if event.error_code == 0x8:  # 0x8 is CANCEL (RFC 7540 section 7); relay it to the other peer
+                            if is_server:
+                                other_stream_id = self.streams[eid].client_stream_id
+                            else:
+                                other_stream_id = self.streams[eid].server_stream_id
+                            other_conn.h2.safe_reset_stream(other_stream_id, event.error_code)
+                    elif isinstance(event, RemoteSettingsChanged):
+                        source_conn.h2.safe_acknowledge_settings(event)
+                        new_settings = {setting: cs.new_value for setting, cs in event.changed_settings.iteritems()}  # no longer shadows builtin id
+                        other_conn.h2.safe_update_settings(new_settings)
+
+ # TODO: cleanup resources once we are sure nobody needs them
+ # for stream_id in self.streams.keys():
+ # if self.streams[stream_id].zombie:
+ # self.streams.pop(stream_id, None)
+
+
+class Http2SingleStreamLayer(_HttpLayer, threading.Thread):
+ def __init__(self, ctx, stream_id, request_headers):
+ super(Http2SingleStreamLayer, self).__init__(ctx)
+ self.zombie = False
+ self.client_stream_id = stream_id
+ self.server_stream_id = None
+ self.request_headers = request_headers
+ self.response_headers = None
+ self.data_queue = Queue.Queue()
+
+ self.response_arrived = threading.Event()
+ self.data_finished = threading.Event()
def read_request(self):
- request = HTTPRequest.from_protocol(
- self.client_protocol,
- body_size_limit=self.config.body_size_limit
+ self.data_finished.wait()
+ self.data_finished.clear()
+
+ authority = self.request_headers.get(':authority', '')
+ method = self.request_headers.get(':method', 'GET')
+ scheme = self.request_headers.get(':scheme', 'https')
+ path = self.request_headers.get(':path', '/')
+ host = None
+ port = None
+
+ if path == '*' or path.startswith("/"):
+ form_in = "relative"
+ elif method == 'CONNECT':
+ form_in = "authority"
+ if ":" in authority:
+ host, port = authority.split(":", 1)
+ else:
+ host = authority
+ else:
+ form_in = "absolute"
+ # FIXME: verify if path or :host contains what we need
+ scheme, host, port, _ = utils.parse_url(path)
+
+ if host is None:
+ host = 'localhost'
+ if port is None:
+ port = 80 if scheme == 'http' else 443
+ port = int(port)
+
+ data = []
+ while self.data_queue.qsize() > 0:
+ data.append(self.data_queue.get())
+
+ return HTTPRequest(
+ form_in,
+ method,
+ scheme,
+ host,
+ port,
+ path,
+ (2, 0),
+ self.request_headers,
+ data,
+ # TODO: timestamp_start=None,
+ # TODO: timestamp_end=None,
+ form_out=None, # TODO: (request.form_out if hasattr(request, 'form_out') else None),
)
- self._stream_id = request.stream_id
- return request
def send_request(self, message):
- # TODO: implement flow control and WINDOW_UPDATE frames
- self.server_conn.send(self.server_protocol.assemble(message))
+ with self.server_conn.h2.lock:
+ self.server_stream_id = self.server_conn.h2.get_next_available_stream_id()
+ self.server_to_client_stream_ids[self.server_stream_id] = self.client_stream_id
- def read_response(self, request):
- return HTTPResponse.from_protocol(
- self.server_protocol,
- request_method=request.method,
- body_size_limit=self.config.body_size_limit,
- include_body=True,
- stream_id=self._stream_id
+ self.server_conn.h2.safe_send_headers(
+ self.server_stream_id,
+ message.headers
+ )
+ self.server_conn.h2.safe_send_body(
+ self.server_stream_id,
+ message.body
+ )
+
+ def read_response_headers(self):
+ self.response_arrived.wait()
+
+ status_code = int(self.response_headers.get(':status', 502))
+
+ return HTTPResponse(
+ http_version=(2, 0),
+ status_code=status_code,
+ reason='',
+ headers=self.response_headers,
+ content=None,
+ # TODO: timestamp_start=response.timestamp_start,
+ # TODO: timestamp_end=response.timestamp_end,
)
- def send_response(self, message):
- # TODO: implement flow control to prevent client buffer filling up
- # maintain a send buffer size, and read WindowUpdateFrames from client to increase the send buffer
- self.client_conn.send(self.client_protocol.assemble(message))
+ def read_response_body(self, request, response):
+ while True:
+ try:
+ yield self.data_queue.get(timeout=1)
+ except Queue.Empty:
+ pass
+ if self.data_finished.is_set():
+ while self.data_queue.qsize() > 0:
+ yield self.data_queue.get()
+ return
+
+ def send_response_headers(self, response):
+ self.client_conn.h2.safe_send_headers(
+ self.client_stream_id,
+ response.headers
+ )
+
+ def send_response_body(self, _response, chunks):
+ self.client_conn.h2.safe_send_body(
+ self.client_stream_id,
+ chunks
+ )
def check_close_connection(self, flow):
- # TODO: add a timer to disconnect after a 10 second timeout
- return False
+ # This layer only handles a single stream.
+ # RFC 7540 8.1: An HTTP request/response exchange fully consumes a single stream.
+ return True
def connect(self):
- self.ctx.connect()
- self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False,
- unhandled_frame_cb=self.handle_unexpected_frame_from_server)
- self.server_protocol.perform_connection_preface()
+ raise ValueError("CONNECT inside an HTTP2 stream is not supported.")
def set_server(self, *args, **kwargs):
- self.ctx.set_server(*args, **kwargs)
- self.server_protocol = HTTP2Protocol(self.server_conn, is_server=False,
- unhandled_frame_cb=self.handle_unexpected_frame_from_server)
- self.server_protocol.perform_connection_preface()
+ # do not mess with the server connection - all streams share it.
+ pass
- def __call__(self):
- self.server_protocol.perform_connection_preface()
+ def run(self):
layer = HttpLayer(self, self.mode)
layer()
-
- # terminate the connection
- self.client_conn.send(GoAwayFrame().to_bytes())
-
- def handle_unexpected_frame_from_client(self, frame):
- if isinstance(frame, WindowUpdateFrame):
- # Clients are sending WindowUpdate frames depending on their flow control algorithm.
- # Since we cannot predict these frames, and we do not need to respond to them,
- # simply accept them, and hide them from the log.
- # Ideally we should keep track of our own flow control window and
- # stall transmission if the outgoing flow control buffer is full.
- return
- if isinstance(frame, PriorityFrame):
- # Clients are sending Priority frames depending on their implementation.
- # The RFC does not clearly state when or which priority preferences should be set.
- # Since we cannot predict these frames, and we do not need to respond to them,
- # simply accept them, and hide them from the log.
- # Ideally we should forward them to the server.
- return
- if isinstance(frame, GoAwayFrame):
- # Client wants to terminate the connection,
- # relay it to the server.
- self.server_conn.send(frame.to_bytes())
- return
- self.log("Unexpected HTTP2 frame from client: %s" % frame.human_readable(), "info")
-
- def handle_unexpected_frame_from_server(self, frame):
- if isinstance(frame, WindowUpdateFrame):
- # Servers are sending WindowUpdate frames depending on their flow control algorithm.
- # Since we cannot predict these frames, and we do not need to respond to them,
- # simply accept them, and hide them from the log.
- # Ideally we should keep track of our own flow control window and
- # stall transmission if the outgoing flow control buffer is full.
- return
- if isinstance(frame, GoAwayFrame):
- # Server wants to terminate the connection,
- # relay it to the client.
- self.client_conn.send(frame.to_bytes())
- return
- self.log("Unexpected HTTP2 frame from server: %s" % frame.human_readable(), "info")
+ self.zombie = True
class ConnectServerConnection(object):
@@ -420,7 +592,7 @@ class HttpLayer(Layer):
layer()
def send_response_to_client(self, flow):
- if not (self.supports_streaming and flow.response.stream):
+ if not flow.response.stream:
# no streaming:
# we already received the full response from the server and can
# send it to the client straight away.
@@ -441,10 +613,7 @@ class HttpLayer(Layer):
def get_response_from_server(self, flow):
def get_response():
self.send_request(flow.request)
- if self.supports_streaming:
- flow.response = self.read_response_headers()
- else:
- flow.response = self.read_response(flow.request)
+ flow.response = self.read_response_headers()
try:
get_response()
@@ -474,15 +643,14 @@ class HttpLayer(Layer):
if flow == Kill:
raise Kill()
- if self.supports_streaming:
- if flow.response.stream:
- flow.response.data.content = CONTENT_MISSING
- else:
- flow.response.data.content = b"".join(self.read_response_body(
- flow.request,
- flow.response
- ))
- flow.response.timestamp_end = utils.timestamp()
+ if flow.response.stream:
+ flow.response.data.content = CONTENT_MISSING
+ else:
+ flow.response.data.content = b"".join(self.read_response_body(
+ flow.request,
+ flow.response
+ ))
+ flow.response.timestamp_end = utils.timestamp()
# no further manipulation of self.server_conn beyond this point
# we can safely set it as the final attribute value here.
diff --git a/requirements.txt b/requirements.txt
index 3832c953..49f86b9a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
-e git+https://github.com/mitmproxy/netlib.git#egg=netlib
-e git+https://github.com/mitmproxy/pathod.git#egg=pathod
--e .[dev,examples,contentviews]
\ No newline at end of file
+-e .[dev,examples,contentviews]
diff --git a/setup.py b/setup.py
index 29ac4753..c7686c6e 100644
--- a/setup.py
+++ b/setup.py
@@ -17,6 +17,7 @@ with open(os.path.join(here, 'README.rst'), encoding='utf-8') as f:
# This will break `pip install` on systems with old setuptools versions.
deps = {
"netlib>=%s, <%s" % (version.MINORVERSION, version.NEXT_MINORVERSION),
+ "h2>=2.0.0",
"tornado>=4.3.0, <4.4",
"configargparse>=0.10.0, <0.11",
"pyperclip>=1.5.22, <1.6",