aboutsummaryrefslogtreecommitdiffstats
path: root/test/pathod/language/test_writer.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/pathod/language/test_writer.py')
-rw-r--r--test/pathod/language/test_writer.py90
1 files changed, 90 insertions, 0 deletions
diff --git a/test/pathod/language/test_writer.py b/test/pathod/language/test_writer.py
new file mode 100644
index 00000000..7feb985d
--- /dev/null
+++ b/test/pathod/language/test_writer.py
@@ -0,0 +1,90 @@
+import io
+from pathod import language
+from pathod.language import writer
+
+
+def test_send_chunk():
+ v = b"foobarfoobar"
+ for bs in range(1, len(v) + 2):
+ s = io.BytesIO()
+ writer.send_chunk(s, v, bs, 0, len(v))
+ assert s.getvalue() == v
+ for start in range(len(v)):
+ for end in range(len(v)):
+ s = io.BytesIO()
+ writer.send_chunk(s, v, bs, start, end)
+ assert s.getvalue() == v[start:end]
+
+
+def test_write_values_inject():
+ tst = b"foo"
+
+ s = io.BytesIO()
+ writer.write_values(s, [tst], [(0, "inject", b"aaa")], blocksize=5)
+ assert s.getvalue() == b"aaafoo"
+
+ s = io.BytesIO()
+ writer.write_values(s, [tst], [(1, "inject", b"aaa")], blocksize=5)
+ assert s.getvalue() == b"faaaoo"
+
+ s = io.BytesIO()
+ writer.write_values(s, [tst], [(1, "inject", b"aaa")], blocksize=5)
+ assert s.getvalue() == b"faaaoo"
+
+
+def test_write_values_disconnects():
+ s = io.BytesIO()
+ tst = b"foo" * 100
+ writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
+ assert not s.getvalue()
+
+
+def test_write_values():
+ tst = b"foobarvoing"
+ s = io.BytesIO()
+ writer.write_values(s, [tst], [])
+ assert s.getvalue() == tst
+
+ for bs in range(1, len(tst) + 2):
+ for off in range(len(tst)):
+ s = io.BytesIO()
+ writer.write_values(
+ s, [tst], [(off, "disconnect")], blocksize=bs
+ )
+ assert s.getvalue() == tst[:off]
+
+
+def test_write_values_pauses():
+ tst = "".join(str(i) for i in range(10)).encode()
+ for i in range(2, 10):
+ s = io.BytesIO()
+ writer.write_values(
+ s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
+ )
+ assert s.getvalue() == tst
+
+ for i in range(2, 10):
+ s = io.BytesIO()
+ writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
+ assert s.getvalue() == tst
+
+ tst = [tst] * 5
+ for i in range(2, 10):
+ s = io.BytesIO()
+ writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
+ assert s.getvalue() == b"".join(tst)
+
+
+def test_write_values_after():
+ s = io.BytesIO()
+ r = next(language.parse_pathod("400:da"))
+ language.serve(r, s, {})
+
+ s = io.BytesIO()
+ r = next(language.parse_pathod("400:pa,0"))
+ language.serve(r, s, {})
+
+ s = io.BytesIO()
+ r = next(language.parse_pathod("400:ia,'xx'"))
+ language.serve(r, s, {})
+ assert s.getvalue().endswith(b'xx')