aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libpathod/language.py13
-rw-r--r--libpathod/pathoc.py64
-rw-r--r--test/test_language.py1
-rw-r--r--test/test_pathod.py4
-rw-r--r--test/tutils.py15
5 files changed, 82 insertions, 15 deletions
diff --git a/libpathod/language.py b/libpathod/language.py
index d4c5b880..0fd418a5 100644
--- a/libpathod/language.py
+++ b/libpathod/language.py
@@ -1172,9 +1172,16 @@ class WebsocketFrame(_Message):
return resp
def values(self, settings):
- vals = [
- websockets.FrameHeader().to_bytes()
- ]
+ vals = []
+ if self.body:
+ length = len(self.body.value.get_generator(settings))
+ else:
+ length = 0
+ frame = websockets.FrameHeader(
+ mask = True,
+ payload_length = length
+ )
+ vals = [frame.to_bytes()]
if self.body:
vals.append(self.body.value.get_generator(settings))
return vals
diff --git a/libpathod/pathoc.py b/libpathod/pathoc.py
index 89a8280b..aee28c37 100644
--- a/libpathod/pathoc.py
+++ b/libpathod/pathoc.py
@@ -1,7 +1,9 @@
import sys
import os
import hashlib
+import Queue
import random
+import select
import time
import threading
@@ -77,14 +79,28 @@ class Response:
class WebsocketFrameReader(threading.Thread):
- def __init__(self, rfile, callback):
+ def __init__(self, rfile, callback, ws_read_limit):
threading.Thread.__init__(self)
+ self.ws_read_limit = ws_read_limit
self.rfile, self.callback = rfile, callback
- self.daemon = True
+ self.terminate = Queue.Queue()
+ self.is_done = Queue.Queue()
def run(self):
while 1:
- print websockets.Frame.from_file(self.rfile)
+ if self.ws_read_limit == 0:
+ break
+ r, _, _ = select.select([self.rfile], [], [], 0.05)
+ try:
+ self.terminate.get_nowait()
+ break
+ except Queue.Empty:
+ pass
+ for rfile in r:
+ print websockets.Frame.from_file(self.rfile).human_readable()
+ if self.ws_read_limit is not None:
+ self.ws_read_limit -= 1
+ self.is_done.put(None)
class Pathoc(tcp.TCPClient):
@@ -99,6 +115,9 @@ class Pathoc(tcp.TCPClient):
clientcert=None,
ciphers=None,
+ # Websockets
+ ws_read_limit = None,
+
# Output control
showreq = False,
showresp = False,
@@ -131,6 +150,8 @@ class Pathoc(tcp.TCPClient):
self.ciphers = ciphers
self.sslinfo = None
+ self.ws_read_limit = ws_read_limit
+
self.showreq = showreq
self.showresp = showresp
self.explain = explain
@@ -140,6 +161,8 @@ class Pathoc(tcp.TCPClient):
self.showsummary = showsummary
self.fp = fp
+ self.ws_framereader = None
+
def http_connect(self, connect_to):
self.wfile.write(
'CONNECT %s:%s HTTP/1.1\r\n'%tuple(connect_to) +
@@ -196,6 +219,19 @@ class Pathoc(tcp.TCPClient):
print >> fp, "%s (unprintables escaped):"%header
print >> fp, netlib.utils.cleanBin(data)
+ def stop(self):
+ self.ws_framereader.terminate.put(None)
+
+ def wait(self):
+ if self.ws_framereader:
+ while 1:
+ try:
+ self.ws_framereader.is_done.get(timeout=0.05)
+ self.ws_framereader.join()
+ return
+ except Queue.Empty:
+ pass
+
def websocket_get_frame(self, frame):
"""
Called when a frame is received from the server.
@@ -230,21 +266,30 @@ class Pathoc(tcp.TCPClient):
print >> self.fp, ">> Spec:", r.spec()
if self.showreq:
self._show(
- self.fp, ">> Request",
+ self.fp, ">> Websocket Frame",
self.wfile.get_log(),
self.hexdump
)
- def websocket_start(self, r, callback=None):
+ def websocket_start(self, r, callback=None, limit=None):
"""
Performs an HTTP request, and attempts to drop into websocket
connection.
+
+ callback: A callback called within the websocket thread for every
+ server frame.
+ limit: Disconnect after receiving N server frames.
"""
resp = self.http(r)
if resp.status_code == 101:
if self.showsummary:
- print >> self.fp, "Websocket connection established..."
- WebsocketFrameReader(self.rfile, self.websocket_get_frame).start()
+ print >> self.fp, "<< websocket connection established..."
+ self.ws_framereader = WebsocketFrameReader(
+ self.rfile,
+ self.websocket_get_frame,
+ self.ws_read_limit
+ )
+ self.ws_framereader.start()
return resp
def http(self, r):
@@ -340,6 +385,7 @@ class Pathoc(tcp.TCPClient):
def main(args): # pragma: nocover
memo = set([])
trycount = 0
+ p = None
try:
cnt = 0
while 1:
@@ -406,5 +452,9 @@ def main(args): # pragma: nocover
return
except (http.HttpError, tcp.NetLibError), v:
pass
+ p.wait()
except KeyboardInterrupt:
pass
+ if p:
+ p.stop()
+ p.wait()
diff --git a/test/test_language.py b/test/test_language.py
index 0fb8479d..c0eafcaa 100644
--- a/test/test_language.py
+++ b/test/test_language.py
@@ -638,7 +638,6 @@ class TestRequest:
class TestWebsocketFrame:
-
def test_spec(self):
e = language.WebsocketFrame.expr()
wf = e.parseString("wf:b'foo'")
diff --git a/test/test_pathod.py b/test/test_pathod.py
index 1a10d2c2..bfff3274 100644
--- a/test/test_pathod.py
+++ b/test/test_pathod.py
@@ -185,10 +185,10 @@ class CommonTests(tutils.DaemonTests):
assert r.status_code == 202
def test_websocket(self):
- r = self.pathoc("ws:/p/")
+ r = self.pathoc("ws:/p/", ws_read_limit=0)
assert r.status_code == 101
- r = self.pathoc("ws:/p/ws")
+ r = self.pathoc("ws:/p/ws", ws_read_limit=0)
assert r.status_code == 101
diff --git a/test/tutils.py b/test/tutils.py
index 4c29f5b2..f8a37a5e 100644
--- a/test/tutils.py
+++ b/test/tutils.py
@@ -64,10 +64,21 @@ class DaemonTests(object):
def get(self, spec):
return requests.get(self.d.p(spec), verify=False)
- def pathoc(self, spec, timeout=None, connect_to=None, ssl=None):
+ def pathoc(
+ self,
+ spec,
+ timeout=None,
+ connect_to=None,
+ ssl=None,
+ ws_read_limit=None
+ ):
if ssl is None:
ssl = self.ssl
- c = pathoc.Pathoc(("localhost", self.d.port), ssl=ssl)
+ c = pathoc.Pathoc(
+ ("localhost", self.d.port),
+ ssl=ssl,
+ ws_read_limit=ws_read_limit
+ )
c.connect(connect_to)
if timeout:
c.settimeout(timeout)