aboutsummaryrefslogtreecommitdiffstats
path: root/netlib/tutils.py
blob: 6fa2d7b6bbaa695c31da9e5fd28bc4d0d03d648d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from io import BytesIO
import tempfile
import os
import time
import shutil
from contextlib import contextmanager
import sys

from mitmproxy.utils import data
from netlib import tcp
from netlib import http


def treader(bytes):
    """
        Wrap the given bytes in an in-memory buffer and return a tcp.Reader
        that reads from it.
    """
    return tcp.Reader(BytesIO(bytes))


@contextmanager
def tmpdir(*args, **kwargs):
    """
        Run the body of a ``with`` statement inside a fresh temporary
        directory.

        All positional and keyword arguments are forwarded to
        tempfile.mkdtemp. The path of the new directory is yielded and is the
        current working directory while the block runs.

        When the block exits -- normally or by raising -- the previous working
        directory is restored and the temporary directory is removed. Any
        exception raised by the block propagates to the caller.
    """
    orig_workdir = os.getcwd()
    temp_workdir = tempfile.mkdtemp(*args, **kwargs)
    try:
        os.chdir(temp_workdir)
        yield temp_workdir
    finally:
        # Step out of the directory before deleting it, so the process is
        # never left with a removed directory as its working directory.
        os.chdir(orig_workdir)
        shutil.rmtree(temp_workdir)


def _check_exception(expected, actual, exc_tb):
    if isinstance(expected, str):
        if expected.lower() not in str(actual).lower():
            raise AssertionError(
                "Expected %s, but caught %s" % (
                    repr(expected), repr(actual)
                )
            )
    else:
        if not isinstance(actual, expected):
            raise AssertionError(
                "Expected %s, but caught %s %s" % (
                    expected.__name__, actual.__class__.__name__, repr(actual)
                )
            )


def raises(expected_exception, obj=None, *args, **kwargs):
    """
        Assert that a callable raises a specified exception.

        :expected_exception An exception class or a string. If a class, assert
        that an exception of this type is raised. If a string, assert that the
        string occurs in the string representation of the exception, based on
        a case-insensitive match.

        :obj A callable object. When omitted, a RaisesContext is returned so
        the check can be used as a ``with`` block instead.

        :args Positional arguments to be passed to the callable.

        :kwargs Keyword arguments to be passed to the callable.

        Raises AssertionError if the callable returns without raising, or if
        the raised exception does not match the expectation.
    """
    if obj is None:
        return RaisesContext(expected_exception)

    try:
        result = obj(*args, **kwargs)
    except Exception as caught:
        traceback = sys.exc_info()[2]
        _check_exception(expected_exception, caught, traceback)
        return None

    raise AssertionError("No exception raised. Return value: {}".format(result))


class RaisesContext:
    """
        Context manager that asserts its body raises an expected exception.

        The expectation is an exception class or a string and is checked with
        _check_exception when the block exits.
    """

    def __init__(self, expected_exception):
        self.expected_exception = expected_exception

    def __enter__(self):
        # Nothing is bound by ``with ... as``.
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            # Raises AssertionError on a mismatch; otherwise fall through and
            # suppress the expected exception.
            _check_exception(self.expected_exception, exc_val, exc_tb)
            return True
        raise AssertionError("No exception raised.")


# Module-level data helper, constructed from this module's name.
# NOTE(review): presumably used to locate test fixture files -- confirm
# against mitmproxy.utils.data.
test_data = data.Data(__name__)
# FIXME: Temporary workaround during repo merge: redirect dirname to the
# "test/netlib" directory one level above the original dirname.
test_data.dirname = os.path.join(test_data.dirname, "..", "test", "netlib")


def treq(**kwargs):
    """
    Build a sample request for tests.

    Any keyword argument overrides the default of the same name (or is added
    to it) before the arguments are handed to http.Request.

    Returns:
        netlib.http.Request
    """
    header_fields = (
        (b"header", b"qvalue"),
        (b"content-length", b"7"),
    )
    request_args = {
        "first_line_format": "relative",
        "method": b"GET",
        "scheme": b"http",
        "host": b"address",
        "port": 22,
        "path": b"/path",
        "http_version": b"HTTP/1.1",
        "headers": http.Headers(header_fields),
        "content": b"content",
    }
    request_args.update(kwargs)
    return http.Request(**request_args)


def tresp(**kwargs):
    """
    Build a sample response for tests.

    Any keyword argument overrides the default of the same name (or is added
    to it) before the arguments are handed to http.Response. Both timestamps
    default to the time at which this function is called.

    Returns:
        netlib.http.Response
    """
    header_fields = (
        (b"header-response", b"svalue"),
        (b"content-length", b"7"),
    )
    response_args = {
        "http_version": b"HTTP/1.1",
        "status_code": 200,
        "reason": b"OK",
        "headers": http.Headers(header_fields),
        "content": b"message",
        "timestamp_start": time.time(),
        "timestamp_end": time.time(),
    }
    response_args.update(kwargs)
    return http.Response(**response_args)