1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
from libmproxy import netstring
from cStringIO import StringIO
import libpry
class uNetstring(libpry.AutoTree):
    """Tests for the netstring module's header, encoder and decoders."""

    def setUp(self):
        """Store a sample sentence and its word-by-word netstring encoding."""
        self.test_data = "Netstring module by Will McGugan"
        self.encoded_data = "9:Netstring,6:module,2:by,4:Will,7:McGugan,"

    def test_header(self):
        """Check header() against the expected "<length>:" prefixes."""
        expectations = [
            ("netstring", "9:"),
            ("Will McGugan", "12:"),
            ("", "0:"),
        ]
        for payload, expected in expectations:
            assert netstring.header(payload) == expected

    def test_file_encoder(self):
        """Write each word through FileEncoder and compare the output."""
        sink = StringIO()
        writer = netstring.FileEncoder(sink)
        for word in self.test_data.split():
            writer.write(word)
        assert sink.getvalue() == self.encoded_data

    def test_decode_file(self):
        """Decode the encoded stream with each buffer size in the range.

        Sizes run from 1 up to, but not including, the encoded length.
        """
        expected = self.test_data.split()
        for size in range(1, len(self.encoded_data)):
            source = StringIO(self.encoded_data)
            words = list(netstring.decode_file(source, buffer_size=size))
            assert words == expected

    def test_decoder(self):
        """Feed the stream to a fresh Decoder in fixed-size chunks.

        Each chunk width from 1 up to, but not including, the encoded
        length is tried; the final chunk may be shorter than the width.
        """
        stream = self.encoded_data
        total = len(stream)
        for width in range(1, total):
            pieces = [stream[start:start + width]
                      for start in range(0, total, width)]
            decoder = netstring.Decoder()
            collected = []
            for piece in pieces:
                collected.extend(decoder.feed(piece))
            assert collected == self.test_data.split()

    def test_errors(self):
        """Check that malformed input raises the expected errors.

        Each case is (expected message, Decoder constructor args, input).
        feed() is called before libpry.raises, so the error is presumably
        raised lazily, once ``list`` consumes the result.
        """
        cases = [
            ("Illegal digit", (), "1:foo"),
            ("Preceding zero", (), "01:f"),
            ("Maximum size", (5,), "500:f"),
            ("Illegal digit", (), ":f"),
        ]
        for message, decoder_args, payload in cases:
            decoder = netstring.Decoder(*decoder_args)
            libpry.raises(message, list, decoder.feed(payload))
# Module-level list of test tree instances; presumably this is what the
# libpry runner collects from the module (confirm against libpry docs).
tests = [uNetstring()]
|