aboutsummaryrefslogtreecommitdiffstats
path: root/test/pathod/language/test_http.py
blob: a5b35c05ac65c47d6382d79b4b4d3fed1590f6b5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
pre { line-height: 125%; margin: 0; }
td.linenos pre { color: #000000; background-color: #f0f0f0; padding: 0 5px 0 5px; }
span.linenos { color: #000000; background-color: #f0f0f0; padding: 0 5px 0 5px; }
td.linenos pre.special { color: #000000; background-color: #ffffc0; padding: 0 5px 0 5px; }
span.linenos.special { color: #000000; background-color: #ffffc0; padding: 0 5px 0 5px; }
.highlight .hll { background-color: #ffffcc }
.highlight { background: #ffffff; }
.highlight .c { color: #888888 } /* Comment */
.highlight .err { color: #a61717; background-color: #e3d2d2 } /* Error */
.highlight .k { color: #008800; font-weight: bold } /* Keyword */
.highlight .ch { color: #888888 } /* Comment.Hashbang */
.highlight .cm { color: #888888 } /* Comment.Multiline */
.highlight .cp { color: #cc0000; font-weight: bold } /* Comment.Preproc */
.highlight .cpf { color: #888888 } /* Comment.PreprocFile */
.highlight .c1 { color: #888888 } /* Comment.Single */
.highlight .cs { color: #cc0000; font-weight: bold; background-color: #fff0f0 } /* Comment.Special */
.highlight .gd { color: #000000; background-color: #ffdddd } /* Generic.Deleted */
.highlight .ge { font-style: italic } /* Generic.Emph */
.highlight .gr { color: #aa0000 } /* Generic.Error */
.highlight .gh { color: #333333 } /* Generic.Heading */
.highlight .gi { color: #000000; background-color: #ddffdd } /* Generic.Inserted */
.highlight .go { color: #888888 } /* Generic.Output */
.highlight .gp { color: #555555 } /* Generic.Prompt */
.highlight .gs { font-weight: bold } /* Generic.Strong */
.highlight .gu { color: #666666 } /* Generic.Subheading */
.highlight .gt { color: #aa0000 } /* Generic.Traceback */
.highlight .kc { color: #008800; font-weight: bold } /* Keyword.Constant */
.highlight .kd { color: #008800; font-weight: bold } /* Keyword.Declaration */
.highlight .kn { color: #008800; font-weight: bold } /* Keyword.Namespace */
.highlight .kp { color: #008800 } /* Keyword.Pseudo */
.highlight .kr { color: #008800; font-weight: bold } /* Keyword.Reserved */
.highlight .kt { color: #888888; font-weight: bold } /* Keyword.Type */
.highlight .m { color: #0000DD; font-weight: bold } /* Literal.Number */
.highlight .s { color: #dd2200; background-color: #fff0f0 } /* Literal.String */
.highlight .na { color: #336699 } /* Name.Attribute */
.highlight .nb { color: #003388 } /* Name.Builtin */
.highlight .nc { color: #bb0066; font-weight: bold } /* Name.Class */
.highlight .no { color: #003366; font-weight: bold } /* Name.Constant */
.highlight .nd { color: #555555 } /* Name.Decorator */
.highlight .ne { color: #bb0066; font-weight: bold } /* Name.Exception */
.highlight .nf { color: #0066bb; font-weight: bold } /* Name.Function */
.highlight .nl { color: #336699; font-style: italic } /* Name.Label */
.highlight .nn { color: #bb0066; font-weight: bold } /* Name.Namespace */
.highlight .py { color: #336699; font-weight: bold } /* Name.Property */
.highlight .nt { color: #bb0066; font-weight: bold } /* Name.Tag */
.highlight .nv { color: #336699 } /* Name.Variable */
.highlight .ow { color: #008800 } /* Operator.Word */
.highlight .w { color: #bbbbbb } /* Text.Whitespace */
.highlight .mb { color: #0000DD; font-weight: bold } /* Literal.Number.Bin */
.highlight .mf { color: #0000DD; font-weight: bold } /* Literal.Number.Float */
.highlight .mh { color: #0000DD; font-weight: bold } /* Literal.Number.Hex */
.highlight .mi { color: #0000DD; font-weight: bold } /* Literal.Number.Integer */
.highlight .mo { color: #0000DD; font-weight: bold } /* Literal.Number.Oct */
.highlight .sa { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Affix */
.highlight .sb { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Backtick */
.highlight .sc { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Char */
.highlight .dl { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Delimiter */
.highlight .sd { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Doc */
.highlight .s2 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Double */
.highlight .se { color: #0044dd; background-color: #fff0f0 } /* Literal.String.Escape */
.highlight .sh { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Heredoc */
.highlight .si { color: #3333bb; background-color: #fff0f0 } /* Literal.String.Interpol */
.highlight .sx { color: #22bb22; background-color: #f0fff0 } /* Literal.String.Other */
.highlight .sr { color: #008800; background-color: #fff0ff } /* Literal.String.Regex */
.highlight .s1 { color: #dd2200; background-color: #fff0f0 } /* Literal.String.Single */
.highlight .ss { color: #aa6600; background-color: #fff0f0 } /* Literal.String.Symbol */
.highlight .bp { color: #003388 } /* Name.Builtin.Pseudo */
.highlight .fm { color: #0066bb; font-weight: bold } /* Name.Function.Magic */
.highlight .vc { color: #336699 } /* Name.Variable.Class */
.highlight .vg { color: #dd7700 } /* Name.Variable.Global */
.highlight .vi { color: #3333bb } /* Name.Variable.Instance */
.highlight .vm { color: #336699 } /* Name.Variable.Magic */
.highlight .il { color: #0000DD; font-weight: bold } /* Literal.Number.Integer.Long */
# Makefile for OpenWrt
#
# Copyright (C) 2007-2015 OpenWrt.org
#
# This is free software, licensed under the GNU General Public License v2.
# See /LICENSE for more information.
#

TOPDIR:=${CURDIR}
LC_ALL:=C
LANG:=C
SDK:=1
export TOPDIR LC_ALL LANG SDK

world:

DISTRO_PKG_CONFIG:=$(shell which -a pkg-config | grep -E '\/usr' | head -n 1)
export PATH:=$(TOPDIR)/staging_dir/host/bin:$(PATH)

ifneq ($(OPENWRT_BUILD),1)
  override OPENWRT_BUILD=1
  export OPENWRT_BUILD

  empty:=
  space:= $(empty) $(empty)
  _SINGLE=export MAKEFLAGS=$(space);

  include $(TOPDIR)/include/debug.mk
  include $(TOPDIR)/include/depends.mk
  include $(TOPDIR)/include/toplevel.mk
else
  include rules.mk
  include $(INCLUDE_DIR)/depends.mk
  include $(INCLUDE_DIR)/subdir.mk
  include package/Makefile

$(package/stamp-compile): $(BUILD_DIR)/.prepared
$(BUILD_DIR)/.prepared: Makefile
	@mkdir -p $$(dirname $@)
	@touch $@

clean: FORCE
	git clean -f -d $(STAGING_DIR); true
	git clean -f -d $(BUILD_DIR); true
	git clean -f -d $(BIN_DIR); true

dirclean: clean
	git reset --hard HEAD
	git clean -f -d
	rm -rf feeds/

# check prerequisites before starting to build
prereq: $(package/stamp-prereq) ;

world: prepare $(package/stamp-compile) FORCE
	@$(MAKE) package/index

.PHONY: clean dirclean prereq prepare world

endif
<
import io
import pytest

from pathod import language
from pathod.language import http, base

from .. import tservers


def parse_request(s):
    return next(language.parse_pathoc(s))


def test_make_error_response():
    d = io.BytesIO()
    s = http.make_error_response("foo")
    language.serve(s, d, {})


class TestRequest:

    def test_nonascii(self):
        with pytest.raises(Exception, match="ASCII"):
            parse_request("get:\xf0")

    def test_err(self):
        with pytest.raises(language.ParseException):
            parse_request('GET')

    def test_simple(self):
        r = parse_request('GET:"/foo"')
        assert r.method.string() == b"GET"
        assert r.path.string() == b"/foo"
        r = parse_request('GET:/foo')
        assert r.path.string() == b"/foo"
        r = parse_request('GET:@1k')
        assert len(r.path.string()) == 1024

    def test_multiple(self):
        r = list(language.parse_pathoc("GET:/ PUT:/"))
        assert r[0].method.string() == b"GET"
        assert r[1].method.string() == b"PUT"
        assert len(r) == 2

        l = """
            GET
            "/foo"
            ir,@1

            PUT

            "/foo



            bar"

            ir,@1
        """
        r = list(language.parse_pathoc(l))
        assert len(r) == 2
        assert r[0].method.string() == b"GET"
        assert r[1].method.string() == b"PUT"

        l = """
            get:"http://localhost:9999/p/200":ir,@1
            get:"http://localhost:9999/p/200":ir,@2
        """
        r = list(language.parse_pathoc(l))
        assert len(r) == 2
        assert r[0].method.string() == b"GET"
        assert r[1].method.string() == b"GET"

    def test_nested_response(self):
        l = "get:/p:s'200'"
        r = list(language.parse_pathoc(l))
        assert len(r) == 1
        assert len(r[0].tokens) == 3
        assert isinstance(r[0].tokens[2], http.NestedResponse)
        assert r[0].values({})

    def test_render(self):
        s = io.BytesIO()
        r = parse_request("GET:'/foo'")
        assert language.serve(
            r,
            s,
            language.Settings(request_host="foo.com")
        )

    def test_multiline(self):
        l = """
            GET
            "/foo"
            ir,@1
        """
        r = parse_request(l)
        assert r.method.string() == b"GET"
        assert r.path.string() == b"/foo"
        assert r.actions

        l = """
            GET

            "/foo



            bar"

            ir,@1
        """
        r = parse_request(l)
        assert r.method.string() == b"GET"
        assert r.path.string().endswith(b"bar")
        assert r.actions

    def test_spec(self):
        def rt(s):
            s = parse_request(s).spec()
            assert parse_request(s).spec() == s
        rt("get:/foo")
        rt("get:/foo:da")

    def test_freeze(self):
        r = parse_request("GET:/:b@100").freeze(language.Settings())
        assert len(r.spec()) > 100

    def test_path_generator(self):
        r = parse_request("GET:@100").freeze(language.Settings())
        assert len(r.spec()) > 100

    def test_websocket(self):
        r = parse_request('ws:/path/')
        res = r.resolve(language.Settings())
        assert res.method.string().lower() == b"get"
        assert res.tok(http.Path).value.val == b"/path/"
        assert res.tok(http.Method).value.val.lower() == b"get"
        assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket"

        r = parse_request('ws:put:/path/')
        res = r.resolve(language.Settings())
        assert r.method.string().lower() == b"put"
        assert res.tok(http.Path).value.val == b"/path/"
        assert res.tok(http.Method).value.val.lower() == b"put"
        assert http.get_header(b"Upgrade", res.headers).value.val == b"websocket"


class TestResponse:

    def dummy_response(self):
        return next(language.parse_pathod("400'msg'"))

    def test_response(self):
        r = next(language.parse_pathod("400:m'msg'"))
        assert r.status_code.string() == b"400"
        assert r.reason.string() == b"msg"

        r = next(language.parse_pathod("400:m'msg':b@100b"))
        assert r.reason.string() == b"msg"
        assert r.body.values({})
        assert str(r)

        r = next(language.parse_pathod("200"))
        assert r.status_code.string() == b"200"
        assert not r.reason
        assert b"OK" in [i[:] for i in r.preamble({})]

    def test_render(self):
        s = io.BytesIO()
        r = next(language.parse_pathod("400:m'msg'"))
        assert language.serve(r, s, {})

        r = next(language.parse_pathod("400:p0,100:dr"))
        assert "p0" in r.spec()
        s = r.preview_safe()
        assert "p0" not in s.spec()

    def test_raw(self):
        s = io.BytesIO()
        r = next(language.parse_pathod("400:b'foo'"))
        language.serve(r, s, {})
        v = s.getvalue()
        assert b"Content-Length" in v

        s = io.BytesIO()
        r = next(language.parse_pathod("400:b'foo':r"))
        language.serve(r, s, {})
        v = s.getvalue()
        assert b"Content-Length" not in v

    def test_length(self):
        def testlen(x):
            s = io.BytesIO()
            x = next(x)
            language.serve(x, s, language.Settings())
            assert x.length(language.Settings()) == len(s.getvalue())
        testlen(language.parse_pathod("400:m'msg':r"))
        testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r"))
        testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r"))

    def test_maximum_length(self):
        def testlen(x):
            x = next(x)
            s = io.BytesIO()
            m = x.maximum_length({})
            language.serve(x, s, {})
            assert m >= len(s.getvalue())

        r = language.parse_pathod("400:m'msg':b@100:d0")
        testlen(r)

        r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'")
        testlen(r)

        r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'")
        testlen(r)

    def test_parse_err(self):
        with pytest.raises(language.ParseException):
            language.parse_pathod("400:msg,b:")
        try:
            language.parse_pathod("400'msg':b:")
        except language.ParseException as v:
            assert v.marked()
            assert str(v)

    def test_nonascii(self):
        with pytest.raises(Exception, match="ASCII"):
            language.parse_pathod("foo:b\xf0")

    def test_parse_header(self):
        r = next(language.parse_pathod('400:h"foo"="bar"'))
        assert http.get_header(b"foo", r.headers)

    def test_parse_pause_before(self):
        r = next(language.parse_pathod("400:p0,10"))
        assert r.actions[0].spec() == "p0,10"

    def test_parse_pause_after(self):
        r = next(language.parse_pathod("400:pa,10"))
        assert r.actions[0].spec() == "pa,10"

    def test_parse_pause_random(self):
        r = next(language.parse_pathod("400:pr,10"))
        assert r.actions[0].spec() == "pr,10"

    def test_parse_stress(self):
        # While larger values are known to work on linux, len() technically
        # returns an int and a python 2.7 int on windows has 32bit precision.
        # Therefore, we should keep the body length < 2147483647 bytes in our
        # tests.
        r = next(language.parse_pathod("400:b@1g"))
        assert r.length({})

    def test_spec(self):
        def rt(s):
            s = next(language.parse_pathod(s)).spec()
            assert next(language.parse_pathod(s)).spec() == s
        rt("400:b@100g")
        rt("400")
        rt("400:da")

    def test_websockets(self):
        r = next(language.parse_pathod("ws"))
        with pytest.raises(Exception, match="No websocket key"):
            r.resolve(language.Settings())
        res = r.resolve(language.Settings(websocket_key=b"foo"))
        assert res.status_code.string() == b"101"


def test_ctype_shortcut():
    e = http.ShortcutContentType.expr()
    v = e.parseString("c'foo'")[0]
    assert v.key.val == b"Content-Type"
    assert v.value.val == b"foo"

    s = v.spec()
    assert s == e.parseString(s)[0].spec()

    e = http.ShortcutContentType.expr()
    v = e.parseString("c@100")[0]
    v2 = v.freeze({})
    v3 = v2.freeze({})
    assert v2.value.val == v3.value.val


def test_location_shortcut():
    e = http.ShortcutLocation.expr()
    v = e.parseString("l'foo'")[0]
    assert v.key.val == b"Location"
    assert v.value.val == b"foo"

    s = v.spec()
    assert s == e.parseString(s)[0].spec()

    e = http.ShortcutLocation.expr()
    v = e.parseString("l@100")[0]
    v2 = v.freeze({})
    v3 = v2.freeze({})
    assert v2.value.val == v3.value.val


def test_shortcuts():
    assert next(language.parse_pathod(
        "400:c'foo'")).headers[0].key.val == b"Content-Type"
    assert next(language.parse_pathod(
        "400:l'foo'")).headers[0].key.val == b"Location"

    assert b"Android" in tservers.render(parse_request("get:/:ua"))
    assert b"User-Agent" in tservers.render(parse_request("get:/:ua"))


def test_user_agent():
    e = http.ShortcutUserAgent.expr()
    v = e.parseString("ua")[0]
    assert b"Android" in v.string()

    e = http.ShortcutUserAgent.expr()
    v = e.parseString("u'a'")[0]
    assert b"Android" not in v.string()

    v = e.parseString("u@100'")[0]
    assert len(str(v.freeze({}).value)) > 100
    v2 = v.freeze({})
    v3 = v2.freeze({})
    assert v2.value.val == v3.value.val


def test_nested_response():
    e = http.NestedResponse.expr()
    v = e.parseString("s'200'")[0]
    assert v.value.val == b"200"
    with pytest.raises(language.ParseException):
        e.parseString("s'foo'")

    v = e.parseString('s"200:b@1"')[0]
    assert "@1" in v.spec()
    f = v.freeze({})
    assert "@1" not in f.spec()


def test_nested_response_freeze():
    e = http.NestedResponse(
        base.TokValueLiteral(
            r"200:b\'foo\':i10,\'\\x27\'"
        )
    )
    assert e.freeze({})
    assert e.values({})


def test_unique_components():
    with pytest.raises(Exception, match="multiple body clauses"):
        language.parse_pathod("400:b@1:b@1")