"""Tests for ``libpathod.rparse``, written as libpry ``AutoTree`` suites.

NOTE(review): the copy of this module that was reviewed had lost its line
structure and a span of text in the middle (see the NOTE(review) markers
below).  The layout here was rebuilt from the surviving tokens; the lost
text was NOT reconstructed and must be restored from version control.
"""
import StringIO
import sys
import os

import libpry

from libpathod import rparse

# Flag read by rparse; set before any of the tests below run.
rparse.TESTING = True


class Sponge:
    """Absorb any attribute access and any call.

    Every attribute lookup that reaches ``__getattr__`` yields a fresh
    ``Sponge``; calling a ``Sponge`` accepts any arguments and returns None.
    """

    def __getattr__(self, x):
        return Sponge()

    def __call__(self, *args, **kwargs):
        pass


class DummyRequest(StringIO.StringIO):
    """In-memory stand-in for the request object that responses write to.

    Everything written accumulates in the underlying ``StringIO`` buffer, so
    tests can inspect it with ``getvalue()``.
    """

    # Any attribute or call on the connection is silently accepted.
    connection = Sponge()

    def write(self, d, callback=None):
        """Append ``d`` to the buffer, then invoke ``callback`` if one is given."""
        StringIO.StringIO.write(self, d)
        if callback:
            callback()

    def finish(self):
        """Do nothing; present so callers can invoke ``finish()``."""
        return


class uMisc(libpry.AutoTree):
    """Tests for the rparse value and generator helpers."""

    def test_generators(self):
        """A quoted literal value yields a generator over its text."""
        v = rparse.Value.parseString("'val'")[0]
        g = v.get_generator({})
        assert g[:] == "val"

    def test_randomgenerator(self):
        """Slices of a RandomGenerator built with 100 never exceed 100 items."""
        g = rparse.RandomGenerator("one", 100)
        assert len(g[:10]) == 10
        assert len(g[1:10]) == 9
        assert len(g[:1000]) == 100
        assert len(g[1000:1001]) == 0
        assert g[0]

    def test_literalgenerator(self):
        """A LiteralGenerator compares, slices and indexes like its string."""
        g = rparse.LiteralGenerator("one")
        assert g == "one"
        assert g[:] == "one"
        assert g[1] == "n"

    def test_filegenerator(self):
        """A FileGenerator exposes a file's contents via len/index/slice."""
        t = self.tmpdir()
        path = os.path.join(t, "foo")
        # Context manager so the handle is closed even if the write fails.
        with open(path, "w") as f:
            f.write("x"*10000)
        g = rparse.FileGenerator(path)
        assert len(g) == 10000
        assert g[0] == "x"
        assert g[-1] == "x"
        assert g[0:5] == "xxxxx"

    def test_valueliteral(self):
        """A ValueLiteral has a truthy expr() and a non-empty str()."""
        v = rparse.ValueLiteral("foo")
        assert v.expr()
        assert str(v)

    def test_file_value(self):
        """A ``<'...'`` value parses to an object carrying the quoted path."""
        v = rparse.Value.parseString("<'one two'")[0]
        assert str(v)
        assert v.path == "one two"

        # NOTE(review): the reviewed text is damaged from this point.  The
        # next statement survives only as the fragment
        #     v = rparse.Value.parseString("
        # and everything after its opening quote is lost: the rest of this
        # test and, presumably, the other suites named in `tests` at the
        # bottom of the file.  Restore the lost text from version control.


# NOTE(review): the header and the first part of the class that owns the
# methods below were lost.  `uResponse` is an assumption, based on the `tests`
# list and on these methods exercising a parsed response via
# self.dummy_response() -- confirm against version control.
class uResponse(libpry.AutoTree):
    # NOTE(review): self.dummy_response(), used below, is defined in the lost
    # part of this class.  The tail of the test that preceded the methods
    # below survives only as this fragment, reproduced verbatim; `r` is bound
    # in the lost part, so it cannot be run as-is:
    #     ... 100
    #     x = [(1, 5), (0, 5)]
    #     assert r.ready_actions(100, x) == sorted(x)

    def test_write_values_disconnects(self):
        """With a ``(0, "disconnect")`` action nothing reaches the request."""
        r = self.dummy_response()
        s = DummyRequest()
        tst = "foo"*100
        r.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
        assert not s.getvalue()

    def test_write_values(self):
        """Without actions, write_values emits the value unchanged."""
        tst = "foo"*1025
        r = rparse.parse({}, "400'msg'")
        s = DummyRequest()
        r.write_values(s, [tst], [])
        assert s.getvalue() == tst

    def test_write_values_pauses(self):
        """Pause actions do not alter the output, for block sizes 2 to 9."""
        tst = "".join(str(i) for i in range(10))
        r = rparse.parse({}, "400'msg'")

        # Two pause actions.
        for i in range(2, 10):
            s = DummyRequest()
            r.write_values(
                s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
            )
            assert s.getvalue() == tst

        # A single pause action.
        for i in range(2, 10):
            s = DummyRequest()
            r.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
            assert s.getvalue() == tst

        # Several values; a copy of the list is passed on each call.
        tst = ["".join(str(i) for i in range(10))]*5
        for i in range(2, 10):
            s = DummyRequest()
            r.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
            assert s.getvalue() == "".join(tst)

    def test_render(self):
        """Serving a parsed response into a request returns a truthy value."""
        s = DummyRequest()
        r = rparse.parse({}, "400'msg'")
        assert r.serve(s)

    def test_length(self):
        """length() equals the number of characters serve() writes."""
        def testlen(x):
            s = DummyRequest()
            x.serve(s)
            assert x.length() == len(s.getvalue())
        testlen(rparse.parse({}, "400'msg'"))
        testlen(rparse.parse({}, "400'msg':h'foo'='bar'"))
        testlen(rparse.parse({}, "400'msg':h'foo'='bar':b@100b"))


# NOTE(review): uPauses, uDisconnects, uparse and uShortcuts are defined in the
# lost part of the file; this list raises NameError until they are restored.
tests = [
    uResponse(),
    uPauses(),
    uDisconnects(),
    uMisc(),
    uparse(),
    uShortcuts()
]