aboutsummaryrefslogtreecommitdiffstats
path: root/pathod/test.py
blob: 11462729652cb89659c958eff727e1c70a5469aa (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
from six.moves import cStringIO as StringIO
import threading
import time

from six.moves import queue

from . import pathod


class TimeoutError(Exception):
    pass


class Daemon:
    IFACE = "127.0.0.1"

    def __init__(self, ssl=None, **daemonargs):
        self.q = queue.Queue()
        self.logfp = StringIO()
        daemonargs["logfp"] = self.logfp
        self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs)
        self.thread.start()
        self.port = self.q.get(True, 5)
        self.urlbase = "%s://%s:%s" % (
            "https" if ssl else "http",
            self.IFACE,
            self.port
        )

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.logfp.truncate(0)
        self.shutdown()
        return False

    def p(self, spec):
        """
            Return a URL that will render the response in spec.
        """
        return "%s/p/%s" % (self.urlbase, spec)

    def text_log(self):
        return self.logfp.getvalue()

    def wait_for_silence(self, timeout=5):
        start = time.time()
        while 1:
            if time.time() - start >= timeout:
                raise TimeoutError(
                    "%s service threads still alive" %
                    self.thread.server.handler_counter.count
                )
            if self.thread.server.handler_counter.count == 0:
                return

    def expect_log(self, n, timeout=5):
        l = []
        start = time.time()
        while True:
            l = self.log()
            if time.time() - start >= timeout:
                return None
            if len(l) >= n:
                break
        return l

    def last_log(self):
        """
            Returns the last logged request, or None.
        """
        l = self.expect_log(1)
        if not l:
            return None
        return l[-1]

    def log(self):
        """
            Return the log buffer as a list of dictionaries.
        """
        return self.thread.server.get_log()

    def clear_log(self):
        """
            Clear the log.
        """
        return self.thread.server.clear_log()

    def shutdown(self):
        """
            Shut the daemon down, return after the thread has exited.
        """
        self.thread.server.shutdown()
        self.thread.join()


class _PaThread(threading.Thread):

    def __init__(self, iface, q, ssl, daemonargs):
        threading.Thread.__init__(self)
        self.name = "PathodThread"
        self.iface, self.q, self.ssl = iface, q, ssl
        self.daemonargs = daemonargs
        self.server = None

    def run(self):
        self.server = pathod.Pathod(
            (self.iface, 0),
            ssl=self.ssl,
            **self.daemonargs
        )
        self.name = "PathodThread (%s:%s)" % (
            self.server.address.host,
            self.server.address.port
        )
        self.q.put(self.server.address.port)
        self.server.serve_forever()