aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--libpathod/language/base.py92
-rw-r--r--libpathod/language/http.py73
-rw-r--r--test/test_language_base.py76
-rw-r--r--test/test_language_http.py62
4 files changed, 153 insertions, 150 deletions
diff --git a/libpathod/language/base.py b/libpathod/language/base.py
index 4e1900a7..4c337a9b 100644
--- a/libpathod/language/base.py
+++ b/libpathod/language/base.py
@@ -4,7 +4,6 @@ import os
import copy
import abc
import contrib.pyparsing as pp
-from netlib import http_uastrings
from .. import utils
from . import generators, exceptions
@@ -234,91 +233,29 @@ class _Component(_Token):
return "".join(i[:] for i in self.values(settings or {}))
-class _Header(_Component):
+class KeyValue(_Component):
+ """
+ A key/value pair.
+ klass.preamble: leader
+ """
def __init__(self, key, value):
self.key, self.value = key, value
- def values(self, settings):
- return [
- self.key.get_generator(settings),
- ": ",
- self.value.get_generator(settings),
- "\r\n",
- ]
-
-
-class Header(_Header):
@classmethod
def expr(klass):
- e = pp.Literal("h").suppress()
+ e = pp.Literal(klass.preamble).suppress()
e += Value
e += pp.Literal("=").suppress()
e += Value
return e.setParseAction(lambda x: klass(*x))
def spec(self):
- return "h%s=%s"%(self.key.spec(), self.value.spec())
+ return "%s%s=%s"%(self.preamble, self.key.spec(), self.value.spec())
def freeze(self, settings):
- return Header(self.key.freeze(settings), self.value.freeze(settings))
-
-
-class ShortcutContentType(_Header):
- def __init__(self, value):
- _Header.__init__(self, ValueLiteral("Content-Type"), value)
-
- @classmethod
- def expr(klass):
- e = pp.Literal("c").suppress()
- e = e + Value
- return e.setParseAction(lambda x: klass(*x))
-
- def spec(self):
- return "c%s"%(self.value.spec())
-
- def freeze(self, settings):
- return ShortcutContentType(self.value.freeze(settings))
-
-
-class ShortcutLocation(_Header):
- def __init__(self, value):
- _Header.__init__(self, ValueLiteral("Location"), value)
-
- @classmethod
- def expr(klass):
- e = pp.Literal("l").suppress()
- e = e + Value
- return e.setParseAction(lambda x: klass(*x))
-
- def spec(self):
- return "l%s"%(self.value.spec())
-
- def freeze(self, settings):
- return ShortcutLocation(self.value.freeze(settings))
-
-
-class ShortcutUserAgent(_Header):
- def __init__(self, value):
- self.specvalue = value
- if isinstance(value, basestring):
- value = ValueLiteral(http_uastrings.get_by_shortcut(value)[2])
- _Header.__init__(self, ValueLiteral("User-Agent"), value)
-
- @classmethod
- def expr(klass):
- e = pp.Literal("u").suppress()
- u = reduce(
- operator.or_,
- [pp.Literal(i[1]) for i in http_uastrings.UASTRINGS]
+ return self.__class__(
+ self.key.freeze(settings), self.value.freeze(settings)
)
- e += u | Value
- return e.setParseAction(lambda x: klass(*x))
-
- def spec(self):
- return "u%s"%self.specvalue
-
- def freeze(self, settings):
- return ShortcutUserAgent(self.value.freeze(settings))
class PathodSpec(_Token):
@@ -407,12 +344,15 @@ class OptionsOrValue(_Component):
"""
Can be any of a specified set of options, or a value specifier.
"""
+ preamble = ""
def __init__(self, value):
# If it's a string, we were passed one of the options, so we upper-case
# it to be canonical. The user can specify a different case by using a
# string value literal.
+ self.option_used = False
if isinstance(value, basestring):
value = ValueLiteral(value.upper())
+ self.option_used = True
self.value = value
@classmethod
@@ -421,6 +361,8 @@ class OptionsOrValue(_Component):
m = pp.MatchFirst(parts)
spec = m | Value.copy()
spec = spec.setParseAction(lambda x: klass(*x))
+ if klass.preamble:
+ spec = pp.Literal(klass.preamble).suppress() + spec
return spec
def values(self, settings):
@@ -432,7 +374,7 @@ class OptionsOrValue(_Component):
s = self.value.spec()
if s[1:-1].lower() in self.options:
s = s[1:-1].lower()
- return "%s"%s
+ return "%s%s"%(self.preamble, s)
def freeze(self, settings):
return self.__class__(self.value.freeze(settings))
@@ -617,10 +559,6 @@ class _Message(object):
def actions(self):
return self.toks(_Action)
- @property
- def headers(self):
- return self.toks(_Header)
-
def length(self, settings):
"""
Calculate the length of the base message without any applied
diff --git a/libpathod/language/http.py b/libpathod/language/http.py
index 30a5fd9f..a759aeb1 100644
--- a/libpathod/language/http.py
+++ b/libpathod/language/http.py
@@ -4,7 +4,7 @@ import abc
import contrib.pyparsing as pp
import netlib.websockets
-from netlib import http_status
+from netlib import http_status, http_uastrings
from . import base, generators, exceptions
@@ -45,6 +45,49 @@ class Method(base.OptionsOrValue):
]
+class _HeaderMixin(object):
+ def format_header(self, key, value):
+ return [key, ": ", value, "\r\n"]
+
+ def values(self, settings):
+ return self.format_header(
+ self.key.get_generator(settings),
+ self.value.get_generator(settings),
+ )
+
+
+class Header(_HeaderMixin, base.KeyValue):
+ preamble = "h"
+
+
+class ShortcutContentType(_HeaderMixin, base.PreValue):
+ preamble = "c"
+ key = base.ValueLiteral("Content-Type")
+
+
+class ShortcutLocation(_HeaderMixin, base.PreValue):
+ preamble = "l"
+ key = base.ValueLiteral("Location")
+
+
+class ShortcutUserAgent(_HeaderMixin, base.OptionsOrValue):
+ preamble = "u"
+ options = [i[1] for i in http_uastrings.UASTRINGS]
+ key = base.ValueLiteral("User-Agent")
+
+ def values(self, settings):
+ if self.option_used:
+ value = http_uastrings.get_by_shortcut(
+ self.value.val.lower()
+ )[2]
+ else:
+ value = self.value
+ return self.format_header(
+ self.key.get_generator(settings),
+ value
+ )
+
+
def get_header(val, headers):
"""
Header keys may be Values, so we have to "generate" them as we try the
@@ -72,6 +115,10 @@ class _HTTPMessage(base._Message):
def preamble(self, settings): # pragma: no cover
pass
+ @property
+ def headers(self):
+ return self.toks(_HeaderMixin)
+
def values(self, settings):
vals = self.preamble(settings)
vals.append("\r\n")
@@ -86,12 +133,12 @@ class _HTTPMessage(base._Message):
class Response(_HTTPMessage):
comps = (
Body,
- base.Header,
+ Header,
base.PauseAt,
base.DisconnectAt,
base.InjectAt,
- base.ShortcutContentType,
- base.ShortcutLocation,
+ ShortcutContentType,
+ ShortcutLocation,
Raw,
Reason
)
@@ -145,7 +192,7 @@ class Response(_HTTPMessage):
for i in hdrs.lst:
if not get_header(i[0], self.headers):
tokens.append(
- base.Header(
+ Header(
base.ValueLiteral(i[0]),
base.ValueLiteral(i[1]))
)
@@ -156,7 +203,7 @@ class Response(_HTTPMessage):
else:
length = len(self.body.value.get_generator(settings))
tokens.append(
- base.Header(
+ Header(
base.ValueLiteral("Content-Length"),
base.ValueLiteral(str(length)),
)
@@ -193,12 +240,12 @@ class Response(_HTTPMessage):
class Request(_HTTPMessage):
comps = (
Body,
- base.Header,
+ Header,
base.PauseAt,
base.DisconnectAt,
base.InjectAt,
- base.ShortcutContentType,
- base.ShortcutUserAgent,
+ ShortcutContentType,
+ ShortcutUserAgent,
Raw,
base.PathodSpec,
)
@@ -241,7 +288,7 @@ class Request(_HTTPMessage):
for i in netlib.websockets.client_handshake_headers().lst:
if not get_header(i[0], self.headers):
tokens.append(
- base.Header(
+ Header(
base.ValueLiteral(i[0]),
base.ValueLiteral(i[1])
)
@@ -251,7 +298,7 @@ class Request(_HTTPMessage):
if self.body:
length = len(self.body.value.get_generator(settings))
tokens.append(
- base.Header(
+ Header(
base.ValueLiteral("Content-Length"),
base.ValueLiteral(str(length)),
)
@@ -259,7 +306,7 @@ class Request(_HTTPMessage):
if settings.request_host:
if not get_header("Host", self.headers):
tokens.append(
- base.Header(
+ Header(
base.ValueLiteral("Host"),
base.ValueLiteral(settings.request_host)
)
@@ -302,7 +349,7 @@ class PathodErrorResponse(Response):
def make_error_response(reason, body=None):
tokens = [
Code("800"),
- base.Header(
+ Header(
base.ValueLiteral("Content-Type"),
base.ValueLiteral("text/plain")
),
diff --git a/test/test_language_base.py b/test/test_language_base.py
index e4458021..48afd675 100644
--- a/test/test_language_base.py
+++ b/test/test_language_base.py
@@ -225,9 +225,20 @@ class TestMisc:
assert v2.value.val == v3.value.val
-class TestHeaders:
- def test_header(self):
- e = base.Header.expr()
+class TKeyValue(base.KeyValue):
+ preamble = "h"
+ def values(self, settings):
+ return [
+ self.key.get_generator(settings),
+ ": ",
+ self.value.get_generator(settings),
+ "\r\n",
+ ]
+
+
+class TestKeyValue:
+ def test_simple(self):
+ e = TKeyValue.expr()
v = e.parseString("h'foo'='bar'")[0]
assert v.key.val == "foo"
assert v.value.val == "bar"
@@ -239,69 +250,14 @@ class TestHeaders:
s = v.spec()
assert s == e.parseString(s)[0].spec()
- def test_header_freeze(self):
- e = base.Header.expr()
+ def test_freeze(self):
+ e = TKeyValue.expr()
v = e.parseString("h@10=@10'")[0]
v2 = v.freeze({})
v3 = v2.freeze({})
assert v2.key.val == v3.key.val
assert v2.value.val == v3.value.val
- def test_ctype_shortcut(self):
- e = base.ShortcutContentType.expr()
- v = e.parseString("c'foo'")[0]
- assert v.key.val == "Content-Type"
- assert v.value.val == "foo"
-
- s = v.spec()
- assert s == e.parseString(s)[0].spec()
-
- e = base.ShortcutContentType.expr()
- v = e.parseString("c@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
- def test_location_shortcut(self):
- e = base.ShortcutLocation.expr()
- v = e.parseString("l'foo'")[0]
- assert v.key.val == "Location"
- assert v.value.val == "foo"
-
- s = v.spec()
- assert s == e.parseString(s)[0].spec()
-
- e = base.ShortcutLocation.expr()
- v = e.parseString("l@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
- def test_shortcuts(self):
- assert language.parse_response("400:c'foo'").headers[0].key.val == "Content-Type"
- assert language.parse_response("400:l'foo'").headers[0].key.val == "Location"
-
- assert 'Android' in parse_request("get:/:ua").headers[0].value.val
- assert parse_request("get:/:ua").headers[0].key.val == "User-Agent"
-
-
-class TestShortcutUserAgent:
- def test_location_shortcut(self):
- e = base.ShortcutUserAgent.expr()
- v = e.parseString("ua")[0]
- assert "Android" in str(v.value)
- assert v.spec() == "ua"
- assert v.key.val == "User-Agent"
-
- v = e.parseString("u'foo'")[0]
- assert "foo" in str(v.value)
- assert "foo" in v.spec()
-
- v = e.parseString("u@100'")[0]
- assert len(str(v.freeze({}).value)) > 100
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
class Test_Action:
diff --git a/test/test_language_http.py b/test/test_language_http.py
index f2528da4..f9d2cf24 100644
--- a/test/test_language_http.py
+++ b/test/test_language_http.py
@@ -9,6 +9,12 @@ def parse_request(s):
return language.parse_requests(s)[0]
+def render(r, settings=language.Settings()):
+ s = cStringIO.StringIO()
+ assert language.serve(r, s, settings)
+ return s.getvalue()
+
+
def test_make_error_response():
d = cStringIO.StringIO()
s = http.make_error_response("foo")
@@ -258,3 +264,59 @@ class TestResponse:
tutils.raises("no websocket key", r.resolve, language.Settings())
res = r.resolve(language.Settings(websocket_key="foo"))
assert res.code.string() == "101"
+
+
+def test_ctype_shortcut():
+ e = http.ShortcutContentType.expr()
+ v = e.parseString("c'foo'")[0]
+ assert v.key.val == "Content-Type"
+ assert v.value.val == "foo"
+
+ s = v.spec()
+ assert s == e.parseString(s)[0].spec()
+
+ e = http.ShortcutContentType.expr()
+ v = e.parseString("c@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+def test_location_shortcut():
+ e = http.ShortcutLocation.expr()
+ v = e.parseString("l'foo'")[0]
+ assert v.key.val == "Location"
+ assert v.value.val == "foo"
+
+ s = v.spec()
+ assert s == e.parseString(s)[0].spec()
+
+ e = http.ShortcutLocation.expr()
+ v = e.parseString("l@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+def test_shortcuts():
+ assert language.parse_response("400:c'foo'").headers[0].key.val == "Content-Type"
+ assert language.parse_response("400:l'foo'").headers[0].key.val == "Location"
+
+ assert "Android" in render(parse_request("get:/:ua"))
+ assert "User-Agent" in render(parse_request("get:/:ua"))
+
+
+def test_user_agent():
+ e = http.ShortcutUserAgent.expr()
+ v = e.parseString("ua")[0]
+ assert "Android" in str(v.values({})[2])
+
+ e = http.ShortcutUserAgent.expr()
+ v = e.parseString("u'a'")[0]
+ assert "Android" not in str(v.values({})[2])
+
+ v = e.parseString("u@100'")[0]
+ assert len(str(v.freeze({}).value)) > 100
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val