From 11e7f476bd4bbcd6d072fa3659f628ae3a19705d Mon Sep 17 00:00:00 2001 From: Maximilian Hils Date: Tue, 15 Sep 2015 19:12:15 +0200 Subject: wip --- netlib/http/http2/__init__.py | 2 - netlib/http/http2/connections.py | 412 +++++++++++++++++++++++++ netlib/http/http2/frame.py | 633 --------------------------------------- netlib/http/http2/frames.py | 633 +++++++++++++++++++++++++++++++++++++++ netlib/http/http2/protocol.py | 412 ------------------------- 5 files changed, 1045 insertions(+), 1047 deletions(-) create mode 100644 netlib/http/http2/connections.py delete mode 100644 netlib/http/http2/frame.py create mode 100644 netlib/http/http2/frames.py delete mode 100644 netlib/http/http2/protocol.py (limited to 'netlib/http/http2') diff --git a/netlib/http/http2/__init__.py b/netlib/http/http2/__init__.py index 5acf7696..e69de29b 100644 --- a/netlib/http/http2/__init__.py +++ b/netlib/http/http2/__init__.py @@ -1,2 +0,0 @@ -from frame import * -from protocol import * diff --git a/netlib/http/http2/connections.py b/netlib/http/http2/connections.py new file mode 100644 index 00000000..b6d376d3 --- /dev/null +++ b/netlib/http/http2/connections.py @@ -0,0 +1,412 @@ +from __future__ import (absolute_import, print_function, division) +import itertools +import time + +from hpack.hpack import Encoder, Decoder +from netlib import http, utils +from netlib.http import semantics +from . import frame + + +class TCPHandler(object): + + def __init__(self, rfile, wfile=None): + self.rfile = rfile + self.wfile = wfile + + +class HTTP2Protocol(semantics.ProtocolMixin): + + ERROR_CODES = utils.BiDi( + NO_ERROR=0x0, + PROTOCOL_ERROR=0x1, + INTERNAL_ERROR=0x2, + FLOW_CONTROL_ERROR=0x3, + SETTINGS_TIMEOUT=0x4, + STREAM_CLOSED=0x5, + FRAME_SIZE_ERROR=0x6, + REFUSED_STREAM=0x7, + CANCEL=0x8, + COMPRESSION_ERROR=0x9, + CONNECT_ERROR=0xa, + ENHANCE_YOUR_CALM=0xb, + INADEQUATE_SECURITY=0xc, + HTTP_1_1_REQUIRED=0xd + ) + + CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + + ALPN_PROTO_H2 = 'h2' + + def __init__( + self, + tcp_handler=None, + rfile=None, + wfile=None, + is_server=False, + dump_frames=False, + encoder=None, + decoder=None, + unhandled_frame_cb=None, + ): + self.tcp_handler = tcp_handler or TCPHandler(rfile, wfile) + self.is_server = is_server + self.dump_frames = dump_frames + self.encoder = encoder or Encoder() + self.decoder = decoder or Decoder() + self.unhandled_frame_cb = unhandled_frame_cb + + self.http2_settings = frame.HTTP2_DEFAULT_SETTINGS.copy() + self.current_stream_id = None + self.connection_preface_performed = False + + def read_request( + self, + include_body=True, + body_size_limit=None, + allow_empty=False, + ): + if body_size_limit is not None: + raise NotImplementedError() + + self.perform_connection_preface() + + timestamp_start = time.time() + if hasattr(self.tcp_handler.rfile, "reset_timestamps"): + self.tcp_handler.rfile.reset_timestamps() + + stream_id, headers, body = self._receive_transmission( + include_body=include_body, + ) + + if hasattr(self.tcp_handler.rfile, "first_byte_timestamp"): + # more accurate timestamp_start + timestamp_start = self.tcp_handler.rfile.first_byte_timestamp + + timestamp_end = time.time() + + authority = headers.get(':authority', '') + method = headers.get(':method', 'GET') + scheme = headers.get(':scheme', 'https') + path = headers.get(':path', '/') + host = None + port = None + + if path == '*' or path.startswith("/"): + form_in = "relative" + elif method == 'CONNECT': + form_in = "authority" + if ":" in authority: + host, port = authority.split(":", 1) + else: + host = authority + else: + form_in = "absolute" + # FIXME: verify if path or :host contains what we need + scheme, host, port, _ = utils.parse_url(path) + + if host is None: + host = 'localhost' + if port is None: + port = 80 if scheme == 'http' else 443 + port = int(port) + + request = http.Request( + form_in, + method, + scheme, + host, + port, + path, + (2, 0), + headers, + body, + timestamp_start, + timestamp_end, + ) + # FIXME: We should not do this. + request.stream_id = stream_id + + return request + + def read_response( + self, + request_method='', + body_size_limit=None, + include_body=True, + stream_id=None, + ): + if body_size_limit is not None: + raise NotImplementedError() + + self.perform_connection_preface() + + timestamp_start = time.time() + if hasattr(self.tcp_handler.rfile, "reset_timestamps"): + self.tcp_handler.rfile.reset_timestamps() + + stream_id, headers, body = self._receive_transmission( + stream_id=stream_id, + include_body=include_body, + ) + + if hasattr(self.tcp_handler.rfile, "first_byte_timestamp"): + # more accurate timestamp_start + timestamp_start = self.tcp_handler.rfile.first_byte_timestamp + + if include_body: + timestamp_end = time.time() + else: + timestamp_end = None + + response = http.Response( + (2, 0), + int(headers.get(':status', 502)), + "", + headers, + body, + timestamp_start=timestamp_start, + timestamp_end=timestamp_end, + ) + response.stream_id = stream_id + + return response + + def assemble_request(self, request): + assert isinstance(request, semantics.Request) + + authority = self.tcp_handler.sni if self.tcp_handler.sni else self.tcp_handler.address.host + if self.tcp_handler.address.port != 443: + authority += ":%d" % self.tcp_handler.address.port + + headers = request.headers.copy() + + if ':authority' not in headers: + headers.fields.insert(0, (':authority', bytes(authority))) + if ':scheme' not in headers: + headers.fields.insert(0, (':scheme', bytes(request.scheme))) + if ':path' not in headers: + headers.fields.insert(0, (':path', bytes(request.path))) + if ':method' not in headers: + headers.fields.insert(0, (':method', bytes(request.method))) + + if hasattr(request, 'stream_id'): + stream_id = request.stream_id + else: + stream_id = self._next_stream_id() + + return list(itertools.chain( + self._create_headers(headers, stream_id, end_stream=(request.body is None or len(request.body) == 0)), + self._create_body(request.body, stream_id))) + + def assemble_response(self, response): + assert isinstance(response, semantics.Response) + + headers = response.headers.copy() + + if ':status' not in headers: + headers.fields.insert(0, (':status', bytes(str(response.status_code)))) + + if hasattr(response, 'stream_id'): + stream_id = response.stream_id + else: + stream_id = self._next_stream_id() + + return list(itertools.chain( + self._create_headers(headers, stream_id, end_stream=(response.body is None or len(response.body) == 0)), + self._create_body(response.body, stream_id), + )) + + def perform_connection_preface(self, force=False): + if force or not self.connection_preface_performed: + if self.is_server: + self.perform_server_connection_preface(force) + else: + self.perform_client_connection_preface(force) + + def perform_server_connection_preface(self, force=False): + if force or not self.connection_preface_performed: + self.connection_preface_performed = True + + magic_length = len(self.CLIENT_CONNECTION_PREFACE) + magic = self.tcp_handler.rfile.safe_read(magic_length) + assert magic == self.CLIENT_CONNECTION_PREFACE + + frm = frame.SettingsFrame(state=self, settings={ + frame.SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 0, + frame.SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 1, + }) + self.send_frame(frm, hide=True) + self._receive_settings(hide=True) + + def perform_client_connection_preface(self, force=False): + if force or not self.connection_preface_performed: + self.connection_preface_performed = True + + self.tcp_handler.wfile.write(self.CLIENT_CONNECTION_PREFACE) + + self.send_frame(frame.SettingsFrame(state=self), hide=True) + self._receive_settings(hide=True) # server announces own settings + self._receive_settings(hide=True) # server acks my settings + + def send_frame(self, frm, hide=False): + raw_bytes = frm.to_bytes() + self.tcp_handler.wfile.write(raw_bytes) + self.tcp_handler.wfile.flush() + if not hide and self.dump_frames: # pragma no cover + print(frm.human_readable(">>")) + + def read_frame(self, hide=False): + while True: + frm = frame.Frame.from_file(self.tcp_handler.rfile, self) + if not hide and self.dump_frames: # pragma no cover + print(frm.human_readable("<<")) + + if isinstance(frm, frame.PingFrame): + raw_bytes = frame.PingFrame(flags=frame.Frame.FLAG_ACK, payload=frm.payload).to_bytes() + self.tcp_handler.wfile.write(raw_bytes) + self.tcp_handler.wfile.flush() + continue + if isinstance(frm, frame.SettingsFrame) and not frm.flags & frame.Frame.FLAG_ACK: + self._apply_settings(frm.settings, hide) + if isinstance(frm, frame.DataFrame) and frm.length > 0: + self._update_flow_control_window(frm.stream_id, frm.length) + return frm + + def check_alpn(self): + alp = self.tcp_handler.get_alpn_proto_negotiated() + if alp != self.ALPN_PROTO_H2: + raise NotImplementedError( + "HTTP2Protocol can not handle unknown ALP: %s" % alp) + return True + + def _handle_unexpected_frame(self, frm): + if isinstance(frm, frame.SettingsFrame): + return + if self.unhandled_frame_cb: + self.unhandled_frame_cb(frm) + + def _receive_settings(self, hide=False): + while True: + frm = self.read_frame(hide) + if isinstance(frm, frame.SettingsFrame): + break + else: + self._handle_unexpected_frame(frm) + + def _next_stream_id(self): + if self.current_stream_id is None: + if self.is_server: + # servers must use even stream ids + self.current_stream_id = 2 + else: + # clients must use odd stream ids + self.current_stream_id = 1 + else: + self.current_stream_id += 2 + return self.current_stream_id + + def _apply_settings(self, settings, hide=False): + for setting, value in settings.items(): + old_value = self.http2_settings[setting] + if not old_value: + old_value = '-' + self.http2_settings[setting] = value + + frm = frame.SettingsFrame( + state=self, + flags=frame.Frame.FLAG_ACK) + self.send_frame(frm, hide) + + def _update_flow_control_window(self, stream_id, increment): + frm = frame.WindowUpdateFrame(stream_id=0, window_size_increment=increment) + self.send_frame(frm) + frm = frame.WindowUpdateFrame(stream_id=stream_id, window_size_increment=increment) + self.send_frame(frm) + + def _create_headers(self, headers, stream_id, end_stream=True): + def frame_cls(chunks): + for i in chunks: + if i == 0: + yield frame.HeadersFrame, i + else: + yield frame.ContinuationFrame, i + + header_block_fragment = self.encoder.encode(headers.fields) + + chunk_size = self.http2_settings[frame.SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] + chunks = range(0, len(header_block_fragment), chunk_size) + frms = [frm_cls( + state=self, + flags=frame.Frame.FLAG_NO_FLAGS, + stream_id=stream_id, + header_block_fragment=header_block_fragment[i:i+chunk_size]) for frm_cls, i in frame_cls(chunks)] + + last_flags = frame.Frame.FLAG_END_HEADERS + if end_stream: + last_flags |= frame.Frame.FLAG_END_STREAM + frms[-1].flags = last_flags + + if self.dump_frames: # pragma no cover + for frm in frms: + print(frm.human_readable(">>")) + + return [frm.to_bytes() for frm in frms] + + def _create_body(self, body, stream_id): + if body is None or len(body) == 0: + return b'' + + chunk_size = self.http2_settings[frame.SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] + chunks = range(0, len(body), chunk_size) + frms = [frame.DataFrame( + state=self, + flags=frame.Frame.FLAG_NO_FLAGS, + stream_id=stream_id, + payload=body[i:i+chunk_size]) for i in chunks] + frms[-1].flags = frame.Frame.FLAG_END_STREAM + + if self.dump_frames: # pragma no cover + for frm in frms: + print(frm.human_readable(">>")) + + return [frm.to_bytes() for frm in frms] + + def _receive_transmission(self, stream_id=None, include_body=True): + if not include_body: + raise NotImplementedError() + + body_expected = True + + header_block_fragment = b'' + body = b'' + + while True: + frm = self.read_frame() + if ( + (isinstance(frm, frame.HeadersFrame) or isinstance(frm, frame.ContinuationFrame)) and + (stream_id is None or frm.stream_id == stream_id) + ): + stream_id = frm.stream_id + header_block_fragment += frm.header_block_fragment + if frm.flags & frame.Frame.FLAG_END_STREAM: + body_expected = False + if frm.flags & frame.Frame.FLAG_END_HEADERS: + break + else: + self._handle_unexpected_frame(frm) + + while body_expected: + frm = self.read_frame() + if isinstance(frm, frame.DataFrame) and frm.stream_id == stream_id: + body += frm.payload + if frm.flags & frame.Frame.FLAG_END_STREAM: + break + else: + self._handle_unexpected_frame(frm) + + headers = http.Headers( + [[str(k), str(v)] for k, v in self.decoder.decode(header_block_fragment)] + ) + + return stream_id, headers, body diff --git a/netlib/http/http2/frame.py b/netlib/http/http2/frame.py deleted file mode 100644 index b36b3adf..00000000 --- a/netlib/http/http2/frame.py +++ /dev/null @@ -1,633 +0,0 @@ -import sys -import struct -from hpack.hpack import Encoder, Decoder - -from .. import utils - - -class FrameSizeError(Exception): - pass - - -class Frame(object): - - """ - Baseclass Frame - contains header - payload is defined in subclasses - """ - - FLAG_NO_FLAGS = 0x0 - FLAG_ACK = 0x1 - FLAG_END_STREAM = 0x1 - FLAG_END_HEADERS = 0x4 - FLAG_PADDED = 0x8 - FLAG_PRIORITY = 0x20 - - def __init__( - self, - state=None, - length=0, - flags=FLAG_NO_FLAGS, - stream_id=0x0): - valid_flags = reduce(lambda x, y: x | y, self.VALID_FLAGS, 0x0) - if flags | valid_flags != valid_flags: - raise ValueError('invalid flags detected.') - - if state is None: - class State(object): - pass - - state = State() - state.http2_settings = HTTP2_DEFAULT_SETTINGS.copy() - state.encoder = Encoder() - state.decoder = Decoder() - - self.state = state - - self.length = length - self.type = self.TYPE - self.flags = flags - self.stream_id = stream_id - - @classmethod - def _check_frame_size(cls, length, state): - if state: - settings = state.http2_settings - else: - settings = HTTP2_DEFAULT_SETTINGS.copy() - - max_frame_size = settings[ - SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] - - if length > max_frame_size: - raise FrameSizeError( - "Frame size exceeded: %d, but only %d allowed." % ( - length, max_frame_size)) - - @classmethod - def from_file(cls, fp, state=None): - """ - read a HTTP/2 frame sent by a server or client - fp is a "file like" object that could be backed by a network - stream or a disk or an in memory stream reader - """ - raw_header = fp.safe_read(9) - - fields = struct.unpack("!HBBBL", raw_header) - length = (fields[0] << 8) + fields[1] - flags = fields[3] - stream_id = fields[4] - - if raw_header[:4] == b'HTTP': # pragma no cover - print >> sys.stderr, "WARNING: This looks like an HTTP/1 connection!" - - cls._check_frame_size(length, state) - - payload = fp.safe_read(length) - return FRAMES[fields[2]].from_bytes( - state, - length, - flags, - stream_id, - payload) - - def to_bytes(self): - payload = self.payload_bytes() - self.length = len(payload) - - self._check_frame_size(self.length, self.state) - - b = struct.pack('!HB', (self.length & 0xFFFF00) >> 8, self.length & 0x0000FF) - b += struct.pack('!B', self.TYPE) - b += struct.pack('!B', self.flags) - b += struct.pack('!L', self.stream_id & 0x7FFFFFFF) - b += payload - - return b - - def payload_bytes(self): # pragma: no cover - raise NotImplementedError() - - def payload_human_readable(self): # pragma: no cover - raise NotImplementedError() - - def human_readable(self, direction="-"): - self.length = len(self.payload_bytes()) - - return "\n".join([ - "%s: %s | length: %d | flags: %#x | stream_id: %d" % ( - direction, self.__class__.__name__, self.length, self.flags, self.stream_id), - self.payload_human_readable(), - "===============================================================", - ]) - - def __eq__(self, other): - return self.to_bytes() == other.to_bytes() - - -class DataFrame(Frame): - TYPE = 0x0 - VALID_FLAGS = [Frame.FLAG_END_STREAM, Frame.FLAG_PADDED] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - payload=b'', - pad_length=0): - super(DataFrame, self).__init__(state, length, flags, stream_id) - self.payload = payload - self.pad_length = pad_length - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - if f.flags & Frame.FLAG_PADDED: - f.pad_length = struct.unpack('!B', payload[0])[0] - f.payload = payload[1:-f.pad_length] - else: - f.payload = payload - - return f - - def payload_bytes(self): - if self.stream_id == 0x0: - raise ValueError('DATA frames MUST be associated with a stream.') - - b = b'' - if self.flags & self.FLAG_PADDED: - b += struct.pack('!B', self.pad_length) - - b += bytes(self.payload) - - if self.flags & self.FLAG_PADDED: - b += b'\0' * self.pad_length - - return b - - def payload_human_readable(self): - return "payload: %s" % str(self.payload) - - -class HeadersFrame(Frame): - TYPE = 0x1 - VALID_FLAGS = [ - Frame.FLAG_END_STREAM, - Frame.FLAG_END_HEADERS, - Frame.FLAG_PADDED, - Frame.FLAG_PRIORITY] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - header_block_fragment=b'', - pad_length=0, - exclusive=False, - stream_dependency=0x0, - weight=0): - super(HeadersFrame, self).__init__(state, length, flags, stream_id) - - self.header_block_fragment = header_block_fragment - self.pad_length = pad_length - self.exclusive = exclusive - self.stream_dependency = stream_dependency - self.weight = weight - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - if f.flags & Frame.FLAG_PADDED: - f.pad_length = struct.unpack('!B', payload[0])[0] - f.header_block_fragment = payload[1:-f.pad_length] - else: - f.header_block_fragment = payload[0:] - - if f.flags & Frame.FLAG_PRIORITY: - f.stream_dependency, f.weight = struct.unpack( - '!LB', f.header_block_fragment[:5]) - f.exclusive = bool(f.stream_dependency >> 31) - f.stream_dependency &= 0x7FFFFFFF - f.header_block_fragment = f.header_block_fragment[5:] - - return f - - def payload_bytes(self): - if self.stream_id == 0x0: - raise ValueError('HEADERS frames MUST be associated with a stream.') - - b = b'' - if self.flags & self.FLAG_PADDED: - b += struct.pack('!B', self.pad_length) - - if self.flags & self.FLAG_PRIORITY: - b += struct.pack('!LB', - (int(self.exclusive) << 31) | self.stream_dependency, - self.weight) - - b += self.header_block_fragment - - if self.flags & self.FLAG_PADDED: - b += b'\0' * self.pad_length - - return b - - def payload_human_readable(self): - s = [] - - if self.flags & self.FLAG_PRIORITY: - s.append("exclusive: %d" % self.exclusive) - s.append("stream dependency: %#x" % self.stream_dependency) - s.append("weight: %d" % self.weight) - - if self.flags & self.FLAG_PADDED: - s.append("padding: %d" % self.pad_length) - - s.append( - "header_block_fragment: %s" % - self.header_block_fragment.encode('hex')) - - return "\n".join(s) - - -class PriorityFrame(Frame): - TYPE = 0x2 - VALID_FLAGS = [] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - exclusive=False, - stream_dependency=0x0, - weight=0): - super(PriorityFrame, self).__init__(state, length, flags, stream_id) - self.exclusive = exclusive - self.stream_dependency = stream_dependency - self.weight = weight - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - f.stream_dependency, f.weight = struct.unpack('!LB', payload) - f.exclusive = bool(f.stream_dependency >> 31) - f.stream_dependency &= 0x7FFFFFFF - - return f - - def payload_bytes(self): - if self.stream_id == 0x0: - raise ValueError( - 'PRIORITY frames MUST be associated with a stream.') - - return struct.pack( - '!LB', - (int( - self.exclusive) << 31) | self.stream_dependency, - self.weight) - - def payload_human_readable(self): - s = [] - s.append("exclusive: %d" % self.exclusive) - s.append("stream dependency: %#x" % self.stream_dependency) - s.append("weight: %d" % self.weight) - return "\n".join(s) - - -class RstStreamFrame(Frame): - TYPE = 0x3 - VALID_FLAGS = [] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - error_code=0x0): - super(RstStreamFrame, self).__init__(state, length, flags, stream_id) - self.error_code = error_code - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - f.error_code = struct.unpack('!L', payload)[0] - return f - - def payload_bytes(self): - if self.stream_id == 0x0: - raise ValueError( - 'RST_STREAM frames MUST be associated with a stream.') - - return struct.pack('!L', self.error_code) - - def payload_human_readable(self): - return "error code: %#x" % self.error_code - - -class SettingsFrame(Frame): - TYPE = 0x4 - VALID_FLAGS = [Frame.FLAG_ACK] - - SETTINGS = utils.BiDi( - SETTINGS_HEADER_TABLE_SIZE=0x1, - SETTINGS_ENABLE_PUSH=0x2, - SETTINGS_MAX_CONCURRENT_STREAMS=0x3, - SETTINGS_INITIAL_WINDOW_SIZE=0x4, - SETTINGS_MAX_FRAME_SIZE=0x5, - SETTINGS_MAX_HEADER_LIST_SIZE=0x6, - ) - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - settings=None): - super(SettingsFrame, self).__init__(state, length, flags, stream_id) - - if settings is None: - settings = {} - - self.settings = settings - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - for i in xrange(0, len(payload), 6): - identifier, value = struct.unpack("!HL", payload[i:i + 6]) - f.settings[identifier] = value - - return f - - def payload_bytes(self): - if self.stream_id != 0x0: - raise ValueError( - 'SETTINGS frames MUST NOT be associated with a stream.') - - b = b'' - for identifier, value in self.settings.items(): - b += struct.pack("!HL", identifier & 0xFF, value) - - return b - - def payload_human_readable(self): - s = [] - - for identifier, value in self.settings.items(): - s.append("%s: %#x" % (self.SETTINGS.get_name(identifier), value)) - - if not s: - return "settings: None" - else: - return "\n".join(s) - - -class PushPromiseFrame(Frame): - TYPE = 0x5 - VALID_FLAGS = [Frame.FLAG_END_HEADERS, Frame.FLAG_PADDED] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - promised_stream=0x0, - header_block_fragment=b'', - pad_length=0): - super(PushPromiseFrame, self).__init__(state, length, flags, stream_id) - self.pad_length = pad_length - self.promised_stream = promised_stream - self.header_block_fragment = header_block_fragment - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - if f.flags & Frame.FLAG_PADDED: - f.pad_length, f.promised_stream = struct.unpack('!BL', payload[:5]) - f.header_block_fragment = payload[5:-f.pad_length] - else: - f.promised_stream = int(struct.unpack("!L", payload[:4])[0]) - f.header_block_fragment = payload[4:] - - f.promised_stream &= 0x7FFFFFFF - - return f - - def payload_bytes(self): - if self.stream_id == 0x0: - raise ValueError( - 'PUSH_PROMISE frames MUST be associated with a stream.') - - if self.promised_stream == 0x0: - raise ValueError('Promised stream id not valid.') - - b = b'' - if self.flags & self.FLAG_PADDED: - b += struct.pack('!B', self.pad_length) - - b += struct.pack('!L', self.promised_stream & 0x7FFFFFFF) - b += bytes(self.header_block_fragment) - - if self.flags & self.FLAG_PADDED: - b += b'\0' * self.pad_length - - return b - - def payload_human_readable(self): - s = [] - - if self.flags & self.FLAG_PADDED: - s.append("padding: %d" % self.pad_length) - - s.append("promised stream: %#x" % self.promised_stream) - s.append( - "header_block_fragment: %s" % - self.header_block_fragment.encode('hex')) - - return "\n".join(s) - - -class PingFrame(Frame): - TYPE = 0x6 - VALID_FLAGS = [Frame.FLAG_ACK] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - payload=b''): - super(PingFrame, self).__init__(state, length, flags, stream_id) - self.payload = payload - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - f.payload = payload - return f - - def payload_bytes(self): - if self.stream_id != 0x0: - raise ValueError( - 'PING frames MUST NOT be associated with a stream.') - - b = self.payload[0:8] - b += b'\0' * (8 - len(b)) - return b - - def payload_human_readable(self): - return "opaque data: %s" % str(self.payload) - - -class GoAwayFrame(Frame): - TYPE = 0x7 - VALID_FLAGS = [] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - last_stream=0x0, - error_code=0x0, - data=b''): - super(GoAwayFrame, self).__init__(state, length, flags, stream_id) - self.last_stream = last_stream - self.error_code = error_code - self.data = data - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - f.last_stream, f.error_code = struct.unpack("!LL", payload[:8]) - f.last_stream &= 0x7FFFFFFF - f.data = payload[8:] - - return f - - def payload_bytes(self): - if self.stream_id != 0x0: - raise ValueError( - 'GOAWAY frames MUST NOT be associated with a stream.') - - b = struct.pack('!LL', self.last_stream & 0x7FFFFFFF, self.error_code) - b += bytes(self.data) - return b - - def payload_human_readable(self): - s = [] - s.append("last stream: %#x" % self.last_stream) - s.append("error code: %d" % self.error_code) - s.append("debug data: %s" % str(self.data)) - return "\n".join(s) - - -class WindowUpdateFrame(Frame): - TYPE = 0x8 - VALID_FLAGS = [] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - window_size_increment=0x0): - super(WindowUpdateFrame, self).__init__(state, length, flags, stream_id) - self.window_size_increment = window_size_increment - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - - f.window_size_increment = struct.unpack("!L", payload)[0] - f.window_size_increment &= 0x7FFFFFFF - - return f - - def payload_bytes(self): - if self.window_size_increment <= 0 or self.window_size_increment >= 2 ** 31: - raise ValueError( - 'Window Size Increment MUST be greater than 0 and less than 2^31.') - - return struct.pack('!L', self.window_size_increment & 0x7FFFFFFF) - - def payload_human_readable(self): - return "window size increment: %#x" % self.window_size_increment - - -class ContinuationFrame(Frame): - TYPE = 0x9 - VALID_FLAGS = [Frame.FLAG_END_HEADERS] - - def __init__( - self, - state=None, - length=0, - flags=Frame.FLAG_NO_FLAGS, - stream_id=0x0, - header_block_fragment=b''): - super(ContinuationFrame, self).__init__(state, length, flags, stream_id) - self.header_block_fragment = header_block_fragment - - @classmethod - def from_bytes(cls, state, length, flags, stream_id, payload): - f = cls(state=state, length=length, flags=flags, stream_id=stream_id) - f.header_block_fragment = payload - return f - - def payload_bytes(self): - if self.stream_id == 0x0: - raise ValueError( - 'CONTINUATION frames MUST be associated with a stream.') - - return self.header_block_fragment - - def payload_human_readable(self): - s = [] - s.append( - "header_block_fragment: %s" % - self.header_block_fragment.encode('hex')) - return "\n".join(s) - -_FRAME_CLASSES = [ - DataFrame, - HeadersFrame, - PriorityFrame, - RstStreamFrame, - SettingsFrame, - PushPromiseFrame, - PingFrame, - GoAwayFrame, - WindowUpdateFrame, - ContinuationFrame -] -FRAMES = {cls.TYPE: cls for cls in _FRAME_CLASSES} - - -HTTP2_DEFAULT_SETTINGS = { - SettingsFrame.SETTINGS.SETTINGS_HEADER_TABLE_SIZE: 4096, - SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1, - SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: None, - SettingsFrame.SETTINGS.SETTINGS_INITIAL_WINDOW_SIZE: 2 ** 16 - 1, - SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE: 2 ** 14, - SettingsFrame.SETTINGS.SETTINGS_MAX_HEADER_LIST_SIZE: None, -} diff --git a/netlib/http/http2/frames.py b/netlib/http/http2/frames.py new file mode 100644 index 00000000..b36b3adf --- /dev/null +++ b/netlib/http/http2/frames.py @@ -0,0 +1,633 @@ +import sys +import struct +from hpack.hpack import Encoder, Decoder + +from .. import utils + + +class FrameSizeError(Exception): + pass + + +class Frame(object): + + """ + Baseclass Frame + contains header + payload is defined in subclasses + """ + + FLAG_NO_FLAGS = 0x0 + FLAG_ACK = 0x1 + FLAG_END_STREAM = 0x1 + FLAG_END_HEADERS = 0x4 + FLAG_PADDED = 0x8 + FLAG_PRIORITY = 0x20 + + def __init__( + self, + state=None, + length=0, + flags=FLAG_NO_FLAGS, + stream_id=0x0): + valid_flags = reduce(lambda x, y: x | y, self.VALID_FLAGS, 0x0) + if flags | valid_flags != valid_flags: + raise ValueError('invalid flags detected.') + + if state is None: + class State(object): + pass + + state = State() + state.http2_settings = HTTP2_DEFAULT_SETTINGS.copy() + state.encoder = Encoder() + state.decoder = Decoder() + + self.state = state + + self.length = length + self.type = self.TYPE + self.flags = flags + self.stream_id = stream_id + + @classmethod + def _check_frame_size(cls, length, state): + if state: + settings = state.http2_settings + else: + settings = HTTP2_DEFAULT_SETTINGS.copy() + + max_frame_size = settings[ + SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] + + if length > max_frame_size: + raise FrameSizeError( + "Frame size exceeded: %d, but only %d allowed." % ( + length, max_frame_size)) + + @classmethod + def from_file(cls, fp, state=None): + """ + read a HTTP/2 frame sent by a server or client + fp is a "file like" object that could be backed by a network + stream or a disk or an in memory stream reader + """ + raw_header = fp.safe_read(9) + + fields = struct.unpack("!HBBBL", raw_header) + length = (fields[0] << 8) + fields[1] + flags = fields[3] + stream_id = fields[4] + + if raw_header[:4] == b'HTTP': # pragma no cover + print >> sys.stderr, "WARNING: This looks like an HTTP/1 connection!" + + cls._check_frame_size(length, state) + + payload = fp.safe_read(length) + return FRAMES[fields[2]].from_bytes( + state, + length, + flags, + stream_id, + payload) + + def to_bytes(self): + payload = self.payload_bytes() + self.length = len(payload) + + self._check_frame_size(self.length, self.state) + + b = struct.pack('!HB', (self.length & 0xFFFF00) >> 8, self.length & 0x0000FF) + b += struct.pack('!B', self.TYPE) + b += struct.pack('!B', self.flags) + b += struct.pack('!L', self.stream_id & 0x7FFFFFFF) + b += payload + + return b + + def payload_bytes(self): # pragma: no cover + raise NotImplementedError() + + def payload_human_readable(self): # pragma: no cover + raise NotImplementedError() + + def human_readable(self, direction="-"): + self.length = len(self.payload_bytes()) + + return "\n".join([ + "%s: %s | length: %d | flags: %#x | stream_id: %d" % ( + direction, self.__class__.__name__, self.length, self.flags, self.stream_id), + self.payload_human_readable(), + "===============================================================", + ]) + + def __eq__(self, other): + return self.to_bytes() == other.to_bytes() + + +class DataFrame(Frame): + TYPE = 0x0 + VALID_FLAGS = [Frame.FLAG_END_STREAM, Frame.FLAG_PADDED] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + payload=b'', + pad_length=0): + super(DataFrame, self).__init__(state, length, flags, stream_id) + self.payload = payload + self.pad_length = pad_length + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + if f.flags & Frame.FLAG_PADDED: + f.pad_length = struct.unpack('!B', payload[0])[0] + f.payload = payload[1:-f.pad_length] + else: + f.payload = payload + + return f + + def payload_bytes(self): + if self.stream_id == 0x0: + raise ValueError('DATA frames MUST be associated with a stream.') + + b = b'' + if self.flags & self.FLAG_PADDED: + b += struct.pack('!B', self.pad_length) + + b += bytes(self.payload) + + if self.flags & self.FLAG_PADDED: + b += b'\0' * self.pad_length + + return b + + def payload_human_readable(self): + return "payload: %s" % str(self.payload) + + +class HeadersFrame(Frame): + TYPE = 0x1 + VALID_FLAGS = [ + Frame.FLAG_END_STREAM, + Frame.FLAG_END_HEADERS, + Frame.FLAG_PADDED, + Frame.FLAG_PRIORITY] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + header_block_fragment=b'', + pad_length=0, + exclusive=False, + stream_dependency=0x0, + weight=0): + super(HeadersFrame, self).__init__(state, length, flags, stream_id) + + self.header_block_fragment = header_block_fragment + self.pad_length = pad_length + self.exclusive = exclusive + self.stream_dependency = stream_dependency + self.weight = weight + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + if f.flags & Frame.FLAG_PADDED: + f.pad_length = struct.unpack('!B', payload[0])[0] + f.header_block_fragment = payload[1:-f.pad_length] + else: + f.header_block_fragment = payload[0:] + + if f.flags & Frame.FLAG_PRIORITY: + f.stream_dependency, f.weight = struct.unpack( + '!LB', f.header_block_fragment[:5]) + f.exclusive = bool(f.stream_dependency >> 31) + f.stream_dependency &= 0x7FFFFFFF + f.header_block_fragment = f.header_block_fragment[5:] + + return f + + def payload_bytes(self): + if self.stream_id == 0x0: + raise ValueError('HEADERS frames MUST be associated with a stream.') + + b = b'' + if self.flags & self.FLAG_PADDED: + b += struct.pack('!B', self.pad_length) + + if self.flags & self.FLAG_PRIORITY: + b += struct.pack('!LB', + (int(self.exclusive) << 31) | self.stream_dependency, + self.weight) + + b += self.header_block_fragment + + if self.flags & self.FLAG_PADDED: + b += b'\0' * self.pad_length + + return b + + def payload_human_readable(self): + s = [] + + if self.flags & self.FLAG_PRIORITY: + s.append("exclusive: %d" % self.exclusive) + s.append("stream dependency: %#x" % self.stream_dependency) + s.append("weight: %d" % self.weight) + + if self.flags & self.FLAG_PADDED: + s.append("padding: %d" % self.pad_length) + + s.append( + "header_block_fragment: %s" % + self.header_block_fragment.encode('hex')) + + return "\n".join(s) + + +class PriorityFrame(Frame): + TYPE = 0x2 + VALID_FLAGS = [] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + exclusive=False, + stream_dependency=0x0, + weight=0): + super(PriorityFrame, self).__init__(state, length, flags, stream_id) + self.exclusive = exclusive + self.stream_dependency = stream_dependency + self.weight = weight + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + f.stream_dependency, f.weight = struct.unpack('!LB', payload) + f.exclusive = bool(f.stream_dependency >> 31) + f.stream_dependency &= 0x7FFFFFFF + + return f + + def payload_bytes(self): + if self.stream_id == 0x0: + raise ValueError( + 'PRIORITY frames MUST be associated with a stream.') + + return struct.pack( + '!LB', + (int( + self.exclusive) << 31) | self.stream_dependency, + self.weight) + + def payload_human_readable(self): + s = [] + s.append("exclusive: %d" % self.exclusive) + s.append("stream dependency: %#x" % self.stream_dependency) + s.append("weight: %d" % self.weight) + return "\n".join(s) + + +class RstStreamFrame(Frame): + TYPE = 0x3 + VALID_FLAGS = [] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + error_code=0x0): + super(RstStreamFrame, self).__init__(state, length, flags, stream_id) + self.error_code = error_code + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + f.error_code = struct.unpack('!L', payload)[0] + return f + + def payload_bytes(self): + if self.stream_id == 0x0: + raise ValueError( + 'RST_STREAM frames MUST be associated with a stream.') + + return struct.pack('!L', self.error_code) + + def payload_human_readable(self): + return "error code: %#x" % self.error_code + + +class SettingsFrame(Frame): + TYPE = 0x4 + VALID_FLAGS = [Frame.FLAG_ACK] + + SETTINGS = utils.BiDi( + SETTINGS_HEADER_TABLE_SIZE=0x1, + SETTINGS_ENABLE_PUSH=0x2, + SETTINGS_MAX_CONCURRENT_STREAMS=0x3, + SETTINGS_INITIAL_WINDOW_SIZE=0x4, + SETTINGS_MAX_FRAME_SIZE=0x5, + SETTINGS_MAX_HEADER_LIST_SIZE=0x6, + ) + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + settings=None): + super(SettingsFrame, self).__init__(state, length, flags, stream_id) + + if settings is None: + settings = {} + + self.settings = settings + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + for i in xrange(0, len(payload), 6): + identifier, value = struct.unpack("!HL", payload[i:i + 6]) + f.settings[identifier] = value + + return f + + def payload_bytes(self): + if self.stream_id != 0x0: + raise ValueError( + 'SETTINGS frames MUST NOT be associated with a stream.') + + b = b'' + for identifier, value in self.settings.items(): + b += struct.pack("!HL", identifier & 0xFF, value) + + return b + + def payload_human_readable(self): + s = [] + + for identifier, value in self.settings.items(): + s.append("%s: %#x" % (self.SETTINGS.get_name(identifier), value)) + + if not s: + return "settings: None" + else: + return "\n".join(s) + + +class PushPromiseFrame(Frame): + TYPE = 0x5 + VALID_FLAGS = [Frame.FLAG_END_HEADERS, Frame.FLAG_PADDED] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + promised_stream=0x0, + header_block_fragment=b'', + pad_length=0): + super(PushPromiseFrame, self).__init__(state, length, flags, stream_id) + self.pad_length = pad_length + self.promised_stream = promised_stream + self.header_block_fragment = header_block_fragment + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + if f.flags & Frame.FLAG_PADDED: + f.pad_length, f.promised_stream = struct.unpack('!BL', payload[:5]) + f.header_block_fragment = payload[5:-f.pad_length] + else: + f.promised_stream = int(struct.unpack("!L", payload[:4])[0]) + f.header_block_fragment = payload[4:] + + f.promised_stream &= 0x7FFFFFFF + + return f + + def payload_bytes(self): + if self.stream_id == 0x0: + raise ValueError( + 'PUSH_PROMISE frames MUST be associated with a stream.') + + if self.promised_stream == 0x0: + raise ValueError('Promised stream id not valid.') + + b = b'' + if self.flags & self.FLAG_PADDED: + b += struct.pack('!B', self.pad_length) + + b += struct.pack('!L', self.promised_stream & 0x7FFFFFFF) + b += bytes(self.header_block_fragment) + + if self.flags & self.FLAG_PADDED: + b += b'\0' * self.pad_length + + return b + + def payload_human_readable(self): + s = [] + + if self.flags & self.FLAG_PADDED: + s.append("padding: %d" % self.pad_length) + + s.append("promised stream: %#x" % self.promised_stream) + s.append( + "header_block_fragment: %s" % + self.header_block_fragment.encode('hex')) + + return "\n".join(s) + + +class PingFrame(Frame): + TYPE = 0x6 + VALID_FLAGS = [Frame.FLAG_ACK] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + payload=b''): + super(PingFrame, self).__init__(state, length, flags, stream_id) + self.payload = payload + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + f.payload = payload + return f + + def payload_bytes(self): + if self.stream_id != 0x0: + raise ValueError( + 'PING frames MUST NOT be associated with a stream.') + + b = self.payload[0:8] + b += b'\0' * (8 - len(b)) + return b + + def payload_human_readable(self): + return "opaque data: %s" % str(self.payload) + + +class GoAwayFrame(Frame): + TYPE = 0x7 + VALID_FLAGS = [] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + last_stream=0x0, + error_code=0x0, + data=b''): + super(GoAwayFrame, self).__init__(state, length, flags, stream_id) + self.last_stream = last_stream + self.error_code = error_code + self.data = data + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + f.last_stream, f.error_code = struct.unpack("!LL", payload[:8]) + f.last_stream &= 0x7FFFFFFF + f.data = payload[8:] + + return f + + def payload_bytes(self): + if self.stream_id != 0x0: + raise ValueError( + 'GOAWAY frames MUST NOT be associated with a stream.') + + b = struct.pack('!LL', self.last_stream & 0x7FFFFFFF, self.error_code) + b += bytes(self.data) + return b + + def payload_human_readable(self): + s = [] + s.append("last stream: %#x" % self.last_stream) + s.append("error code: %d" % self.error_code) + s.append("debug data: %s" % str(self.data)) + return "\n".join(s) + + +class WindowUpdateFrame(Frame): + TYPE = 0x8 + VALID_FLAGS = [] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + window_size_increment=0x0): + super(WindowUpdateFrame, self).__init__(state, length, flags, stream_id) + self.window_size_increment = window_size_increment + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + + f.window_size_increment = struct.unpack("!L", payload)[0] + f.window_size_increment &= 0x7FFFFFFF + + return f + + def payload_bytes(self): + if self.window_size_increment <= 0 or self.window_size_increment >= 2 ** 31: + raise ValueError( + 'Window Size Increment MUST be greater than 0 and less than 2^31.') + + return struct.pack('!L', self.window_size_increment & 0x7FFFFFFF) + + def payload_human_readable(self): + return "window size increment: %#x" % self.window_size_increment + + +class ContinuationFrame(Frame): + TYPE = 0x9 + VALID_FLAGS = [Frame.FLAG_END_HEADERS] + + def __init__( + self, + state=None, + length=0, + flags=Frame.FLAG_NO_FLAGS, + stream_id=0x0, + header_block_fragment=b''): + super(ContinuationFrame, self).__init__(state, length, flags, stream_id) + self.header_block_fragment = header_block_fragment + + @classmethod + def from_bytes(cls, state, length, flags, stream_id, payload): + f = cls(state=state, length=length, flags=flags, stream_id=stream_id) + f.header_block_fragment = payload + return f + + def payload_bytes(self): + if self.stream_id == 0x0: + raise ValueError( + 'CONTINUATION frames MUST be associated with a stream.') + + return self.header_block_fragment + + def payload_human_readable(self): + s = [] + s.append( + "header_block_fragment: %s" % + self.header_block_fragment.encode('hex')) + return "\n".join(s) + +_FRAME_CLASSES = [ + DataFrame, + HeadersFrame, + PriorityFrame, + RstStreamFrame, + SettingsFrame, + PushPromiseFrame, + PingFrame, + GoAwayFrame, + WindowUpdateFrame, + ContinuationFrame +] +FRAMES = {cls.TYPE: cls for cls in _FRAME_CLASSES} + + +HTTP2_DEFAULT_SETTINGS = { + SettingsFrame.SETTINGS.SETTINGS_HEADER_TABLE_SIZE: 4096, + SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 1, + SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: None, + SettingsFrame.SETTINGS.SETTINGS_INITIAL_WINDOW_SIZE: 2 ** 16 - 1, + SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE: 2 ** 14, + SettingsFrame.SETTINGS.SETTINGS_MAX_HEADER_LIST_SIZE: None, +} diff --git a/netlib/http/http2/protocol.py b/netlib/http/http2/protocol.py deleted file mode 100644 index b6d376d3..00000000 --- a/netlib/http/http2/protocol.py +++ /dev/null @@ -1,412 +0,0 @@ -from __future__ import (absolute_import, print_function, division) -import itertools -import time - -from hpack.hpack import Encoder, Decoder -from netlib import http, utils -from netlib.http import semantics -from . import frame - - -class TCPHandler(object): - - def __init__(self, rfile, wfile=None): - self.rfile = rfile - self.wfile = wfile - - -class HTTP2Protocol(semantics.ProtocolMixin): - - ERROR_CODES = utils.BiDi( - NO_ERROR=0x0, - PROTOCOL_ERROR=0x1, - INTERNAL_ERROR=0x2, - FLOW_CONTROL_ERROR=0x3, - SETTINGS_TIMEOUT=0x4, - STREAM_CLOSED=0x5, - FRAME_SIZE_ERROR=0x6, - REFUSED_STREAM=0x7, - CANCEL=0x8, - COMPRESSION_ERROR=0x9, - CONNECT_ERROR=0xa, - ENHANCE_YOUR_CALM=0xb, - INADEQUATE_SECURITY=0xc, - HTTP_1_1_REQUIRED=0xd - ) - - CLIENT_CONNECTION_PREFACE = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" - - ALPN_PROTO_H2 = 'h2' - - def __init__( - self, - tcp_handler=None, - rfile=None, - wfile=None, - is_server=False, - dump_frames=False, - encoder=None, - decoder=None, - unhandled_frame_cb=None, - ): - self.tcp_handler = tcp_handler or TCPHandler(rfile, wfile) - self.is_server = is_server - self.dump_frames = dump_frames - self.encoder = encoder or Encoder() - self.decoder = decoder or Decoder() - self.unhandled_frame_cb = unhandled_frame_cb - - self.http2_settings = frame.HTTP2_DEFAULT_SETTINGS.copy() - self.current_stream_id = None - self.connection_preface_performed = False - - def read_request( - self, - include_body=True, - body_size_limit=None, - allow_empty=False, - ): - if body_size_limit is not None: - raise NotImplementedError() - - self.perform_connection_preface() - - timestamp_start = time.time() - if hasattr(self.tcp_handler.rfile, "reset_timestamps"): - self.tcp_handler.rfile.reset_timestamps() - - stream_id, headers, body = self._receive_transmission( - include_body=include_body, - ) - - if hasattr(self.tcp_handler.rfile, "first_byte_timestamp"): - # more accurate timestamp_start - timestamp_start = self.tcp_handler.rfile.first_byte_timestamp - - timestamp_end = time.time() - - authority = headers.get(':authority', '') - method = headers.get(':method', 'GET') - scheme = headers.get(':scheme', 'https') - path = headers.get(':path', '/') - host = None - port = None - - if path == '*' or path.startswith("/"): - form_in = "relative" - elif method == 'CONNECT': - form_in = "authority" - if ":" in authority: - host, port = authority.split(":", 1) - else: - host = authority - else: - form_in = "absolute" - # FIXME: verify if path or :host contains what we need - scheme, host, port, _ = utils.parse_url(path) - - if host is None: - host = 'localhost' - if port is None: - port = 80 if scheme == 'http' else 443 - port = int(port) - - request = http.Request( - form_in, - method, - scheme, - host, - port, - path, - (2, 0), - headers, - body, - timestamp_start, - timestamp_end, - ) - # FIXME: We should not do this. - request.stream_id = stream_id - - return request - - def read_response( - self, - request_method='', - body_size_limit=None, - include_body=True, - stream_id=None, - ): - if body_size_limit is not None: - raise NotImplementedError() - - self.perform_connection_preface() - - timestamp_start = time.time() - if hasattr(self.tcp_handler.rfile, "reset_timestamps"): - self.tcp_handler.rfile.reset_timestamps() - - stream_id, headers, body = self._receive_transmission( - stream_id=stream_id, - include_body=include_body, - ) - - if hasattr(self.tcp_handler.rfile, "first_byte_timestamp"): - # more accurate timestamp_start - timestamp_start = self.tcp_handler.rfile.first_byte_timestamp - - if include_body: - timestamp_end = time.time() - else: - timestamp_end = None - - response = http.Response( - (2, 0), - int(headers.get(':status', 502)), - "", - headers, - body, - timestamp_start=timestamp_start, - timestamp_end=timestamp_end, - ) - response.stream_id = stream_id - - return response - - def assemble_request(self, request): - assert isinstance(request, semantics.Request) - - authority = self.tcp_handler.sni if self.tcp_handler.sni else self.tcp_handler.address.host - if self.tcp_handler.address.port != 443: - authority += ":%d" % self.tcp_handler.address.port - - headers = request.headers.copy() - - if ':authority' not in headers: - headers.fields.insert(0, (':authority', bytes(authority))) - if ':scheme' not in headers: - headers.fields.insert(0, (':scheme', bytes(request.scheme))) - if ':path' not in headers: - headers.fields.insert(0, (':path', bytes(request.path))) - if ':method' not in headers: - headers.fields.insert(0, (':method', bytes(request.method))) - - if hasattr(request, 'stream_id'): - stream_id = request.stream_id - else: - stream_id = self._next_stream_id() - - return list(itertools.chain( - self._create_headers(headers, stream_id, end_stream=(request.body is None or len(request.body) == 0)), - self._create_body(request.body, stream_id))) - - def assemble_response(self, response): - assert isinstance(response, semantics.Response) - - headers = response.headers.copy() - - if ':status' not in headers: - headers.fields.insert(0, (':status', bytes(str(response.status_code)))) - - if hasattr(response, 'stream_id'): - stream_id = response.stream_id - else: - stream_id = self._next_stream_id() - - return list(itertools.chain( - self._create_headers(headers, stream_id, end_stream=(response.body is None or len(response.body) == 0)), - self._create_body(response.body, stream_id), - )) - - def perform_connection_preface(self, force=False): - if force or not self.connection_preface_performed: - if self.is_server: - self.perform_server_connection_preface(force) - else: - self.perform_client_connection_preface(force) - - def perform_server_connection_preface(self, force=False): - if force or not self.connection_preface_performed: - self.connection_preface_performed = True - - magic_length = len(self.CLIENT_CONNECTION_PREFACE) - magic = self.tcp_handler.rfile.safe_read(magic_length) - assert magic == self.CLIENT_CONNECTION_PREFACE - - frm = frame.SettingsFrame(state=self, settings={ - frame.SettingsFrame.SETTINGS.SETTINGS_ENABLE_PUSH: 0, - frame.SettingsFrame.SETTINGS.SETTINGS_MAX_CONCURRENT_STREAMS: 1, - }) - self.send_frame(frm, hide=True) - self._receive_settings(hide=True) - - def perform_client_connection_preface(self, force=False): - if force or not self.connection_preface_performed: - self.connection_preface_performed = True - - self.tcp_handler.wfile.write(self.CLIENT_CONNECTION_PREFACE) - - self.send_frame(frame.SettingsFrame(state=self), hide=True) - self._receive_settings(hide=True) # server announces own settings - self._receive_settings(hide=True) # server acks my settings - - def send_frame(self, frm, hide=False): - raw_bytes = frm.to_bytes() - self.tcp_handler.wfile.write(raw_bytes) - self.tcp_handler.wfile.flush() - if not hide and self.dump_frames: # pragma no cover - print(frm.human_readable(">>")) - - def read_frame(self, hide=False): - while True: - frm = frame.Frame.from_file(self.tcp_handler.rfile, self) - if not hide and self.dump_frames: # pragma no cover - print(frm.human_readable("<<")) - - if isinstance(frm, frame.PingFrame): - raw_bytes = frame.PingFrame(flags=frame.Frame.FLAG_ACK, payload=frm.payload).to_bytes() - self.tcp_handler.wfile.write(raw_bytes) - self.tcp_handler.wfile.flush() - continue - if isinstance(frm, frame.SettingsFrame) and not frm.flags & frame.Frame.FLAG_ACK: - self._apply_settings(frm.settings, hide) - if isinstance(frm, frame.DataFrame) and frm.length > 0: - self._update_flow_control_window(frm.stream_id, frm.length) - return frm - - def check_alpn(self): - alp = self.tcp_handler.get_alpn_proto_negotiated() - if alp != self.ALPN_PROTO_H2: - raise NotImplementedError( - "HTTP2Protocol can not handle unknown ALP: %s" % alp) - return True - - def _handle_unexpected_frame(self, frm): - if isinstance(frm, frame.SettingsFrame): - return - if self.unhandled_frame_cb: - self.unhandled_frame_cb(frm) - - def _receive_settings(self, hide=False): - while True: - frm = self.read_frame(hide) - if isinstance(frm, frame.SettingsFrame): - break - else: - self._handle_unexpected_frame(frm) - - def _next_stream_id(self): - if self.current_stream_id is None: - if self.is_server: - # servers must use even stream ids - self.current_stream_id = 2 - else: - # clients must use odd stream ids - self.current_stream_id = 1 - else: - self.current_stream_id += 2 - return self.current_stream_id - - def _apply_settings(self, settings, hide=False): - for setting, value in settings.items(): - old_value = self.http2_settings[setting] - if not old_value: - old_value = '-' - self.http2_settings[setting] = value - - frm = frame.SettingsFrame( - state=self, - flags=frame.Frame.FLAG_ACK) - self.send_frame(frm, hide) - - def _update_flow_control_window(self, stream_id, increment): - frm = frame.WindowUpdateFrame(stream_id=0, window_size_increment=increment) - self.send_frame(frm) - frm = frame.WindowUpdateFrame(stream_id=stream_id, window_size_increment=increment) - self.send_frame(frm) - - def _create_headers(self, headers, stream_id, end_stream=True): - def frame_cls(chunks): - for i in chunks: - if i == 0: - yield frame.HeadersFrame, i - else: - yield frame.ContinuationFrame, i - - header_block_fragment = self.encoder.encode(headers.fields) - - chunk_size = self.http2_settings[frame.SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] - chunks = range(0, len(header_block_fragment), chunk_size) - frms = [frm_cls( - state=self, - flags=frame.Frame.FLAG_NO_FLAGS, - stream_id=stream_id, - header_block_fragment=header_block_fragment[i:i+chunk_size]) for frm_cls, i in frame_cls(chunks)] - - last_flags = frame.Frame.FLAG_END_HEADERS - if end_stream: - last_flags |= frame.Frame.FLAG_END_STREAM - frms[-1].flags = last_flags - - if self.dump_frames: # pragma no cover - for frm in frms: - print(frm.human_readable(">>")) - - return [frm.to_bytes() for frm in frms] - - def _create_body(self, body, stream_id): - if body is None or len(body) == 0: - return b'' - - chunk_size = self.http2_settings[frame.SettingsFrame.SETTINGS.SETTINGS_MAX_FRAME_SIZE] - chunks = range(0, len(body), chunk_size) - frms = [frame.DataFrame( - state=self, - flags=frame.Frame.FLAG_NO_FLAGS, - stream_id=stream_id, - payload=body[i:i+chunk_size]) for i in chunks] - frms[-1].flags = frame.Frame.FLAG_END_STREAM - - if self.dump_frames: # pragma no cover - for frm in frms: - print(frm.human_readable(">>")) - - return [frm.to_bytes() for frm in frms] - - def _receive_transmission(self, stream_id=None, include_body=True): - if not include_body: - raise NotImplementedError() - - body_expected = True - - header_block_fragment = b'' - body = b'' - - while True: - frm = self.read_frame() - if ( - (isinstance(frm, frame.HeadersFrame) or isinstance(frm, frame.ContinuationFrame)) and - (stream_id is None or frm.stream_id == stream_id) - ): - stream_id = frm.stream_id - header_block_fragment += frm.header_block_fragment - if frm.flags & frame.Frame.FLAG_END_STREAM: - body_expected = False - if frm.flags & frame.Frame.FLAG_END_HEADERS: - break - else: - self._handle_unexpected_frame(frm) - - while body_expected: - frm = self.read_frame() - if isinstance(frm, frame.DataFrame) and frm.stream_id == stream_id: - body += frm.payload - if frm.flags & frame.Frame.FLAG_END_STREAM: - break - else: - self._handle_unexpected_frame(frm) - - headers = http.Headers( - [[str(k), str(v)] for k, v in self.decoder.decode(header_block_fragment)] - ) - - return stream_id, headers, body -- cgit v1.2.3