author | Maximilian Hils <git@maximilianhils.com> | 2015-09-20 19:40:09 +0200 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2015-09-20 19:40:09 +0200 |
commit | 693cdfc6d75e460a00585ccc9b734b80d6eba74d (patch) | |
tree | 868aa79ce92bbadabd1e9e361643df415cc07492 | |
parent | 3f1ca556d14ce71331b8dbc69be4db670863271a (diff) | |
python3++
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | .travis.yml | 1 | ||||
-rw-r--r-- | netlib/certutils.py | 6 | ||||
-rw-r--r-- | netlib/socks.py | 22 | ||||
-rw-r--r-- | netlib/utils.py | 6 | ||||
-rw-r--r-- | test/test_certutils.py | 10 | ||||
-rw-r--r-- | test/test_socks.py | 18 |
7 files changed, 36 insertions, 28 deletions
@@ -13,3 +13,4 @@ _cffi__*
 .eggs/
 netlib.egg-info/
 pathod/
+.cache/
\ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 00f8b4db..c8cbeaa2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,6 +22,7 @@ matrix: - nosetests --with-cov --cov-report term-missing test/test_encoding.py - nosetests --with-cov --cov-report term-missing test/test_odict.py - nosetests --with-cov --cov-report term-missing test/test_certutils.py + - nosetests --with-cov --cov-report term-missing test/test_socks.py - python: pypy - python: pypy env: OPENSSL=1.0.2 diff --git a/netlib/certutils.py b/netlib/certutils.py index df793537..b3ddcbe4 100644 --- a/netlib/certutils.py +++ b/netlib/certutils.py @@ -3,7 +3,7 @@ import os import ssl import time import datetime -import itertools +from six.moves import filter import ipaddress import sys @@ -396,12 +396,12 @@ class SSLCert(object): @property def notbefore(self): t = self.x509.get_notBefore() - return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ") + return datetime.datetime.strptime(t.decode("ascii"), "%Y%m%d%H%M%SZ") @property def notafter(self): t = self.x509.get_notAfter() - return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ") + return datetime.datetime.strptime(t.decode("ascii"), "%Y%m%d%H%M%SZ") @property def has_expired(self): diff --git a/netlib/socks.py b/netlib/socks.py index d38b88c8..51ad1c63 100644 --- a/netlib/socks.py +++ b/netlib/socks.py @@ -1,7 +1,7 @@ from __future__ import (absolute_import, print_function, division) -import socket import struct import array +import ipaddress from . import tcp, utils 
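Review bot: `t.decode("ascii")` in `notbefore` and `notafter` assumes `get_notBefore()` / `get_notAfter()` always return bytes in the `%Y%m%d%H%M%SZ` form. pyOpenSSL documents a `None` return when the field is absent, which would surface here as an AttributeError, and a timestamp in any other layout raises ValueError from `strptime`. If these properties are evaluated on certificates received from remote peers (not visible in this hunk), callers need to know which exceptions to expect. I would add `if t is None: raise ValueError("certificate has no notBefore field")` before the decode (and the notAfter equivalent), plus a docstring line on both properties stating that ValueError is raised for a missing or malformed validity field. The SSLCert class starts and ends outside the hunk.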
@@ -133,19 +133,23 @@ class Message(object): def from_file(cls, f): ver, msg, rsv, atyp = struct.unpack("!BBBB", f.safe_read(4)) if rsv != 0x00: - raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE, - "Socks Request: Invalid reserved byte: %s" % rsv) - + raise SocksError( + REP.GENERAL_SOCKS_SERVER_FAILURE, + "Socks Request: Invalid reserved byte: %s" % rsv + ) if atyp == ATYP.IPV4_ADDRESS: # We use tnoa here as ntop is not commonly available on Windows. - host = socket.inet_ntoa(f.safe_read(4)) + host = ipaddress.IPv4Address(f.safe_read(4)).compressed use_ipv6 = False elif atyp == ATYP.IPV6_ADDRESS: - host = socket.inet_ntop(socket.AF_INET6, f.safe_read(16)) + host = ipaddress.IPv6Address(f.safe_read(16)).compressed use_ipv6 = True elif atyp == ATYP.DOMAINNAME: length, = struct.unpack("!B", f.safe_read(1)) host = f.safe_read(length) + if not utils.is_valid_host(host): + raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE, "Invalid hostname: %s" % host) + host = host.decode("idna") use_ipv6 = False else: raise SocksError(REP.ADDRESS_TYPE_NOT_SUPPORTED, @@ -158,12 +162,12 @@ class Message(object): def to_file(self, f): f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp)) if self.atyp == ATYP.IPV4_ADDRESS: - f.write(socket.inet_aton(self.addr.host)) + f.write(ipaddress.IPv4Address(self.addr.host).packed) elif self.atyp == ATYP.IPV6_ADDRESS: - f.write(socket.inet_pton(socket.AF_INET6, self.addr.host)) + f.write(ipaddress.IPv6Address(self.addr.host).packed) elif self.atyp == ATYP.DOMAINNAME: f.write(struct.pack("!B", len(self.addr.host))) - f.write(self.addr.host) + f.write(self.addr.host.encode("idna")) else: raise SocksError( REP.ADDRESS_TYPE_NOT_SUPPORTED, diff --git a/netlib/utils.py b/netlib/utils.py index 6fed44b6..799b0d42 100644 --- a/netlib/utils.py +++ b/netlib/utils.py 
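Review bot: In the DOMAINNAME branch of `from_file`, the rejected hostname is put into the error text with `"Invalid hostname: %s" % host`, and `host` is the raw bytes just read from the peer. On Python 2 that copies control characters and newlines into whatever logs or displays the SocksError message, while on Python 3 it renders as `b'...'` and emits a BytesWarning under `-b`. `"Invalid hostname: %r" % host` gives an escaped form on both. Separately, the comment `# We use tnoa here as ntop is not commonly available on Windows.` is stale now that the IPv4 branch uses `ipaddress.IPv4Address` and should be dropped or reworded. The method continues past the end of the hunk.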
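Review bot: The DOMAINNAME branch of `to_file` takes the length byte from `len(self.addr.host)` but then writes `self.addr.host.encode("idna")`, so the prefix counts text characters while the payload is the IDNA-encoded bytes. For a non-ASCII hostname the two lengths differ and the receiver will mis-frame the rest of the message. Encode once and measure the encoded value: `host = self.addr.host.encode("idna")`, then `f.write(struct.pack("!B", len(host)))` and `f.write(host)`. `struct.pack("!B", ...)` also raises struct.error for an encoded name longer than 255 bytes, which may deserve a SocksError instead. The method's else branch runs past the end of the hunk.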
@@ -141,6 +141,12 @@ _label_valid = re.compile(b"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE) def is_valid_host(host): + """ + Checks if a hostname is valid. + + Args: + host (bytes): The hostname + """ try: host.decode("idna") except ValueError: diff --git a/test/test_certutils.py b/test/test_certutils.py index fc91609e..991d59d6 100644 --- a/test/test_certutils.py +++ b/test/test_certutils.py @@ -100,10 +100,10 @@ class TestDummyCert: r = certutils.dummy_cert( ca.default_privatekey, ca.default_ca, - "foo.com", - ["one.com", "two.com", "*.three.com"] + b"foo.com", + [b"one.com", b"two.com", b"*.three.com"] ) - assert r.cn == "foo.com" + assert r.cn == b"foo.com" class TestSSLCert: @@ -112,13 +112,13 @@ class TestSSLCert: with open(tutils.test_data.path("data/text_cert"), "rb") as f: d = f.read() c1 = certutils.SSLCert.from_pem(d) - assert c1.cn == "google.com" + assert c1.cn == b"google.com" assert len(c1.altnames) == 436 with open(tutils.test_data.path("data/text_cert_2"), "rb") as f: d = f.read() c2 = certutils.SSLCert.from_pem(d) - assert c2.cn == "www.inode.co.nz" + assert c2.cn == b"www.inode.co.nz" assert len(c2.altnames) == 2 assert c2.digest("sha1") assert c2.notbefore diff --git a/test/test_socks.py b/test/test_socks.py index f2fb9b98..dd8e2807 100644 --- a/test/test_socks.py +++ b/test/test_socks.py @@ -1,6 +1,6 @@ +import ipaddress from io import BytesIO import socket -from nose.plugins.skip import SkipTest from netlib import socks, tcp, tutils @@ -33,7 +33,7 @@ def test_client_greeting_assert_socks5(): else: assert False - raw = tutils.treader(b"GET / HTTP/1.1" + " " * 100) + raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100) msg = socks.ClientGreeting.from_file(raw) try: msg.assert_socks5() 
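Review bot: The new `is_valid_host` docstring names the argument type but not the result, and in this commit the function becomes the check `Message.from_file` applies to peer-supplied hostnames. Add a `Returns:` entry along the lines of `bool: True if the host decodes as IDNA and passes the remaining checks, False otherwise`, worded against the part of the body that lies outside this hunk. It is also worth stating that the argument must be bytes, because the visible `except ValueError` would not catch the AttributeError that a Python 3 `str` raises on `.decode`.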
@@ -64,7 +64,7 @@ def test_server_greeting(): def test_server_greeting_assert_socks5(): - raw = tutils.treader(b"HTTP/1.1 200 OK" + " " * 100) + raw = tutils.treader(b"HTTP/1.1 200 OK" + b" " * 100) msg = socks.ServerGreeting.from_file(raw) try: msg.assert_socks5() @@ -74,7 +74,7 @@ def test_server_greeting_assert_socks5(): else: assert False - raw = tutils.treader(b"GET / HTTP/1.1" + " " * 100) + raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100) msg = socks.ServerGreeting.from_file(raw) try: msg.assert_socks5() @@ -97,7 +97,7 @@ def test_message(): assert msg.ver == 5 assert msg.msg == 0x01 assert msg.atyp == 0x03 - assert msg.addr == (b"example.com", 0xDEAD) + assert msg.addr == ("example.com", 0xDEAD) def test_message_assert_socks5(): @@ -116,20 +116,16 @@ def test_message_ipv4(): msg.to_file(out) assert out.getvalue() == raw.getvalue()[:-2] - assert msg.addr == (b"127.0.0.1", 0xDEAD) + assert msg.addr == ("127.0.0.1", 0xDEAD) def test_message_ipv6(): - if not hasattr(socket, "inet_ntop"): - raise SkipTest("Skipped because inet_ntop is not available") # Test ATYP=0x04 (IPV6) ipv6_addr = "2001:db8:85a3:8d3:1319:8a2e:370:7344" raw = tutils.treader( b"\x05\x01\x00\x04" + - socket.inet_pton( - socket.AF_INET6, - ipv6_addr) + + ipaddress.IPv6Address(ipv6_addr).packed + b"\xDE\xAD\xBE\xEF") out = BytesIO() msg = socks.Message.from_file(raw) |