diff options
| author | Maximilian Hils <git@maximilianhils.com> | 2015-09-20 19:40:09 +0200 | 
|---|---|---|
| committer | Maximilian Hils <git@maximilianhils.com> | 2015-09-20 19:40:09 +0200 | 
| commit | 693cdfc6d75e460a00585ccc9b734b80d6eba74d (patch) | |
| tree | 868aa79ce92bbadabd1e9e361643df415cc07492 | |
| parent | 3f1ca556d14ce71331b8dbc69be4db670863271a (diff) | |
| download | mitmproxy-693cdfc6d75e460a00585ccc9b734b80d6eba74d.tar.gz mitmproxy-693cdfc6d75e460a00585ccc9b734b80d6eba74d.tar.bz2 mitmproxy-693cdfc6d75e460a00585ccc9b734b80d6eba74d.zip | |
python3++
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | .travis.yml | 1 | ||||
| -rw-r--r-- | netlib/certutils.py | 6 | ||||
| -rw-r--r-- | netlib/socks.py | 22 | ||||
| -rw-r--r-- | netlib/utils.py | 6 | ||||
| -rw-r--r-- | test/test_certutils.py | 10 | ||||
| -rw-r--r-- | test/test_socks.py | 18 | 
7 files changed, 36 insertions, 28 deletions
| @@ -13,3 +13,4 @@ _cffi__*  .eggs/  netlib.egg-info/  pathod/ +.cache/
\ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 00f8b4db..c8cbeaa2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,6 +22,7 @@ matrix:          - nosetests --with-cov --cov-report term-missing test/test_encoding.py          - nosetests --with-cov --cov-report term-missing test/test_odict.py          - nosetests --with-cov --cov-report term-missing test/test_certutils.py +        - nosetests --with-cov --cov-report term-missing test/test_socks.py      - python: pypy      - python: pypy        env: OPENSSL=1.0.2 diff --git a/netlib/certutils.py b/netlib/certutils.py index df793537..b3ddcbe4 100644 --- a/netlib/certutils.py +++ b/netlib/certutils.py @@ -3,7 +3,7 @@ import os  import ssl  import time  import datetime -import itertools +from six.moves import filter  import ipaddress  import sys @@ -396,12 +396,12 @@ class SSLCert(object):      @property      def notbefore(self):          t = self.x509.get_notBefore() -        return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ") +        return datetime.datetime.strptime(t.decode("ascii"), "%Y%m%d%H%M%SZ")      @property      def notafter(self):          t = self.x509.get_notAfter() -        return datetime.datetime.strptime(t, "%Y%m%d%H%M%SZ") +        return datetime.datetime.strptime(t.decode("ascii"), "%Y%m%d%H%M%SZ")      @property      def has_expired(self): diff --git a/netlib/socks.py b/netlib/socks.py index d38b88c8..51ad1c63 100644 --- a/netlib/socks.py +++ b/netlib/socks.py @@ -1,7 +1,7 @@  from __future__ import (absolute_import, print_function, division) -import socket  import struct  import array +import ipaddress  from . import tcp, utils @@ -133,19 +133,23 @@ class Message(object):      def from_file(cls, f):          ver, msg, rsv, atyp = struct.unpack("!BBBB", f.safe_read(4))          if rsv != 0x00: -            raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE, -                             "Socks Request: Invalid reserved byte: %s" % rsv) - +            raise SocksError( +                REP.GENERAL_SOCKS_SERVER_FAILURE, +                "Socks Request: Invalid reserved byte: %s" % rsv +            )          if atyp == ATYP.IPV4_ADDRESS:              # We use tnoa here as ntop is not commonly available on Windows. -            host = socket.inet_ntoa(f.safe_read(4)) +            host = ipaddress.IPv4Address(f.safe_read(4)).compressed              use_ipv6 = False          elif atyp == ATYP.IPV6_ADDRESS: -            host = socket.inet_ntop(socket.AF_INET6, f.safe_read(16)) +            host = ipaddress.IPv6Address(f.safe_read(16)).compressed              use_ipv6 = True          elif atyp == ATYP.DOMAINNAME:              length, = struct.unpack("!B", f.safe_read(1))              host = f.safe_read(length) +            if not utils.is_valid_host(host): +                raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE, "Invalid hostname: %s" % host) +            host = host.decode("idna")              use_ipv6 = False          else:              raise SocksError(REP.ADDRESS_TYPE_NOT_SUPPORTED, @@ -158,12 +162,12 @@ class Message(object):      def to_file(self, f):          f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp))          if self.atyp == ATYP.IPV4_ADDRESS: -            f.write(socket.inet_aton(self.addr.host)) +            f.write(ipaddress.IPv4Address(self.addr.host).packed)          elif self.atyp == ATYP.IPV6_ADDRESS: -            f.write(socket.inet_pton(socket.AF_INET6, self.addr.host)) +            f.write(ipaddress.IPv6Address(self.addr.host).packed)          elif self.atyp == ATYP.DOMAINNAME:              f.write(struct.pack("!B", len(self.addr.host))) -            f.write(self.addr.host) +            f.write(self.addr.host.encode("idna"))          else:              raise SocksError(                  REP.ADDRESS_TYPE_NOT_SUPPORTED, diff --git a/netlib/utils.py b/netlib/utils.py index 6fed44b6..799b0d42 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -141,6 +141,12 @@ _label_valid = re.compile(b"(?!-)[A-Z\d-]{1,63}(?<!-)$", re.IGNORECASE)  def is_valid_host(host): +    """ +    Checks if a hostname is valid. + +    Args: +      host (bytes): The hostname +    """      try:          host.decode("idna")      except ValueError: diff --git a/test/test_certutils.py b/test/test_certutils.py index fc91609e..991d59d6 100644 --- a/test/test_certutils.py +++ b/test/test_certutils.py @@ -100,10 +100,10 @@ class TestDummyCert:              r = certutils.dummy_cert(                  ca.default_privatekey,                  ca.default_ca, -                "foo.com", -                ["one.com", "two.com", "*.three.com"] +                b"foo.com", +                [b"one.com", b"two.com", b"*.three.com"]              ) -            assert r.cn == "foo.com" +            assert r.cn == b"foo.com"  class TestSSLCert: @@ -112,13 +112,13 @@ class TestSSLCert:          with open(tutils.test_data.path("data/text_cert"), "rb") as f:              d = f.read()          c1 = certutils.SSLCert.from_pem(d) -        assert c1.cn == "google.com" +        assert c1.cn == b"google.com"          assert len(c1.altnames) == 436          with open(tutils.test_data.path("data/text_cert_2"), "rb") as f:              d = f.read()          c2 = certutils.SSLCert.from_pem(d) -        assert c2.cn == "www.inode.co.nz" +        assert c2.cn == b"www.inode.co.nz"          assert len(c2.altnames) == 2          assert c2.digest("sha1")          assert c2.notbefore diff --git a/test/test_socks.py b/test/test_socks.py index f2fb9b98..dd8e2807 100644 --- a/test/test_socks.py +++ b/test/test_socks.py @@ -1,6 +1,6 @@ +import ipaddress  from io import BytesIO  import socket -from nose.plugins.skip import SkipTest  from netlib import socks, tcp, tutils @@ -33,7 +33,7 @@ def test_client_greeting_assert_socks5():      else:          assert False -    raw = tutils.treader(b"GET / HTTP/1.1" + " " * 100) +    raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100)      msg = socks.ClientGreeting.from_file(raw)      try:          msg.assert_socks5() @@ -64,7 +64,7 @@ def test_server_greeting():  def test_server_greeting_assert_socks5(): -    raw = tutils.treader(b"HTTP/1.1 200 OK" + " " * 100) +    raw = tutils.treader(b"HTTP/1.1 200 OK" + b" " * 100)      msg = socks.ServerGreeting.from_file(raw)      try:          msg.assert_socks5() @@ -74,7 +74,7 @@ def test_server_greeting_assert_socks5():      else:          assert False -    raw = tutils.treader(b"GET / HTTP/1.1" + " " * 100) +    raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100)      msg = socks.ServerGreeting.from_file(raw)      try:          msg.assert_socks5() @@ -97,7 +97,7 @@ def test_message():      assert msg.ver == 5      assert msg.msg == 0x01      assert msg.atyp == 0x03 -    assert msg.addr == (b"example.com", 0xDEAD) +    assert msg.addr == ("example.com", 0xDEAD)  def test_message_assert_socks5(): @@ -116,20 +116,16 @@ def test_message_ipv4():      msg.to_file(out)      assert out.getvalue() == raw.getvalue()[:-2] -    assert msg.addr == (b"127.0.0.1", 0xDEAD) +    assert msg.addr == ("127.0.0.1", 0xDEAD)  def test_message_ipv6(): -    if not hasattr(socket, "inet_ntop"): -        raise SkipTest("Skipped because inet_ntop is not available")      # Test ATYP=0x04 (IPV6)      ipv6_addr = "2001:db8:85a3:8d3:1319:8a2e:370:7344"      raw = tutils.treader(          b"\x05\x01\x00\x04" + -        socket.inet_pton( -            socket.AF_INET6, -            ipv6_addr) + +        ipaddress.IPv6Address(ipv6_addr).packed +          b"\xDE\xAD\xBE\xEF")      out = BytesIO()      msg = socks.Message.from_file(raw) | 
