aboutsummaryrefslogtreecommitdiffstats
path: root/test/pathod
diff options
context:
space:
mode:
authorMaximilian Hils <git@maximilianhils.com>2016-02-15 16:34:22 +0100
committerMaximilian Hils <git@maximilianhils.com>2016-02-15 16:34:22 +0100
commitd7158f975e671b78f0a064dd873cfa7805667528 (patch)
tree9b40f263c4f613a0dc49f5d8628c371164afd546 /test/pathod
parent5fe473fb431699c71aa74bb715c2cb5b0500f044 (diff)
downloadmitmproxy-d7158f975e671b78f0a064dd873cfa7805667528.tar.gz
mitmproxy-d7158f975e671b78f0a064dd873cfa7805667528.tar.bz2
mitmproxy-d7158f975e671b78f0a064dd873cfa7805667528.zip
move tests into shared folder
Diffstat (limited to 'test/pathod')
-rw-r--r--test/pathod/data/clientcert/.gitignore3
-rw-r--r--test/pathod/data/clientcert/client.cnf5
-rw-r--r--test/pathod/data/clientcert/client.pem42
-rw-r--r--test/pathod/data/clientcert/make8
-rw-r--r--test/pathod/data/file1
-rw-r--r--test/pathod/data/request1
-rw-r--r--test/pathod/data/response1
-rw-r--r--test/pathod/data/testkey.pem68
-rw-r--r--test/pathod/scripts/generate.sh17
-rw-r--r--test/pathod/scripts/openssl.cnf39
-rw-r--r--test/pathod/test_app.py85
-rw-r--r--test/pathod/test_language_actions.py135
-rw-r--r--test/pathod/test_language_base.py352
-rw-r--r--test/pathod/test_language_generators.py42
-rw-r--r--test/pathod/test_language_http.py358
-rw-r--r--test/pathod/test_language_http2.py233
-rw-r--r--test/pathod/test_language_websocket.py142
-rw-r--r--test/pathod/test_language_writer.py91
-rw-r--r--test/pathod/test_log.py25
-rw-r--r--test/pathod/test_pathoc.py308
-rw-r--r--test/pathod/test_pathoc_cmdline.py59
-rw-r--r--test/pathod/test_pathod.py289
-rw-r--r--test/pathod/test_pathod_cmdline.py85
-rw-r--r--test/pathod/test_test.py45
-rw-r--r--test/pathod/test_utils.py39
-rw-r--r--test/pathod/tutils.py128
26 files changed, 2601 insertions, 0 deletions
diff --git a/test/pathod/data/clientcert/.gitignore b/test/pathod/data/clientcert/.gitignore
new file mode 100644
index 00000000..07bc53d2
--- /dev/null
+++ b/test/pathod/data/clientcert/.gitignore
@@ -0,0 +1,3 @@
+client.crt
+client.key
+client.req
diff --git a/test/pathod/data/clientcert/client.cnf b/test/pathod/data/clientcert/client.cnf
new file mode 100644
index 00000000..5046a944
--- /dev/null
+++ b/test/pathod/data/clientcert/client.cnf
@@ -0,0 +1,5 @@
+[ ssl_client ]
+basicConstraints = CA:FALSE
+nsCertType = client
+keyUsage = digitalSignature, keyEncipherment
+extendedKeyUsage = clientAuth
diff --git a/test/pathod/data/clientcert/client.pem b/test/pathod/data/clientcert/client.pem
new file mode 100644
index 00000000..4927bca2
--- /dev/null
+++ b/test/pathod/data/clientcert/client.pem
@@ -0,0 +1,42 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIEpAIBAAKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0
+EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+
+ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G
+3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/
+SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP
+G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABAoIBAFE3FV/IDltbmHEP
+iky93hbJm+6QgKepFReKpRVTyqb7LaygUvueQyPWQMIriKTsy675nxo8DQr7tQsO
+y3YlSZgra/xNMikIB6e82c7K8DgyrDQw/rCqjZB3Xt4VCqsWJDLXnQMSn98lx0g7
+d7Lbf8soUpKWXqfdVpSDTi4fibSX6kshXyfSTpcz4AdoncEpViUfU1xkEEmZrjT8
+1GcCsDC41xdNmzCpqRuZX7DKSFRoB+0hUzsC1oiqM7FD5kixonRd4F5PbRXImIzt
+6YCsT2okxTA04jX7yByis7LlOLTlkmLtKQYuc3erOFvwx89s4vW+AeFei+GGNitn
+tHfSwbECgYEA7SzV+nN62hAERHlg8cEQT4TxnsWvbronYWcc/ev44eHSPDWL5tPi
+GHfSbW6YAq5Wa0I9jMWfXyhOYEC3MZTC5EEeLOB71qVrTwcy/sY66rOrcgjFI76Q
+5JFHQ4wy3SWU50KxE0oWJO9LIowprG+pW1vzqC3VF0T7q0FqESrY4LUCgYEA3F7Z
+80ndnCUlooJAb+Hfotv7peFf1o6+m1PTRcz1lLnVt5R5lXj86kn+tXEpYZo1RiGR
+2rE2N0seeznWCooakHcsBN7/qmFIhhooJNF7yW+JP2I4P2UV5+tJ+8bcs/voUkQD
+1x+rGOuMn8nvHBd2+Vharft8eGL2mgooPVI2XusCgYEAlMZpO3+w8pTVeHaDP2MR
+7i/AuQ3cbCLNjSX3Y7jgGCFllWspZRRIYXzYPNkA9b2SbBnTLjjRLgnEkFBIGgvs
+7O2EFjaCuDRvydUEQhjq4ErwIsopj7B8h0QyZcbOKTbn3uFQ3n68wVJx2Sv/ADHT
+FIHrp/WIE96r19Niy34LKXkCgYB2W59VsuOKnMz01l5DeR5C+0HSWxS9SReIl2IO
+yEFSKullWyJeLIgyUaGy0990430feKI8whcrZXYumuah7IDN/KOwzhCk8vEfzWao
+N7bzfqtJVrh9HA7C7DVlO+6H4JFrtcoWPZUIomJ549w/yz6EN3ckoMC+a/Ck1TW9
+ka1QFwKBgQCywG6TrZz0UmOjyLQZ+8Q4uvZklSW5NAKBkNnyuQ2kd5rzyYgMPE8C
+Er8T88fdVIKvkhDyHhwcI7n58xE5Gr7wkwsrk/Hbd9/ZB2GgAPY3cATskK1v1McU
+YeX38CU0fUS4aoy26hWQXkViB47IGQ3jWo3ZCtzIJl8DI9/RsBWTnw==
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIICYDCCAckCAQEwDQYJKoZIhvcNAQEFBQAwKDESMBAGA1UEAxMJbWl0bXByb3h5
+MRIwEAYDVQQKEwltaXRtcHJveHkwHhcNMTMwMTIwMDEwODEzWhcNMTUxMDE3MDEw
+ODEzWjBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UE
+ChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOC
+AQ8AMIIBCgKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0
+EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+
+ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G
+3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/
+SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP
+G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABMA0GCSqGSIb3DQEBBQUA
+A4GBAFvI+cd47B85PQ970n2dU/PlA2/Hb1ldrrXh2guR4hX6vYx/uuk5yRI/n0Rd
+KOXJ3czO0bd2Fpe3ZoNpkW0pOSDej/Q+58ScuJd0gWCT/Sh1eRk6ZdC0kusOuWoY
+bPOPMkG45LPgUMFOnZEsfJP6P5mZIxlbCvSMFC25nPHWlct7
+-----END CERTIFICATE-----
diff --git a/test/pathod/data/clientcert/make b/test/pathod/data/clientcert/make
new file mode 100644
index 00000000..d1caea81
--- /dev/null
+++ b/test/pathod/data/clientcert/make
@@ -0,0 +1,8 @@
+#!/bin/sh
+
+openssl genrsa -out client.key 2048
+openssl req -key client.key -new -out client.req
+openssl x509 -req -days 365 -in client.req -signkey client.key -out client.crt -extfile client.cnf -extensions ssl_client
+openssl x509 -req -days 1000 -in client.req -CA ~/.mitmproxy/mitmproxy-ca.pem -CAkey ~/.mitmproxy/mitmproxy-ca.pem -set_serial 00001 -out client.crt -extensions ssl_client
+cat client.key client.crt > client.pem
+openssl x509 -text -noout -in client.pem
diff --git a/test/pathod/data/file b/test/pathod/data/file
new file mode 100644
index 00000000..26918572
--- /dev/null
+++ b/test/pathod/data/file
@@ -0,0 +1 @@
+testfile
diff --git a/test/pathod/data/request b/test/pathod/data/request
new file mode 100644
index 00000000..c4c90e76
--- /dev/null
+++ b/test/pathod/data/request
@@ -0,0 +1 @@
+get:/foo
diff --git a/test/pathod/data/response b/test/pathod/data/response
new file mode 100644
index 00000000..8f897c85
--- /dev/null
+++ b/test/pathod/data/response
@@ -0,0 +1 @@
+202
diff --git a/test/pathod/data/testkey.pem b/test/pathod/data/testkey.pem
new file mode 100644
index 00000000..b804bd4c
--- /dev/null
+++ b/test/pathod/data/testkey.pem
@@ -0,0 +1,68 @@
+-----BEGIN RSA PRIVATE KEY-----
+MIIG5QIBAAKCAYEAwvtKxoZvBV2AxPAkCx8PXbuE7KeqK9bBvk8x+JchPMdf/KZj
+sdu2v6Gm8Hi053i7ZGxouFvonJxHAiK6cwk9OYQwa9fbOFf2mgWKEBO4fbCH93tW
+DCTdWVxFyNViAvxGHlJs3/IU03pIG29AgUnhRW8pGbabAfx8emcOZJZ3ykEuimaC
+4s7mRwdc63GXnbcjTtRkrJsBATI+xvPwuR2+4daX7sPCf0kel3bN2jMpwXfvk/Ww
+kJ2BIEeZCg0qIvyMjH9qrUirUnsmQnpPln0CGBbQEBsW9yMfGoFdREiMYls5jZeq
+NxjWNv1RTRIm/4RjMwyxnoTA9eDS9wwO2NnJS4vfXAnUTP4BYx8Pe4ZMA2Gm6YrC
+ysT6YA1xdHNpcuHXClxwmPj/cm8Z5kIg5clbNIK60ts9yFr/Ao3KPPYJ2GBv8/Oe
+ApPBJuubews+/9/13Ew/SJ1t2u28+sPbgXUG8dC2n4vWTvJwKf6Duqxgnm82zdzj
+SZoXRQsP984qiN7NAgMBAAECggGBALB6rqWdzCL5DLI0AQun40qdjaR95UKksNvF
+5p7we379nl2ZZKb5DSHJ+MWzG1pfJo2wqeAkIBiQQp0mPcgdVrMWeJVD3QHUbDng
+RaRjlRr+izJvCeUYANj+8ZLjwECfgf+z7yOLg1oeVeGvAp2C90jXYkYJx6c2lpxb
+ZuWYY3hHIw7V1iXfywIDIhFg0TBJMMYK68xmx7QDfFqrNPj4eWsDxqSvvv1iezPw
+rkWPBX49RjWPrW5XgSZsZ5J3c+oS1rZmIY7EAgopTWB/3wJjZR1Idz/9l9LIWlBP
+6zVC27CIZzSEeGguqNVeyzJ0TPWh5idYNRmSZr6eTUF0245LNO/gqvWKgRSNIZko
+HoBa2F1AvCiB67S1kxjwS5y3VkudZE4jkgGKcC2Ws/9QmOZ0HAsjI8VAMp2hj6iN
+0HdPMTNtsLgbhKyXsoZuW4YmwfSTPxGi2gvcI7GUozpTz84n1cOneJnz1ygx6Uru
+v8DpQg+VX6xTy4X6AK1F8OYNMZ/jaQKBwQDv30NevQStnGbTmcSr+0fd4rmWFklK
+V6B2X7zWynVpSGvCWkqVSp3mG6aiZItAltVMRL/9LT6zIheyClyd+vXIjR2+W210
+XMxrvz7A8qCXkvB2dyEhrMdCfZ7p8+kf+eD2c/Mnxb7VpmDfHYLx30JeQoBwjrwU
+Eul+dE1P+r8bWBaLTjlsipTya74yItWWAToXAo+s1BXBtXhEsLoe4FghlC0u724d
+ucjDaeICdLcerApdvg6Q6p4kVHaoF6ka6I8CgcEA0Bdc05ery9gLC6CclV+BhA5Q
+dfDq2P7qhc7e1ipwNRrQo2gy5HhgOkTL3dJWc+8rV6CBP/JfchnsW40tDOnPCTLT
+gg3n7vv3RHrtncApXuhIFR+B5xjohTPBzxRUMiAOre2d0F5b6eBXFjptf/1i2tQ+
+qdqJoyOGOZP0hKVslGIfz+CKc6WEkIqX7c91Msdr5myeaWDI5TsurfuKRBH395T3
+BMAi6oinAAEb1rdySenLO2A/0kVmBVlTpaN3TNjjAoHBAMvS4uQ1qSv8okNbfgrF
+UqPwa9JkzZImM2tinovFLU9xAl/7aTTCWrmU9Vs4JDuV71kHcjwnngeJCKl4tIpp
+HUB06Lk/5xnhYLKNpz087cjeSwXe5IBA2HBfXhFd+NH6+nVwwUUieq4A2n+8C/CK
+zVJbH9iE8Lv99fpFyQwU/R63EzD8Hz9j4ny7oLnpb6QvFrVGr98jt/kJwlBb+0sR
+RtIBnwMq4F7R5w5lgm6jzpZ5ibVuMeJh+k7Ulp7uu/rpcQKBwQDE3sWIvf7f7PaO
+OpbJz0CmYjCHVLWrNIlGrPAv6Jid9U+cuXEkrCpGFl5V77CxIH59+bEuga0BMztl
+ZkxP4khoqHhom6VpeWJ3nGGAFJRPYS0JJvTsYalilBPxSYdaoO+iZ6MdxpfozcE2
+m3KLW3uSEqlyYvpCqNJNWQhGEoeGXstADWyPevHPGgAhElwL/ZW8u9inU9Tc4sAI
+BGnMer+BsaJ+ERU3lK+Clony+z2aZiFLfIUE93lM6DT2CZBN2QcCgcAVk4L0bfA6
+HFnP/ZWNlnYWpOVFKcq57PX+J5/k7Tf34e2cYM2P0eqYggWZbzVd8qoCOQCHrAx0
+aZSSvEyKAVvzRNeqbm1oXaMojksMnrSX5henHjPbZlr1EmM7+zMnSTMkfVOx/6g1
+97sASej31XdOAgKCBJGymrwvYrCLW+P5cHqd+D8v/PvfpRIQM54p5ixRt3EYZvtR
+zGrzsr0OGyOLZtj1DB0a3kvajAAOCl3TawJSzviKo2mwc+/xj28MCQM=
+-----END RSA PRIVATE KEY-----
+-----BEGIN CERTIFICATE-----
+MIIE4TCCA0mgAwIBAgIJALONCAWZxPhUMA0GCSqGSIb3DQEBCwUAMEExCzAJBgNV
+BAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYDVQQD
+DAh0ZXN0LmNvbTAeFw0xNTA0MTgyMjA0NTNaFw00MjA5MDIyMjA0NTNaMEExCzAJ
+BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD
+VQQDDAh0ZXN0LmNvbTCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAML7
+SsaGbwVdgMTwJAsfD127hOynqivWwb5PMfiXITzHX/ymY7Hbtr+hpvB4tOd4u2Rs
+aLhb6JycRwIiunMJPTmEMGvX2zhX9poFihATuH2wh/d7Vgwk3VlcRcjVYgL8Rh5S
+bN/yFNN6SBtvQIFJ4UVvKRm2mwH8fHpnDmSWd8pBLopmguLO5kcHXOtxl523I07U
+ZKybAQEyPsbz8LkdvuHWl+7Dwn9JHpd2zdozKcF375P1sJCdgSBHmQoNKiL8jIx/
+aq1Iq1J7JkJ6T5Z9AhgW0BAbFvcjHxqBXURIjGJbOY2XqjcY1jb9UU0SJv+EYzMM
+sZ6EwPXg0vcMDtjZyUuL31wJ1Ez+AWMfD3uGTANhpumKwsrE+mANcXRzaXLh1wpc
+cJj4/3JvGeZCIOXJWzSCutLbPcha/wKNyjz2Cdhgb/PzngKTwSbrm3sLPv/f9dxM
+P0idbdrtvPrD24F1BvHQtp+L1k7ycCn+g7qsYJ5vNs3c40maF0ULD/fOKojezQID
+AQABo4HbMIHYMAsGA1UdDwQEAwIFoDAdBgNVHQ4EFgQUbEgfTauEqEP/bnBtby1K
+bihJvcswcQYDVR0jBGowaIAUbEgfTauEqEP/bnBtby1KbihJvcuhRaRDMEExCzAJ
+BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD
+VQQDDAh0ZXN0LmNvbYIJALONCAWZxPhUMAwGA1UdEwQFMAMBAf8wKQYDVR0RBCIw
+IIIIdGVzdC5jb22CCXRlc3QyLmNvbYIJdGVzdDMuY29tMA0GCSqGSIb3DQEBCwUA
+A4IBgQBcTedXtUb91DxQRtg73iomz7cQ4niZntUBW8iE5rpoA7prtQNGHMCbHwaX
+tbWFkzBmL5JTBWvd/6AQ2LtiB3rYB3W/iRhbpsNJ501xaoOguPEQ9720Ph8TEveM
+208gNzGsEOcNALwyXj2y9M19NGu9zMa8eu1Tc3IsQaVaGKHx8XZn5HTNUx8EdcwI
+Z/Ji9ETDCL7+e5INv0tqfFSazWaQUwxM4IzPMkKTYRcMuN/6eog609k9r9pp32Ut
+rKlzc6GIkAlgJJ0Wkoz1V46DmJNJdJG7eLu/mtsB85j6hytIQeWTf1fll5YnMZLF
+HgNZtfYn8Q0oTdBQ0ZOaZeQCfZ8emYBdLJf2YB83uGRMjQ1FoeIxzQqiRq8WHRdb
+9Q45i0DINMnNp0DbLMA4numZ7wT9SQb6sql9eUyuCNDw7nGIWTHUNfLtU1Er3h1d
+icJuApx9+//UN/pGh0yTXb3fZbiI4IehRmkpnIWonIAwaVGm6JZU04wiIn8CuBho
+/qQdlS8=
+-----END CERTIFICATE-----
diff --git a/test/pathod/scripts/generate.sh b/test/pathod/scripts/generate.sh
new file mode 100644
index 00000000..eec3077d
--- /dev/null
+++ b/test/pathod/scripts/generate.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+if [ ! -f ./private.key ]
+then
+ openssl genrsa -out private.key 3072
+fi
+openssl req \
+ -batch \
+ -new -x509 \
+ -key private.key \
+ -sha256 \
+ -out cert.pem \
+ -days 9999 \
+ -config ./openssl.cnf
+openssl x509 -in cert.pem -text -noout
+cat ./private.key ./cert.pem > testcert.pem
+rm ./private.key ./cert.pem
diff --git a/test/pathod/scripts/openssl.cnf b/test/pathod/scripts/openssl.cnf
new file mode 100644
index 00000000..5c890354
--- /dev/null
+++ b/test/pathod/scripts/openssl.cnf
@@ -0,0 +1,39 @@
+[ req ]
+default_bits = 1024
+default_keyfile = privkey.pem
+distinguished_name = req_distinguished_name
+x509_extensions = v3_ca
+
+[ req_distinguished_name ]
+countryName = Country Name (2 letter code)
+countryName_default = NZ
+countryName_min = 2
+countryName_max = 2
+stateOrProvinceName = State or Province Name (full name)
+stateOrProvinceName_default = Otago
+localityName = Locality Name (eg, city)
+0.organizationName = Organization Name (eg, company)
+0.organizationName_default = Pathod
+commonName = Common Name (e.g. server FQDN or YOUR name)
+commonName_default = test.com
+commonName_max = 64
+
+[ v3_req ]
+
+basicConstraints = CA:FALSE
+keyUsage = nonRepudiation, digitalSignature, keyEncipherment
+
+[ v3_ca ]
+
+keyUsage = digitalSignature, keyEncipherment
+subjectKeyIdentifier=hash
+authorityKeyIdentifier=keyid:always,issuer:always
+basicConstraints = CA:true
+subjectAltName = @alternate_names
+
+
+[ alternate_names ]
+
+DNS.1 = test.com
+DNS.2 = test2.com
+DNS.3 = test3.com
diff --git a/test/pathod/test_app.py b/test/pathod/test_app.py
new file mode 100644
index 00000000..4536db8e
--- /dev/null
+++ b/test/pathod/test_app.py
@@ -0,0 +1,85 @@
+import tutils
+
+
+class TestApp(tutils.DaemonTests):
+ SSL = False
+
+ def test_index(self):
+ r = self.getpath("/")
+ assert r.status_code == 200
+ assert r.content
+
+ def test_about(self):
+ r = self.getpath("/about")
+ assert r.ok
+
+ def test_download(self):
+ r = self.getpath("/download")
+ assert r.ok
+
+ def test_docs(self):
+ assert self.getpath("/docs/pathod").status_code == 200
+ assert self.getpath("/docs/pathoc").status_code == 200
+ assert self.getpath("/docs/language").status_code == 200
+ assert self.getpath("/docs/libpathod").status_code == 200
+ assert self.getpath("/docs/test").status_code == 200
+
+ def test_log(self):
+ assert self.getpath("/log").status_code == 200
+ assert self.get("200:da").status_code == 200
+ id = self.d.log()[0]["id"]
+ assert self.getpath("/log").status_code == 200
+ assert self.getpath("/log/%s" % id).status_code == 200
+ assert self.getpath("/log/9999999").status_code == 404
+
+ def test_log_binary(self):
+ assert self.get("200:h@10b=@10b:da")
+
+ def test_response_preview(self):
+ r = self.getpath("/response_preview", params=dict(spec="200"))
+ assert r.status_code == 200
+ assert 'Response' in r.content
+
+ r = self.getpath("/response_preview", params=dict(spec="foo"))
+ assert r.status_code == 200
+ assert 'Error' in r.content
+
+ r = self.getpath("/response_preview", params=dict(spec="200:b@100m"))
+ assert r.status_code == 200
+ assert "too large" in r.content
+
+ r = self.getpath("/response_preview", params=dict(spec="200:b@5k"))
+ assert r.status_code == 200
+ assert 'Response' in r.content
+
+ r = self.getpath(
+ "/response_preview",
+ params=dict(
+ spec="200:b<nonexistent"))
+ assert r.status_code == 200
+ assert 'File access denied' in r.content
+
+ r = self.getpath("/response_preview", params=dict(spec="200:b<file"))
+ assert r.status_code == 200
+ assert 'testfile' in r.content
+
+ def test_request_preview(self):
+ r = self.getpath("/request_preview", params=dict(spec="get:/"))
+ assert r.status_code == 200
+ assert 'Request' in r.content
+
+ r = self.getpath("/request_preview", params=dict(spec="foo"))
+ assert r.status_code == 200
+ assert 'Error' in r.content
+
+ r = self.getpath("/request_preview", params=dict(spec="get:/:b@100m"))
+ assert r.status_code == 200
+ assert "too large" in r.content
+
+ r = self.getpath("/request_preview", params=dict(spec="get:/:b@5k"))
+ assert r.status_code == 200
+ assert 'Request' in r.content
+
+ r = self.getpath("/request_preview", params=dict(spec=""))
+ assert r.status_code == 200
+ assert 'empty spec' in r.content
diff --git a/test/pathod/test_language_actions.py b/test/pathod/test_language_actions.py
new file mode 100644
index 00000000..755f0d85
--- /dev/null
+++ b/test/pathod/test_language_actions.py
@@ -0,0 +1,135 @@
+import cStringIO
+
+from libpathod.language import actions
+from libpathod import language
+
+
+def parse_request(s):
+ return language.parse_pathoc(s).next()
+
+
+def test_unique_name():
+ assert not actions.PauseAt(0, "f").unique_name
+ assert actions.DisconnectAt(0).unique_name
+
+
+class TestDisconnects:
+
+ def test_parse_pathod(self):
+ a = language.parse_pathod("400:d0").next().actions[0]
+ assert a.spec() == "d0"
+ a = language.parse_pathod("400:dr").next().actions[0]
+ assert a.spec() == "dr"
+
+ def test_at(self):
+ e = actions.DisconnectAt.expr()
+ v = e.parseString("d0")[0]
+ assert isinstance(v, actions.DisconnectAt)
+ assert v.offset == 0
+
+ v = e.parseString("d100")[0]
+ assert v.offset == 100
+
+ e = actions.DisconnectAt.expr()
+ v = e.parseString("dr")[0]
+ assert v.offset == "r"
+
+ def test_spec(self):
+ assert actions.DisconnectAt("r").spec() == "dr"
+ assert actions.DisconnectAt(10).spec() == "d10"
+
+
+class TestInject:
+
+ def test_parse_pathod(self):
+ a = language.parse_pathod("400:ir,@100").next().actions[0]
+ assert a.offset == "r"
+ assert a.value.datatype == "bytes"
+ assert a.value.usize == 100
+
+ a = language.parse_pathod("400:ia,@100").next().actions[0]
+ assert a.offset == "a"
+
+ def test_at(self):
+ e = actions.InjectAt.expr()
+ v = e.parseString("i0,'foo'")[0]
+ assert v.value.val == "foo"
+ assert v.offset == 0
+ assert isinstance(v, actions.InjectAt)
+
+ v = e.parseString("ir,'foo'")[0]
+ assert v.offset == "r"
+
+ def test_serve(self):
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:i0,'foo'").next()
+ assert language.serve(r, s, {})
+
+ def test_spec(self):
+ e = actions.InjectAt.expr()
+ v = e.parseString("i0,'foo'")[0]
+ assert v.spec() == 'i0,"foo"'
+
+ def test_spec(self):
+ e = actions.InjectAt.expr()
+ v = e.parseString("i0,@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+class TestPauses:
+
+ def test_parse_pathod(self):
+ e = actions.PauseAt.expr()
+ v = e.parseString("p10,10")[0]
+ assert v.seconds == 10
+ assert v.offset == 10
+
+ v = e.parseString("p10,f")[0]
+ assert v.seconds == "f"
+
+ v = e.parseString("pr,f")[0]
+ assert v.offset == "r"
+
+ v = e.parseString("pa,f")[0]
+ assert v.offset == "a"
+
+ def test_request(self):
+ r = language.parse_pathod('400:p10,10').next()
+ assert r.actions[0].spec() == "p10,10"
+
+ def test_spec(self):
+ assert actions.PauseAt("r", 5).spec() == "pr,5"
+ assert actions.PauseAt(0, 5).spec() == "p0,5"
+ assert actions.PauseAt(0, "f").spec() == "p0,f"
+
+ def test_freeze(self):
+ l = actions.PauseAt("r", 5)
+ assert l.freeze({}).spec() == l.spec()
+
+
+class Test_Action:
+
+ def test_cmp(self):
+ a = actions.DisconnectAt(0)
+ b = actions.DisconnectAt(1)
+ c = actions.DisconnectAt(0)
+ assert a < b
+ assert a == c
+ l = sorted([b, a])
+ assert l[0].offset == 0
+
+ def test_resolve(self):
+ r = parse_request('GET:"/foo"')
+ e = actions.DisconnectAt("r")
+ ret = e.resolve({}, r)
+ assert isinstance(ret.offset, int)
+
+ def test_repr(self):
+ e = actions.DisconnectAt("r")
+ assert repr(e)
+
+ def test_freeze(self):
+ l = actions.DisconnectAt(5)
+ assert l.freeze({}).spec() == l.spec()
diff --git a/test/pathod/test_language_base.py b/test/pathod/test_language_base.py
new file mode 100644
index 00000000..b18ee5b2
--- /dev/null
+++ b/test/pathod/test_language_base.py
@@ -0,0 +1,352 @@
+import os
+from libpathod import language
+from libpathod.language import base, exceptions
+import tutils
+
+
+def parse_request(s):
+ return language.parse_pathoc(s).next()
+
+
+def test_times():
+ reqs = list(language.parse_pathoc("get:/:x5"))
+ assert len(reqs) == 5
+ assert not reqs[0].times
+
+
+def test_caseless_literal():
+ class CL(base.CaselessLiteral):
+ TOK = "foo"
+ v = CL("foo")
+ assert v.expr()
+ assert v.values(language.Settings())
+
+
+class TestTokValueNakedLiteral:
+
+ def test_expr(self):
+ v = base.TokValueNakedLiteral("foo")
+ assert v.expr()
+
+ def test_spec(self):
+ v = base.TokValueNakedLiteral("foo")
+ assert v.spec() == repr(v) == "foo"
+
+ v = base.TokValueNakedLiteral("f\x00oo")
+ assert v.spec() == repr(v) == r"f\x00oo"
+
+
+class TestTokValueLiteral:
+
+ def test_espr(self):
+ v = base.TokValueLiteral("foo")
+ assert v.expr()
+ assert v.val == "foo"
+
+ v = base.TokValueLiteral("foo\n")
+ assert v.expr()
+ assert v.val == "foo\n"
+ assert repr(v)
+
+ def test_spec(self):
+ v = base.TokValueLiteral("foo")
+ assert v.spec() == r"'foo'"
+
+ v = base.TokValueLiteral("f\x00oo")
+ assert v.spec() == repr(v) == r"'f\x00oo'"
+
+ v = base.TokValueLiteral("\"")
+ assert v.spec() == repr(v) == '\'"\''
+
+ def roundtrip(self, spec):
+ e = base.TokValueLiteral.expr()
+ v = base.TokValueLiteral(spec)
+ v2 = e.parseString(v.spec())
+ assert v.val == v2[0].val
+ assert v.spec() == v2[0].spec()
+
+ def test_roundtrip(self):
+ self.roundtrip("'")
+ self.roundtrip('\'')
+ self.roundtrip("a")
+ self.roundtrip("\"")
+ # self.roundtrip("\\")
+ self.roundtrip("200:b'foo':i23,'\\''")
+ self.roundtrip("\a")
+
+
+class TestTokValueGenerate:
+
+ def test_basic(self):
+ v = base.TokValue.parseString("@10b")[0]
+ assert v.usize == 10
+ assert v.unit == "b"
+ assert v.bytes() == 10
+ v = base.TokValue.parseString("@10")[0]
+ assert v.unit == "b"
+ v = base.TokValue.parseString("@10k")[0]
+ assert v.bytes() == 10240
+ v = base.TokValue.parseString("@10g")[0]
+ assert v.bytes() == 1024 ** 3 * 10
+
+ v = base.TokValue.parseString("@10g,digits")[0]
+ assert v.datatype == "digits"
+ g = v.get_generator({})
+ assert g[:100]
+
+ v = base.TokValue.parseString("@10,digits")[0]
+ assert v.unit == "b"
+ assert v.datatype == "digits"
+
+ def test_spec(self):
+ v = base.TokValueGenerate(1, "b", "bytes")
+ assert v.spec() == repr(v) == "@1"
+
+ v = base.TokValueGenerate(1, "k", "bytes")
+ assert v.spec() == repr(v) == "@1k"
+
+ v = base.TokValueGenerate(1, "k", "ascii")
+ assert v.spec() == repr(v) == "@1k,ascii"
+
+ v = base.TokValueGenerate(1, "b", "ascii")
+ assert v.spec() == repr(v) == "@1,ascii"
+
+ def test_freeze(self):
+ v = base.TokValueGenerate(100, "b", "ascii")
+ f = v.freeze(language.Settings())
+ assert len(f.val) == 100
+
+
+class TestTokValueFile:
+
+ def test_file_value(self):
+ v = base.TokValue.parseString("<'one two'")[0]
+ assert str(v)
+ assert v.path == "one two"
+
+ v = base.TokValue.parseString("<path")[0]
+ assert v.path == "path"
+
+ def test_access_control(self):
+ v = base.TokValue.parseString("<path")[0]
+ with tutils.tmpdir() as t:
+ p = os.path.join(t, "path")
+ with open(p, "wb") as f:
+ f.write("x" * 10000)
+
+ assert v.get_generator(language.Settings(staticdir=t))
+
+ v = base.TokValue.parseString("<path2")[0]
+ tutils.raises(
+ exceptions.FileAccessDenied,
+ v.get_generator,
+ language.Settings(staticdir=t)
+ )
+ tutils.raises(
+ "access disabled",
+ v.get_generator,
+ language.Settings()
+ )
+
+ v = base.TokValue.parseString("</outside")[0]
+ tutils.raises(
+ "outside",
+ v.get_generator,
+ language.Settings(staticdir=t)
+ )
+
+ def test_spec(self):
+ v = base.TokValue.parseString("<'one two'")[0]
+ v2 = base.TokValue.parseString(v.spec())[0]
+ assert v2.path == "one two"
+
+ def test_freeze(self):
+ v = base.TokValue.parseString("<'one two'")[0]
+ v2 = v.freeze({})
+ assert v2.path == v.path
+
+
+class TestMisc:
+
+ def test_generators(self):
+ v = base.TokValue.parseString("'val'")[0]
+ g = v.get_generator({})
+ assert g[:] == "val"
+
+ def test_value(self):
+ assert base.TokValue.parseString("'val'")[0].val == "val"
+ assert base.TokValue.parseString('"val"')[0].val == "val"
+ assert base.TokValue.parseString('"\'val\'"')[0].val == "'val'"
+
+ def test_value(self):
+ class TT(base.Value):
+ preamble = "m"
+ e = TT.expr()
+ v = e.parseString("m'msg'")[0]
+ assert v.value.val == "msg"
+
+ s = v.spec()
+ assert s == e.parseString(s)[0].spec()
+
+ v = e.parseString("m@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+ def test_fixedlengthvalue(self):
+ class TT(base.FixedLengthValue):
+ preamble = "m"
+ length = 4
+
+ e = TT.expr()
+ assert e.parseString("m@4")
+ tutils.raises("invalid value length", e.parseString, "m@100")
+ tutils.raises("invalid value length", e.parseString, "m@1")
+
+ with tutils.tmpdir() as t:
+ p = os.path.join(t, "path")
+ s = base.Settings(staticdir=t)
+ with open(p, "wb") as f:
+ f.write("a" * 20)
+ v = e.parseString("m<path")[0]
+ tutils.raises("invalid value length", v.values, s)
+
+ p = os.path.join(t, "path")
+ with open(p, "wb") as f:
+ f.write("a" * 4)
+ v = e.parseString("m<path")[0]
+ assert v.values(s)
+
+
+class TKeyValue(base.KeyValue):
+ preamble = "h"
+
+ def values(self, settings):
+ return [
+ self.key.get_generator(settings),
+ ": ",
+ self.value.get_generator(settings),
+ "\r\n",
+ ]
+
+
+class TestKeyValue:
+
+ def test_simple(self):
+ e = TKeyValue.expr()
+ v = e.parseString("h'foo'='bar'")[0]
+ assert v.key.val == "foo"
+ assert v.value.val == "bar"
+
+ v2 = e.parseString(v.spec())[0]
+ assert v2.key.val == v.key.val
+ assert v2.value.val == v.value.val
+
+ s = v.spec()
+ assert s == e.parseString(s)[0].spec()
+
+ def test_freeze(self):
+ e = TKeyValue.expr()
+ v = e.parseString("h@10=@10'")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.key.val == v3.key.val
+ assert v2.value.val == v3.value.val
+
+
+def test_intfield():
+ class TT(base.IntField):
+ preamble = "t"
+ names = {
+ "one": 1,
+ "two": 2,
+ "three": 3
+ }
+ max = 4
+ e = TT.expr()
+
+ v = e.parseString("tone")[0]
+ assert v.value == 1
+ assert v.spec() == "tone"
+ assert v.values(language.Settings())
+
+ v = e.parseString("t1")[0]
+ assert v.value == 1
+ assert v.spec() == "t1"
+
+ v = e.parseString("t4")[0]
+ assert v.value == 4
+ assert v.spec() == "t4"
+
+ tutils.raises("can't exceed", e.parseString, "t5")
+
+
+def test_options_or_value():
+ class TT(base.OptionsOrValue):
+ options = [
+ "one",
+ "two",
+ "three"
+ ]
+ e = TT.expr()
+ assert e.parseString("one")[0].value.val == "one"
+ assert e.parseString("'foo'")[0].value.val == "foo"
+ assert e.parseString("'get'")[0].value.val == "get"
+
+ assert e.parseString("one")[0].spec() == "one"
+ assert e.parseString("'foo'")[0].spec() == "'foo'"
+
+ s = e.parseString("one")[0].spec()
+ assert s == e.parseString(s)[0].spec()
+
+ s = e.parseString("'foo'")[0].spec()
+ assert s == e.parseString(s)[0].spec()
+
+ v = e.parseString("@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+def test_integer():
+ e = base.Integer.expr()
+ v = e.parseString("200")[0]
+ assert v.string() == "200"
+ assert v.spec() == "200"
+
+ assert v.freeze({}).value == v.value
+
+ class BInt(base.Integer):
+ bounds = (1, 5)
+
+ tutils.raises("must be between", BInt, 0)
+ tutils.raises("must be between", BInt, 6)
+ assert BInt(5)
+ assert BInt(1)
+ assert BInt(3)
+
+
+class TBoolean(base.Boolean):
+ name = "test"
+
+
+def test_unique_name():
+ b = TBoolean(True)
+ assert b.unique_name
+
+
+class test_boolean():
+ e = TBoolean.expr()
+ assert e.parseString("test")[0].value
+ assert not e.parseString("-test")[0].value
+
+ def roundtrip(s):
+ e = TBoolean.expr()
+ s2 = e.parseString(s)[0].spec()
+ v1 = e.parseString(s)[0].value
+ v2 = e.parseString(s2)[0].value
+ assert s == s2
+ assert v1 == v2
+
+ roundtrip("test")
+ roundtrip("-test")
diff --git a/test/pathod/test_language_generators.py b/test/pathod/test_language_generators.py
new file mode 100644
index 00000000..945560c3
--- /dev/null
+++ b/test/pathod/test_language_generators.py
@@ -0,0 +1,42 @@
+import os
+
+from libpathod.language import generators
+import tutils
+
+
+def test_randomgenerator():
+ g = generators.RandomGenerator("bytes", 100)
+ assert repr(g)
+ assert len(g[:10]) == 10
+ assert len(g[1:10]) == 9
+ assert len(g[:1000]) == 100
+ assert len(g[1000:1001]) == 0
+ assert g[0]
+
+
+def test_filegenerator():
+ with tutils.tmpdir() as t:
+ path = os.path.join(t, "foo")
+ f = open(path, "wb")
+ f.write("x" * 10000)
+ f.close()
+ g = generators.FileGenerator(path)
+ assert len(g) == 10000
+ assert g[0] == "x"
+ assert g[-1] == "x"
+ assert g[0:5] == "xxxxx"
+ assert repr(g)
+ # remove all references to FileGenerator instance to close the file
+ # handle.
+ del g
+
+
+def test_transform_generator():
+ def trans(offset, data):
+ return "a" * len(data)
+ g = "one"
+ t = generators.TransformGenerator(g, trans)
+ assert len(t) == len(g)
+ assert t[0] == "a"
+ assert t[:] == "a" * len(g)
+ assert repr(t)
diff --git a/test/pathod/test_language_http.py b/test/pathod/test_language_http.py
new file mode 100644
index 00000000..26bb6a45
--- /dev/null
+++ b/test/pathod/test_language_http.py
@@ -0,0 +1,358 @@
+import cStringIO
+
+from libpathod import language
+from libpathod.language import http, base
+import tutils
+
+
+def parse_request(s):
+ return language.parse_pathoc(s).next()
+
+
+def test_make_error_response():
+ d = cStringIO.StringIO()
+ s = http.make_error_response("foo")
+ language.serve(s, d, {})
+
+
+class TestRequest:
+
+ def test_nonascii(self):
+ tutils.raises("ascii", parse_request, "get:\xf0")
+
+ def test_err(self):
+ tutils.raises(language.ParseException, parse_request, 'GET')
+
+ def test_simple(self):
+ r = parse_request('GET:"/foo"')
+ assert r.method.string() == "GET"
+ assert r.path.string() == "/foo"
+ r = parse_request('GET:/foo')
+ assert r.path.string() == "/foo"
+ r = parse_request('GET:@1k')
+ assert len(r.path.string()) == 1024
+
+ def test_multiple(self):
+ r = list(language.parse_pathoc("GET:/ PUT:/"))
+ assert r[0].method.string() == "GET"
+ assert r[1].method.string() == "PUT"
+ assert len(r) == 2
+
+ l = """
+ GET
+ "/foo"
+ ir,@1
+
+ PUT
+
+ "/foo
+
+
+
+ bar"
+
+ ir,@1
+ """
+ r = list(language.parse_pathoc(l))
+ assert len(r) == 2
+ assert r[0].method.string() == "GET"
+ assert r[1].method.string() == "PUT"
+
+ l = """
+ get:"http://localhost:9999/p/200":ir,@1
+ get:"http://localhost:9999/p/200":ir,@2
+ """
+ r = list(language.parse_pathoc(l))
+ assert len(r) == 2
+ assert r[0].method.string() == "GET"
+ assert r[1].method.string() == "GET"
+
+ def test_nested_response(self):
+ l = "get:/p:s'200'"
+ r = list(language.parse_pathoc(l))
+ assert len(r) == 1
+ assert len(r[0].tokens) == 3
+ assert isinstance(r[0].tokens[2], http.NestedResponse)
+ assert r[0].values({})
+
+ def test_render(self):
+ s = cStringIO.StringIO()
+ r = parse_request("GET:'/foo'")
+ assert language.serve(
+ r,
+ s,
+ language.Settings(request_host="foo.com")
+ )
+
+ def test_multiline(self):
+ l = """
+ GET
+ "/foo"
+ ir,@1
+ """
+ r = parse_request(l)
+ assert r.method.string() == "GET"
+ assert r.path.string() == "/foo"
+ assert r.actions
+
+ l = """
+ GET
+
+ "/foo
+
+
+
+ bar"
+
+ ir,@1
+ """
+ r = parse_request(l)
+ assert r.method.string() == "GET"
+ assert r.path.string().endswith("bar")
+ assert r.actions
+
+ def test_spec(self):
+ def rt(s):
+ s = parse_request(s).spec()
+ assert parse_request(s).spec() == s
+ rt("get:/foo")
+ rt("get:/foo:da")
+
+ def test_freeze(self):
+ r = parse_request("GET:/:b@100").freeze(language.Settings())
+ assert len(r.spec()) > 100
+
+ def test_path_generator(self):
+ r = parse_request("GET:@100").freeze(language.Settings())
+ assert len(r.spec()) > 100
+
+ def test_websocket(self):
+ r = parse_request('ws:/path/')
+ res = r.resolve(language.Settings())
+ assert res.method.string().lower() == "get"
+ assert res.tok(http.Path).value.val == "/path/"
+ assert res.tok(http.Method).value.val.lower() == "get"
+ assert http.get_header("Upgrade", res.headers).value.val == "websocket"
+
+ r = parse_request('ws:put:/path/')
+ res = r.resolve(language.Settings())
+ assert r.method.string().lower() == "put"
+ assert res.tok(http.Path).value.val == "/path/"
+ assert res.tok(http.Method).value.val.lower() == "put"
+ assert http.get_header("Upgrade", res.headers).value.val == "websocket"
+
+
+class TestResponse:
+
+ def dummy_response(self):
+ return language.parse_pathod("400'msg'").next()
+
+ def test_response(self):
+ r = language.parse_pathod("400:m'msg'").next()
+ assert r.status_code.string() == "400"
+ assert r.reason.string() == "msg"
+
+ r = language.parse_pathod("400:m'msg':b@100b").next()
+ assert r.reason.string() == "msg"
+ assert r.body.values({})
+ assert str(r)
+
+ r = language.parse_pathod("200").next()
+ assert r.status_code.string() == "200"
+ assert not r.reason
+ assert "OK" in [i[:] for i in r.preamble({})]
+
+ def test_render(self):
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:m'msg'").next()
+ assert language.serve(r, s, {})
+
+ r = language.parse_pathod("400:p0,100:dr").next()
+ assert "p0" in r.spec()
+ s = r.preview_safe()
+ assert "p0" not in s.spec()
+
+ def test_raw(self):
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:b'foo'").next()
+ language.serve(r, s, {})
+ v = s.getvalue()
+ assert "Content-Length" in v
+
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:b'foo':r").next()
+ language.serve(r, s, {})
+ v = s.getvalue()
+ assert "Content-Length" not in v
+
+ def test_length(self):
+ def testlen(x):
+ s = cStringIO.StringIO()
+ x = x.next()
+ language.serve(x, s, language.Settings())
+ assert x.length(language.Settings()) == len(s.getvalue())
+ testlen(language.parse_pathod("400:m'msg':r"))
+ testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r"))
+ testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r"))
+
+ def test_maximum_length(self):
+ def testlen(x):
+ x = x.next()
+ s = cStringIO.StringIO()
+ m = x.maximum_length({})
+ language.serve(x, s, {})
+ assert m >= len(s.getvalue())
+
+ r = language.parse_pathod("400:m'msg':b@100:d0")
+ testlen(r)
+
+ r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'")
+ testlen(r)
+
+ r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'")
+ testlen(r)
+
+ def test_parse_err(self):
+ tutils.raises(
+ language.ParseException, language.parse_pathod, "400:msg,b:"
+ )
+ try:
+ language.parse_pathod("400'msg':b:")
+ except language.ParseException as v:
+ assert v.marked()
+ assert str(v)
+
+ def test_nonascii(self):
+ tutils.raises("ascii", language.parse_pathod, "foo:b\xf0")
+
+ def test_parse_header(self):
+ r = language.parse_pathod('400:h"foo"="bar"').next()
+ assert http.get_header("foo", r.headers)
+
+ def test_parse_pause_before(self):
+ r = language.parse_pathod("400:p0,10").next()
+ assert r.actions[0].spec() == "p0,10"
+
+ def test_parse_pause_after(self):
+ r = language.parse_pathod("400:pa,10").next()
+ assert r.actions[0].spec() == "pa,10"
+
+ def test_parse_pause_random(self):
+ r = language.parse_pathod("400:pr,10").next()
+ assert r.actions[0].spec() == "pr,10"
+
+ def test_parse_stress(self):
+ # While larger values are known to work on linux, len() technically
+ # returns an int and a python 2.7 int on windows has 32bit precision.
+ # Therefore, we should keep the body length < 2147483647 bytes in our
+ # tests.
+ r = language.parse_pathod("400:b@1g").next()
+ assert r.length({})
+
+ def test_spec(self):
+ def rt(s):
+ s = language.parse_pathod(s).next().spec()
+ assert language.parse_pathod(s).next().spec() == s
+ rt("400:b@100g")
+ rt("400")
+ rt("400:da")
+
+ def test_websockets(self):
+ r = language.parse_pathod("ws").next()
+ tutils.raises("no websocket key", r.resolve, language.Settings())
+ res = r.resolve(language.Settings(websocket_key="foo"))
+ assert res.status_code.string() == "101"
+
+
+def test_ctype_shortcut():
+ e = http.ShortcutContentType.expr()
+ v = e.parseString("c'foo'")[0]
+ assert v.key.val == "Content-Type"
+ assert v.value.val == "foo"
+
+ s = v.spec()
+ assert s == e.parseString(s)[0].spec()
+
+ e = http.ShortcutContentType.expr()
+ v = e.parseString("c@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+def test_location_shortcut():
+ e = http.ShortcutLocation.expr()
+ v = e.parseString("l'foo'")[0]
+ assert v.key.val == "Location"
+ assert v.value.val == "foo"
+
+ s = v.spec()
+ assert s == e.parseString(s)[0].spec()
+
+ e = http.ShortcutLocation.expr()
+ v = e.parseString("l@100")[0]
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+def test_shortcuts():
+ assert language.parse_pathod(
+ "400:c'foo'").next().headers[0].key.val == "Content-Type"
+ assert language.parse_pathod(
+ "400:l'foo'").next().headers[0].key.val == "Location"
+
+ assert "Android" in tutils.render(parse_request("get:/:ua"))
+ assert "User-Agent" in tutils.render(parse_request("get:/:ua"))
+
+
+def test_user_agent():
+ e = http.ShortcutUserAgent.expr()
+ v = e.parseString("ua")[0]
+ assert "Android" in v.string()
+
+ e = http.ShortcutUserAgent.expr()
+ v = e.parseString("u'a'")[0]
+ assert "Android" not in v.string()
+
+ v = e.parseString("u@100'")[0]
+ assert len(str(v.freeze({}).value)) > 100
+ v2 = v.freeze({})
+ v3 = v2.freeze({})
+ assert v2.value.val == v3.value.val
+
+
+def test_nested_response():
+ e = http.NestedResponse.expr()
+ v = e.parseString("s'200'")[0]
+ assert v.value.val == "200"
+ tutils.raises(
+ language.ParseException,
+ e.parseString,
+ "s'foo'"
+ )
+
+ v = e.parseString('s"200:b@1"')[0]
+ assert "@1" in v.spec()
+ f = v.freeze({})
+ assert "@1" not in f.spec()
+
+
+def test_nested_response_freeze():
+ e = http.NestedResponse(
+ base.TokValueLiteral(
+ "200:b'foo':i10,'\\x27'".encode(
+ "string_escape"
+ )
+ )
+ )
+ assert e.freeze({})
+ assert e.values({})
+
+
+def test_unique_components():
+ tutils.raises(
+ "multiple body clauses",
+ language.parse_pathod,
+ "400:b@1:b@1"
+ )
diff --git a/test/pathod/test_language_http2.py b/test/pathod/test_language_http2.py
new file mode 100644
index 00000000..9be49452
--- /dev/null
+++ b/test/pathod/test_language_http2.py
@@ -0,0 +1,233 @@
+import cStringIO
+
+import netlib
+from netlib import tcp
+from netlib.http import user_agents
+
+from libpathod import language
+from libpathod.language import http2, base
+import tutils
+
+
+def parse_request(s):
+ return language.parse_pathoc(s, True).next()
+
+
+def parse_response(s):
+ return language.parse_pathod(s, True).next()
+
+
+def default_settings():
+ return language.Settings(
+ request_host="foo.com",
+ protocol=netlib.http.http2.HTTP2Protocol(tcp.TCPClient(('localhost', 1234)))
+ )
+
+
+def test_make_error_response():
+ d = cStringIO.StringIO()
+ s = http2.make_error_response("foo", "bar")
+ language.serve(s, d, default_settings())
+
+
+class TestRequest:
+
+ def test_cached_values(self):
+ req = parse_request("get:/")
+ req_id = id(req)
+ assert req_id == id(req.resolve(default_settings()))
+ assert req.values(default_settings()) == req.values(default_settings())
+
+ def test_nonascii(self):
+ tutils.raises("ascii", parse_request, "get:\xf0")
+
+ def test_err(self):
+ tutils.raises(language.ParseException, parse_request, 'GET')
+
+ def test_simple(self):
+ r = parse_request('GET:"/foo"')
+ assert r.method.string() == "GET"
+ assert r.path.string() == "/foo"
+ r = parse_request('GET:/foo')
+ assert r.path.string() == "/foo"
+
+ def test_multiple(self):
+ r = list(language.parse_pathoc("GET:/ PUT:/"))
+ assert r[0].method.string() == "GET"
+ assert r[1].method.string() == "PUT"
+ assert len(r) == 2
+
+ l = """
+ GET
+ "/foo"
+
+ PUT
+
+ "/foo
+
+
+
+ bar"
+ """
+ r = list(language.parse_pathoc(l, True))
+ assert len(r) == 2
+ assert r[0].method.string() == "GET"
+ assert r[1].method.string() == "PUT"
+
+ l = """
+ get:"http://localhost:9999/p/200"
+ get:"http://localhost:9999/p/200"
+ """
+ r = list(language.parse_pathoc(l, True))
+ assert len(r) == 2
+ assert r[0].method.string() == "GET"
+ assert r[1].method.string() == "GET"
+
+ def test_render_simple(self):
+ s = cStringIO.StringIO()
+ r = parse_request("GET:'/foo'")
+ assert language.serve(
+ r,
+ s,
+ default_settings(),
+ )
+
+ def test_raw_content_length(self):
+ r = parse_request('GET:/:r')
+ assert len(r.headers) == 0
+
+ r = parse_request('GET:/:r:b"foobar"')
+ assert len(r.headers) == 0
+
+ r = parse_request('GET:/')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-length", "0")
+
+ r = parse_request('GET:/:b"foobar"')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-length", "6")
+
+ r = parse_request('GET:/:b"foobar":h"content-length"="42"')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-length", "42")
+
+ r = parse_request('GET:/:r:b"foobar":h"content-length"="42"')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-length", "42")
+
+ def test_content_type(self):
+ r = parse_request('GET:/:r:c"foobar"')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-type", "foobar")
+
+ def test_user_agent(self):
+ r = parse_request('GET:/:r:ua')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("user-agent", user_agents.get_by_shortcut('a')[2])
+
+ def test_render_with_headers(self):
+ s = cStringIO.StringIO()
+ r = parse_request('GET:/foo:h"foo"="bar"')
+ assert language.serve(
+ r,
+ s,
+ default_settings(),
+ )
+
+ def test_nested_response(self):
+ l = "get:/p/:s'200'"
+ r = parse_request(l)
+ assert len(r.tokens) == 3
+ assert isinstance(r.tokens[2], http2.NestedResponse)
+ assert r.values(default_settings())
+
+
+ def test_render_with_body(self):
+ s = cStringIO.StringIO()
+ r = parse_request("GET:'/foo':bfoobar")
+ assert language.serve(
+ r,
+ s,
+ default_settings(),
+ )
+
+ def test_spec(self):
+ def rt(s):
+ s = parse_request(s).spec()
+ assert parse_request(s).spec() == s
+ rt("get:/foo")
+
+
+class TestResponse:
+
+ def test_cached_values(self):
+ res = parse_response("200")
+ res_id = id(res)
+ assert res_id == id(res.resolve(default_settings()))
+ assert res.values(default_settings()) == res.values(default_settings())
+
+ def test_nonascii(self):
+ tutils.raises("ascii", parse_response, "200:\xf0")
+
+ def test_err(self):
+ tutils.raises(language.ParseException, parse_response, 'GET:/')
+
+ def test_raw_content_length(self):
+ r = parse_response('200:r')
+ assert len(r.headers) == 0
+
+ r = parse_response('200')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-length", "0")
+
+ def test_content_type(self):
+ r = parse_response('200:r:c"foobar"')
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("content-type", "foobar")
+
+ def test_simple(self):
+ r = parse_response('200:r:h"foo"="bar"')
+ assert r.status_code.string() == "200"
+ assert len(r.headers) == 1
+ assert r.headers[0].values(default_settings()) == ("foo", "bar")
+ assert r.body is None
+
+ r = parse_response('200:r:h"foo"="bar":bfoobar:h"bla"="fasel"')
+ assert r.status_code.string() == "200"
+ assert len(r.headers) == 2
+ assert r.headers[0].values(default_settings()) == ("foo", "bar")
+ assert r.headers[1].values(default_settings()) == ("bla", "fasel")
+ assert r.body.string() == "foobar"
+
+ def test_render_simple(self):
+ s = cStringIO.StringIO()
+ r = parse_response('200')
+ assert language.serve(
+ r,
+ s,
+ default_settings(),
+ )
+
+ def test_render_with_headers(self):
+ s = cStringIO.StringIO()
+ r = parse_response('200:h"foo"="bar"')
+ assert language.serve(
+ r,
+ s,
+ default_settings(),
+ )
+
+ def test_render_with_body(self):
+ s = cStringIO.StringIO()
+ r = parse_response('200:bfoobar')
+ assert language.serve(
+ r,
+ s,
+ default_settings(),
+ )
+
+ def test_spec(self):
+ def rt(s):
+ s = parse_response(s).spec()
+ assert parse_response(s).spec() == s
+ rt("200:bfoobar")
diff --git a/test/pathod/test_language_websocket.py b/test/pathod/test_language_websocket.py
new file mode 100644
index 00000000..d98fd33e
--- /dev/null
+++ b/test/pathod/test_language_websocket.py
@@ -0,0 +1,142 @@
+
+from libpathod import language
+from libpathod.language import websockets
+import netlib.websockets
+import tutils
+
+
+def parse_request(s):
+ return language.parse_pathoc(s).next()
+
+
+class TestWebsocketFrame:
+
+ def _test_messages(self, specs, message_klass):
+ for i in specs:
+ wf = parse_request(i)
+ assert isinstance(wf, message_klass)
+ assert wf
+ assert wf.values(language.Settings())
+ assert wf.resolve(language.Settings())
+
+ spec = wf.spec()
+ wf2 = parse_request(spec)
+ assert wf2.spec() == spec
+
+ def test_server_values(self):
+ specs = [
+ "wf",
+ "wf:dr",
+ "wf:b'foo'",
+ "wf:mask:r'foo'",
+ "wf:l1024:b'foo'",
+ "wf:cbinary",
+ "wf:c1",
+ "wf:mask:knone",
+ "wf:fin",
+ "wf:fin:rsv1:rsv2:rsv3:mask",
+ "wf:-fin:-rsv1:-rsv2:-rsv3:-mask",
+ "wf:k@4",
+ "wf:x10",
+ ]
+ self._test_messages(specs, websockets.WebsocketFrame)
+
+ def test_parse_websocket_frames(self):
+ wf = language.parse_websocket_frame("wf:x10")
+ assert len(list(wf)) == 10
+ tutils.raises(
+ language.ParseException,
+ language.parse_websocket_frame,
+ "wf:x"
+ )
+
+ def test_client_values(self):
+ specs = [
+ "wf:f'wf'",
+ ]
+ self._test_messages(specs, websockets.WebsocketClientFrame)
+
+ def test_nested_frame(self):
+ wf = parse_request("wf:f'wf'")
+ assert wf.nested_frame
+
+ def test_flags(self):
+ wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3")
+ frm = netlib.websockets.Frame.from_bytes(tutils.render(wf))
+ assert frm.header.fin
+ assert frm.header.mask
+ assert frm.header.rsv1
+ assert frm.header.rsv2
+ assert frm.header.rsv3
+
+ wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3")
+ frm = netlib.websockets.Frame.from_bytes(tutils.render(wf))
+ assert not frm.header.fin
+ assert not frm.header.mask
+ assert not frm.header.rsv1
+ assert not frm.header.rsv2
+ assert not frm.header.rsv3
+
+ def fr(self, spec, **kwargs):
+ settings = language.base.Settings(**kwargs)
+ wf = parse_request(spec)
+ return netlib.websockets.Frame.from_bytes(tutils.render(wf, settings))
+
+ def test_construction(self):
+ assert self.fr("wf:c1").header.opcode == 1
+ assert self.fr("wf:c0").header.opcode == 0
+ assert self.fr("wf:cbinary").header.opcode ==\
+ netlib.websockets.OPCODE.BINARY
+ assert self.fr("wf:ctext").header.opcode ==\
+ netlib.websockets.OPCODE.TEXT
+
+ def test_rawbody(self):
+ frm = self.fr("wf:mask:r'foo'")
+ assert len(frm.payload) == 3
+ assert frm.payload != "foo"
+
+ assert self.fr("wf:r'foo'").payload == "foo"
+
+ def test_construction(self):
+ # Simple server frame
+ frm = self.fr("wf:b'foo'")
+ assert not frm.header.mask
+ assert not frm.header.masking_key
+
+ # Simple client frame
+ frm = self.fr("wf:b'foo'", is_client=True)
+ assert frm.header.mask
+ assert frm.header.masking_key
+ frm = self.fr("wf:b'foo':k'abcd'", is_client=True)
+ assert frm.header.mask
+ assert frm.header.masking_key == 'abcd'
+
+ # Server frame, mask explicitly set
+ frm = self.fr("wf:b'foo':mask")
+ assert frm.header.mask
+ assert frm.header.masking_key
+ frm = self.fr("wf:b'foo':k'abcd'")
+ assert frm.header.mask
+ assert frm.header.masking_key == 'abcd'
+
+ # Client frame, mask explicitly unset
+ frm = self.fr("wf:b'foo':-mask", is_client=True)
+ assert not frm.header.mask
+ assert not frm.header.masking_key
+
+ frm = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True)
+ assert not frm.header.mask
+ # We're reading back a corrupted frame - the first 3 characters of the
+ # mask is mis-interpreted as the payload
+ assert frm.payload == "abc"
+
+ def test_knone(self):
+ with tutils.raises("expected 4 bytes"):
+ self.fr("wf:b'foo':mask:knone")
+
+ def test_length(self):
+ assert self.fr("wf:l3:b'foo'").header.payload_length == 3
+ frm = self.fr("wf:l2:b'foo'")
+ assert frm.header.payload_length == 2
+ assert frm.payload == "fo"
+ tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'")
diff --git a/test/pathod/test_language_writer.py b/test/pathod/test_language_writer.py
new file mode 100644
index 00000000..1a532903
--- /dev/null
+++ b/test/pathod/test_language_writer.py
@@ -0,0 +1,91 @@
+import cStringIO
+
+from libpathod import language
+from libpathod.language import writer
+
+
+def test_send_chunk():
+ v = "foobarfoobar"
+ for bs in range(1, len(v) + 2):
+ s = cStringIO.StringIO()
+ writer.send_chunk(s, v, bs, 0, len(v))
+ assert s.getvalue() == v
+ for start in range(len(v)):
+ for end in range(len(v)):
+ s = cStringIO.StringIO()
+ writer.send_chunk(s, v, bs, start, end)
+ assert s.getvalue() == v[start:end]
+
+
+def test_write_values_inject():
+ tst = "foo"
+
+ s = cStringIO.StringIO()
+ writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
+ assert s.getvalue() == "aaafoo"
+
+ s = cStringIO.StringIO()
+ writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
+ assert s.getvalue() == "faaaoo"
+
+ s = cStringIO.StringIO()
+ writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
+ assert s.getvalue() == "faaaoo"
+
+
+def test_write_values_disconnects():
+ s = cStringIO.StringIO()
+ tst = "foo" * 100
+ writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
+ assert not s.getvalue()
+
+
+def test_write_values():
+ tst = "foobarvoing"
+ s = cStringIO.StringIO()
+ writer.write_values(s, [tst], [])
+ assert s.getvalue() == tst
+
+ for bs in range(1, len(tst) + 2):
+ for off in range(len(tst)):
+ s = cStringIO.StringIO()
+ writer.write_values(
+ s, [tst], [(off, "disconnect")], blocksize=bs
+ )
+ assert s.getvalue() == tst[:off]
+
+
+def test_write_values_pauses():
+ tst = "".join(str(i) for i in range(10))
+ for i in range(2, 10):
+ s = cStringIO.StringIO()
+ writer.write_values(
+ s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
+ )
+ assert s.getvalue() == tst
+
+ for i in range(2, 10):
+ s = cStringIO.StringIO()
+ writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
+ assert s.getvalue() == tst
+
+ tst = ["".join(str(i) for i in range(10))] * 5
+ for i in range(2, 10):
+ s = cStringIO.StringIO()
+ writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
+ assert s.getvalue() == "".join(tst)
+
+
+def test_write_values_after():
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:da").next()
+ language.serve(r, s, {})
+
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:pa,0").next()
+ language.serve(r, s, {})
+
+ s = cStringIO.StringIO()
+ r = language.parse_pathod("400:ia,'xx'").next()
+ language.serve(r, s, {})
+ assert s.getvalue().endswith('xx')
diff --git a/test/pathod/test_log.py b/test/pathod/test_log.py
new file mode 100644
index 00000000..8f38c040
--- /dev/null
+++ b/test/pathod/test_log.py
@@ -0,0 +1,25 @@
+import StringIO
+from libpathod import log
+from netlib.exceptions import TcpDisconnect
+import netlib.tcp
+
+
+class DummyIO(StringIO.StringIO):
+
+ def start_log(self, *args, **kwargs):
+ pass
+
+ def get_log(self, *args, **kwargs):
+ return ""
+
+
+def test_disconnect():
+ outf = DummyIO()
+ rw = DummyIO()
+ l = log.ConnectionLogger(outf, False, rw, rw)
+ try:
+ with l.ctx() as lg:
+ lg("Test")
+ except TcpDisconnect:
+ pass
+ assert "Test" in outf.getvalue()
diff --git a/test/pathod/test_pathoc.py b/test/pathod/test_pathoc.py
new file mode 100644
index 00000000..62696a64
--- /dev/null
+++ b/test/pathod/test_pathoc.py
@@ -0,0 +1,308 @@
+import json
+import cStringIO
+import re
+import OpenSSL
+from mock import Mock
+
+from netlib import tcp, http, socks
+from netlib.exceptions import HttpException, TcpException, NetlibException
+from netlib.http import http1, http2
+
+from libpathod import pathoc, test, version, pathod, language
+from netlib.tutils import raises
+import tutils
+
+
+def test_response():
+ r = http.Response("HTTP/1.1", 200, "Message", {}, None, None)
+ assert repr(r)
+
+
+class _TestDaemon:
+ ssloptions = pathod.SSLOptions()
+
+ @classmethod
+ def setup_class(cls):
+ cls.d = test.Daemon(
+ ssl=cls.ssl,
+ ssloptions=cls.ssloptions,
+ staticdir=tutils.test_data.path("data"),
+ anchors=[
+ (re.compile("/anchor/.*"), "202")
+ ]
+ )
+
+ @classmethod
+ def teardown_class(cls):
+ cls.d.shutdown()
+
+ def setUp(self):
+ self.d.clear_log()
+
+ def test_info(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ ssl=self.ssl,
+ fp=None
+ )
+ c.connect()
+ resp = c.request("get:/api/info")
+ assert tuple(json.loads(resp.content)["version"]) == version.IVERSION
+
+ def tval(
+ self,
+ requests,
+ showreq=False,
+ showresp=False,
+ explain=False,
+ showssl=False,
+ hexdump=False,
+ timeout=None,
+ ignorecodes=(),
+ ignoretimeout=None,
+ showsummary=True
+ ):
+ s = cStringIO.StringIO()
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ ssl=self.ssl,
+ showreq=showreq,
+ showresp=showresp,
+ explain=explain,
+ hexdump=hexdump,
+ ignorecodes=ignorecodes,
+ ignoretimeout=ignoretimeout,
+ showsummary=showsummary,
+ fp=s
+ )
+ c.connect(showssl=showssl, fp=s)
+ if timeout:
+ c.settimeout(timeout)
+ for i in requests:
+ r = language.parse_pathoc(i).next()
+ if explain:
+ r = r.freeze(language.Settings())
+ try:
+ c.request(r)
+ except NetlibException:
+ pass
+ return s.getvalue()
+
+
+class TestDaemonSSL(_TestDaemon):
+ ssl = True
+ ssloptions = pathod.SSLOptions(
+ request_client_cert=True,
+ sans=["test1.com", "test2.com"],
+ alpn_select=b'h2',
+ )
+
+ def test_sni(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ ssl=True,
+ sni="foobar.com",
+ fp=None
+ )
+ c.connect()
+ c.request("get:/p/200")
+ r = c.request("get:/api/log")
+ d = json.loads(r.content)
+ assert d["log"][0]["request"]["sni"] == "foobar.com"
+
+ def test_showssl(self):
+ assert "certificate chain" in self.tval(["get:/p/200"], showssl=True)
+
+ def test_clientcert(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ ssl=True,
+ clientcert=tutils.test_data.path("data/clientcert/client.pem"),
+ fp=None
+ )
+ c.connect()
+ c.request("get:/p/200")
+ r = c.request("get:/api/log")
+ d = json.loads(r.content)
+ assert d["log"][0]["request"]["clientcert"]["keyinfo"]
+
+ def test_http2_without_ssl(self):
+ fp = cStringIO.StringIO()
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ use_http2=True,
+ ssl=False,
+ fp = fp
+ )
+ tutils.raises(NotImplementedError, c.connect)
+
+
+class TestDaemon(_TestDaemon):
+ ssl = False
+
+ def test_ssl_error(self):
+ c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None)
+ tutils.raises("ssl handshake", c.connect)
+
+ def test_showssl(self):
+ assert not "certificate chain" in self.tval(
+ ["get:/p/200"],
+ showssl=True)
+
+ def test_ignorecodes(self):
+ assert "200" in self.tval(["get:'/p/200:b@1'"])
+ assert "200" in self.tval(["get:'/p/200:b@1'"])
+ assert "200" in self.tval(["get:'/p/200:b@1'"])
+ assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200])
+ assert "200" not in self.tval(
+ ["get:'/p/200:b@1'"],
+ ignorecodes=[
+ 200,
+ 201])
+ assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201])
+
+ def test_timeout(self):
+ assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01)
+ assert "HTTP" in self.tval(
+ ["get:'/p/200:p5,100'"],
+ showresp=True,
+ timeout=1
+ )
+ assert not "HTTP" in self.tval(
+ ["get:'/p/200:p3,100'"],
+ showresp=True,
+ timeout=1,
+ ignoretimeout=True
+ )
+
+ def test_showresp(self):
+ reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]
+ assert self.tval(reqs).count("200") == 2
+ assert self.tval(reqs, showresp=True).count("HTTP/1.1 200 OK") == 2
+ assert self.tval(
+ reqs, showresp=True, hexdump=True
+ ).count("0000000000") == 2
+
+ def test_showresp_httperr(self):
+ v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True)
+ assert "Invalid headers" in v
+ assert "HTTP/" in v
+
+ def test_explain(self):
+ reqs = ["get:/p/200:b@100"]
+ assert "b@100" not in self.tval(reqs, explain=True)
+
+ def test_showreq(self):
+ reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]
+ assert self.tval(reqs, showreq=True).count("GET /api") == 2
+ assert self.tval(
+ reqs, showreq=True, hexdump=True
+ ).count("0000000000") == 2
+
+ def test_conn_err(self):
+ assert "Invalid server response" in self.tval(["get:'/p/200:d2'"])
+
+ def test_websocket_shutdown(self):
+ c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
+ c.connect()
+ c.request("ws:/")
+ c.stop()
+
+ def test_wait_finish(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ fp=None,
+ ws_read_limit=1
+ )
+ c.connect()
+ c.request("ws:/")
+ c.request("wf:f'wf:x100'")
+ [i for i in c.wait(timeout=0, finish=False)]
+ [i for i in c.wait(timeout=0)]
+
+ def test_connect_fail(self):
+ to = ("foobar", 80)
+ c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
+ c.rfile, c.wfile = cStringIO.StringIO(), cStringIO.StringIO()
+ with raises("connect failed"):
+ c.http_connect(to)
+ c.rfile = cStringIO.StringIO(
+ "HTTP/1.1 500 OK\r\n"
+ )
+ with raises("connect failed"):
+ c.http_connect(to)
+ c.rfile = cStringIO.StringIO(
+ "HTTP/1.1 200 OK\r\n"
+ )
+ c.http_connect(to)
+
+ def test_socks_connect(self):
+ to = ("foobar", 80)
+ c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
+ c.rfile, c.wfile = tutils.treader(""), cStringIO.StringIO()
+ tutils.raises(pathoc.PathocError, c.socks_connect, to)
+
+ c.rfile = tutils.treader(
+ "\x05\xEE"
+ )
+ tutils.raises("SOCKS without authentication", c.socks_connect, ("example.com", 0xDEAD))
+
+ c.rfile = tutils.treader(
+ "\x05\x00" +
+ "\x05\xEE\x00\x03\x0bexample.com\xDE\xAD"
+ )
+ tutils.raises("SOCKS server error", c.socks_connect, ("example.com", 0xDEAD))
+
+ c.rfile = tutils.treader(
+ "\x05\x00" +
+ "\x05\x00\x00\x03\x0bexample.com\xDE\xAD"
+ )
+ c.socks_connect(("example.com", 0xDEAD))
+
+
+class TestDaemonHTTP2(_TestDaemon):
+ ssl = True
+
+ if OpenSSL._util.lib.Cryptography_HAS_ALPN:
+
+ def test_http2(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ fp=None,
+ ssl=True,
+ use_http2=True,
+ )
+ assert isinstance(c.protocol, http2.HTTP2Protocol)
+
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ )
+ assert c.protocol == http1
+
+ def test_http2_alpn(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ fp=None,
+ ssl=True,
+ use_http2=True,
+ http2_skip_connection_preface=True,
+ )
+
+ tmp_convert_to_ssl = c.convert_to_ssl
+ c.convert_to_ssl = Mock()
+ c.convert_to_ssl.side_effect = tmp_convert_to_ssl
+ c.connect()
+
+ _, kwargs = c.convert_to_ssl.call_args
+ assert set(kwargs['alpn_protos']) == set([b'http/1.1', b'h2'])
+
+ def test_request(self):
+ c = pathoc.Pathoc(
+ ("127.0.0.1", self.d.port),
+ fp=None,
+ ssl=True,
+ use_http2=True,
+ )
+ c.connect()
+ resp = c.request("get:/p/200")
+ assert resp.status_code == 200
diff --git a/test/pathod/test_pathoc_cmdline.py b/test/pathod/test_pathoc_cmdline.py
new file mode 100644
index 00000000..74dfef57
--- /dev/null
+++ b/test/pathod/test_pathoc_cmdline.py
@@ -0,0 +1,59 @@
+from libpathod import pathoc_cmdline as cmdline
+import tutils
+import cStringIO
+import mock
+
+
+@mock.patch("argparse.ArgumentParser.error")
+def test_pathoc(perror):
+ assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"])
+ s = cStringIO.StringIO()
+ with tutils.raises(SystemExit):
+ cmdline.args_pathoc(["pathoc", "--show-uas"], s, s)
+
+ a = cmdline.args_pathoc(["pathoc", "foo.com:8888", "get:/"])
+ assert a.port == 8888
+
+ a = cmdline.args_pathoc(["pathoc", "foo.com:xxx", "get:/"])
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathoc(["pathoc", "-I", "10, 20", "foo.com:8888", "get:/"])
+ assert a.ignorecodes == [10, 20]
+
+ a = cmdline.args_pathoc(["pathoc", "-I", "xx, 20", "foo.com:8888", "get:/"])
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathoc(["pathoc", "-c", "foo:10", "foo.com:8888", "get:/"])
+ assert a.connect_to == ["foo", 10]
+
+ a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2"])
+ assert a.use_http2 == True
+ assert a.ssl == True
+
+ a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2-skip-connection-preface"])
+ assert a.use_http2 == True
+ assert a.ssl == True
+ assert a.http2_skip_connection_preface == True
+
+ a = cmdline.args_pathoc(["pathoc", "-c", "foo", "foo.com:8888", "get:/"])
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathoc(
+ ["pathoc", "-c", "foo:bar", "foo.com:8888", "get:/"])
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathoc(
+ [
+ "pathoc",
+ "foo.com:8888",
+ tutils.test_data.path("data/request")
+ ]
+ )
+ assert len(list(a.requests)) == 1
+
+ with tutils.raises(SystemExit):
+ cmdline.args_pathoc(["pathoc", "foo.com", "invalid"], s, s)
diff --git a/test/pathod/test_pathod.py b/test/pathod/test_pathod.py
new file mode 100644
index 00000000..98da7d28
--- /dev/null
+++ b/test/pathod/test_pathod.py
@@ -0,0 +1,289 @@
+import sys
+import cStringIO
+import OpenSSL
+
+from libpathod import pathod, version
+from netlib import tcp, http
+from netlib.exceptions import HttpException, TlsException
+import tutils
+
+
+class TestPathod(object):
+
+ def test_logging(self):
+ s = cStringIO.StringIO()
+ p = pathod.Pathod(("127.0.0.1", 0), logfp=s)
+ assert len(p.get_log()) == 0
+ id = p.add_log(dict(s="foo"))
+ assert p.log_by_id(id)
+ assert len(p.get_log()) == 1
+ p.clear_log()
+ assert len(p.get_log()) == 0
+
+ for _ in range(p.LOGBUF + 1):
+ p.add_log(dict(s="foo"))
+ assert len(p.get_log()) <= p.LOGBUF
+
+
+class TestNoWeb(tutils.DaemonTests):
+ noweb = True
+
+ def test_noweb(self):
+ assert self.get("200:da").status_code == 200
+ assert self.getpath("/").status_code == 800
+
+
+class TestTimeout(tutils.DaemonTests):
+ timeout = 0.01
+
+ def test_noweb(self):
+ # FIXME: Add float values to spec language, reduce test timeout to
+ # increase test performance
+ # This is a bodge - we have some platform difference that causes
+ # different exceptions to be raised here.
+ tutils.raises(Exception, self.pathoc, ["get:/:p1,1"])
+ assert self.d.last_log()["type"] == "timeout"
+
+
+class TestNoApi(tutils.DaemonTests):
+ noapi = True
+
+ def test_noapi(self):
+ assert self.getpath("/log").status_code == 404
+ r = self.getpath("/")
+ assert r.status_code == 200
+ assert not "Log" in r.content
+
+
+class TestNotAfterConnect(tutils.DaemonTests):
+ ssl = False
+ ssloptions = dict(
+ not_after_connect=True
+ )
+
+ def test_connect(self):
+ r, _ = self.pathoc(
+ [r"get:'http://foo.com/p/202':da"],
+ connect_to=("localhost", self.d.port)
+ )
+ assert r[0].status_code == 202
+
+
+class TestCustomCert(tutils.DaemonTests):
+ ssl = True
+ ssloptions = dict(
+ certs=[("*", tutils.test_data.path("data/testkey.pem"))],
+ )
+
+ def test_connect(self):
+ r, _ = self.pathoc([r"get:/p/202"])
+ r = r[0]
+ assert r.status_code == 202
+ assert r.sslinfo
+ assert "test.com" in str(r.sslinfo.certchain[0].get_subject())
+
+
+class TestSSLCN(tutils.DaemonTests):
+ ssl = True
+ ssloptions = dict(
+ cn="foo.com"
+ )
+
+ def test_connect(self):
+ r, _ = self.pathoc([r"get:/p/202"])
+ r = r[0]
+ assert r.status_code == 202
+ assert r.sslinfo
+ assert r.sslinfo.certchain[0].get_subject().CN == "foo.com"
+
+
+class TestNohang(tutils.DaemonTests):
+ nohang = True
+
+ def test_nohang(self):
+ r = self.get("200:p0,0")
+ assert r.status_code == 800
+ l = self.d.last_log()
+ assert "Pauses have been disabled" in l["response"]["msg"]
+
+
+class TestHexdump(tutils.DaemonTests):
+ hexdump = True
+
+ def test_hexdump(self):
+ r = self.get(r"200:b'\xf0'")
+
+
+class TestNocraft(tutils.DaemonTests):
+ nocraft = True
+
+ def test_nocraft(self):
+ r = self.get(r"200:b'\xf0'")
+ assert r.status_code == 800
+ assert "Crafting disabled" in r.content
+
+
+class CommonTests(tutils.DaemonTests):
+
+ def test_binarydata(self):
+ r = self.get(r"200:b'\xf0'")
+ l = self.d.last_log()
+ # FIXME: Other binary data elements
+
+ def test_sizelimit(self):
+ r = self.get("200:b@1g")
+ assert r.status_code == 800
+ l = self.d.last_log()
+ assert "too large" in l["response"]["msg"]
+
+ def test_preline(self):
+ r, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"])
+ assert r[0].status_code == 200
+
+ def test_info(self):
+ assert tuple(self.d.info()["version"]) == version.IVERSION
+
+ def test_logs(self):
+ assert self.d.clear_log()
+ assert not self.d.last_log()
+ rsp = self.get("202:da")
+ assert len(self.d.log()) == 1
+ assert self.d.clear_log()
+ assert len(self.d.log()) == 0
+
+ def test_disconnect(self):
+ rsp = self.get("202:b@100k:d200")
+ assert len(rsp.content) < 200
+
+ def test_parserr(self):
+ rsp = self.get("400:msg,b:")
+ assert rsp.status_code == 800
+
+ def test_static(self):
+ rsp = self.get("200:b<file")
+ assert rsp.status_code == 200
+ assert rsp.content.strip() == "testfile"
+
+ def test_anchor(self):
+ rsp = self.getpath("anchor/foo")
+ assert rsp.status_code == 202
+
+ def test_invalid_first_line(self):
+ c = tcp.TCPClient(("localhost", self.d.port))
+ c.connect()
+ if self.ssl:
+ c.convert_to_ssl()
+ c.wfile.write("foo\n\n\n")
+ c.wfile.flush()
+ l = self.d.last_log()
+ assert l["type"] == "error"
+ assert "foo" in l["msg"]
+
+ def test_invalid_content_length(self):
+ tutils.raises(
+ HttpException,
+ self.pathoc,
+ ["get:/:h'content-length'='foo'"]
+ )
+ l = self.d.last_log()
+ assert l["type"] == "error"
+ assert "Unparseable Content Length" in l["msg"]
+
+ def test_invalid_headers(self):
+ tutils.raises(HttpException, self.pathoc, ["get:/:h'\t'='foo'"])
+ l = self.d.last_log()
+ assert l["type"] == "error"
+ assert "Invalid headers" in l["msg"]
+
+ def test_access_denied(self):
+ rsp = self.get("=nonexistent")
+ assert rsp.status_code == 800
+
+ def test_source_access_denied(self):
+ rsp = self.get("200:b</foo")
+ assert rsp.status_code == 800
+ assert "File access denied" in rsp.content
+
+ def test_proxy(self):
+ r, _ = self.pathoc([r"get:'http://foo.com/p/202':da"])
+ assert r[0].status_code == 202
+
+ def test_websocket(self):
+ r, _ = self.pathoc(["ws:/p/"], ws_read_limit=0)
+ assert r[0].status_code == 101
+
+ r, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0)
+ assert r[0].status_code == 101
+
+ def test_websocket_frame(self):
+ r, _ = self.pathoc(
+ ["ws:/p/", "wf:f'wf:b\"test\"':pa,1"],
+ ws_read_limit=1
+ )
+ assert r[1].payload == "test"
+
+ def test_websocket_frame_reflect_error(self):
+ r, _ = self.pathoc(
+ ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"],
+ ws_read_limit=1,
+ timeout=1
+ )
+ # FIXME: Race Condition?
+ assert "Parse error" in self.d.text_log()
+
+ def test_websocket_frame_disconnect_error(self):
+ self.pathoc(["ws:/p/", "wf:b@10:d3"], ws_read_limit=0)
+ assert self.d.last_log()
+
+
+class TestDaemon(CommonTests):
+ ssl = False
+
+ def test_connect(self):
+ r, _ = self.pathoc(
+ [r"get:'http://foo.com/p/202':da"],
+ connect_to=("localhost", self.d.port),
+ ssl=True
+ )
+ assert r[0].status_code == 202
+
+ def test_connect_err(self):
+ tutils.raises(
+ HttpException,
+ self.pathoc,
+ [r"get:'http://foo.com/p/202':da"],
+ connect_to=("localhost", self.d.port)
+ )
+
+
+class TestDaemonSSL(CommonTests):
+ ssl = True
+
+ def test_ssl_conn_failure(self):
+ c = tcp.TCPClient(("localhost", self.d.port))
+ c.rbufsize = 0
+ c.wbufsize = 0
+ c.connect()
+ c.wfile.write("\0\0\0\0")
+ tutils.raises(TlsException, c.convert_to_ssl)
+ l = self.d.last_log()
+ assert l["type"] == "error"
+ assert "SSL" in l["msg"]
+
+ def test_ssl_cipher(self):
+ r, _ = self.pathoc([r"get:/p/202"])
+ assert r[0].status_code == 202
+ assert self.d.last_log()["cipher"][1] > 0
+
+
+class TestHTTP2(tutils.DaemonTests):
+ ssl = True
+ noweb = True
+ noapi = True
+ nohang = True
+
+ if OpenSSL._util.lib.Cryptography_HAS_ALPN:
+
+ def test_http2(self):
+ r, _ = self.pathoc(["GET:/"], ssl=True, use_http2=True)
+ assert r[0].status_code == 800
diff --git a/test/pathod/test_pathod_cmdline.py b/test/pathod/test_pathod_cmdline.py
new file mode 100644
index 00000000..829c4b32
--- /dev/null
+++ b/test/pathod/test_pathod_cmdline.py
@@ -0,0 +1,85 @@
+from libpathod import pathod_cmdline as cmdline
+import tutils
+import cStringIO
+import mock
+
+
+@mock.patch("argparse.ArgumentParser.error")
+def test_pathod(perror):
+ assert cmdline.args_pathod(["pathod"])
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "--cert",
+ tutils.test_data.path("data/testkey.pem")
+ ]
+ )
+ assert a.ssl_certs
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "--cert",
+ "nonexistent"
+ ]
+ )
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "-a",
+ "foo=200"
+ ]
+ )
+ assert a.anchors
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "-a",
+ "foo=" + tutils.test_data.path("data/response")
+ ]
+ )
+ assert a.anchors
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "-a",
+ "?=200"
+ ]
+ )
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "-a",
+ "foo"
+ ]
+ )
+ assert perror.called
+ perror.reset_mock()
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "--limit-size",
+ "200k"
+ ]
+ )
+ assert a.sizelimit
+
+ a = cmdline.args_pathod(
+ [
+ "pathod",
+ "--limit-size",
+ "q"
+ ]
+ )
+ assert perror.called
+ perror.reset_mock()
diff --git a/test/pathod/test_test.py b/test/pathod/test_test.py
new file mode 100644
index 00000000..bd92d864
--- /dev/null
+++ b/test/pathod/test_test.py
@@ -0,0 +1,45 @@
+import logging
+import requests
+from libpathod import test
+import tutils
+logging.disable(logging.CRITICAL)
+
+
+class TestDaemonManual:
+
+ def test_simple(self):
+ with test.Daemon() as d:
+ rsp = requests.get("http://localhost:%s/p/202:da" % d.port)
+ assert rsp.ok
+ assert rsp.status_code == 202
+ with tutils.raises(requests.ConnectionError):
+ requests.get("http://localhost:%s/p/202:da" % d.port)
+
+ def test_startstop_ssl(self):
+ d = test.Daemon(ssl=True)
+ rsp = requests.get(
+ "https://localhost:%s/p/202:da" %
+ d.port,
+ verify=False)
+ assert rsp.ok
+ assert rsp.status_code == 202
+ d.shutdown()
+ with tutils.raises(requests.ConnectionError):
+ requests.get("http://localhost:%s/p/202:da" % d.port)
+
+ def test_startstop_ssl_explicit(self):
+ ssloptions = dict(
+ certfile=tutils.test_data.path("data/testkey.pem"),
+ cacert=tutils.test_data.path("data/testkey.pem"),
+ ssl_after_connect=False
+ )
+ d = test.Daemon(ssl=ssloptions)
+ rsp = requests.get(
+ "https://localhost:%s/p/202:da" %
+ d.port,
+ verify=False)
+ assert rsp.ok
+ assert rsp.status_code == 202
+ d.shutdown()
+ with tutils.raises(requests.ConnectionError):
+ requests.get("http://localhost:%s/p/202:da" % d.port)
diff --git a/test/pathod/test_utils.py b/test/pathod/test_utils.py
new file mode 100644
index 00000000..7d24e9e4
--- /dev/null
+++ b/test/pathod/test_utils.py
@@ -0,0 +1,39 @@
+from libpathod import utils
+import tutils
+
+
+def test_membool():
+ m = utils.MemBool()
+ assert not m.v
+ assert m(1)
+ assert m.v == 1
+ assert m(2)
+ assert m.v == 2
+
+
+def test_parse_size():
+ assert utils.parse_size("100") == 100
+ assert utils.parse_size("100k") == 100 * 1024
+ tutils.raises("invalid size spec", utils.parse_size, "foo")
+ tutils.raises("invalid size spec", utils.parse_size, "100kk")
+
+
+def test_parse_anchor_spec():
+ assert utils.parse_anchor_spec("foo=200") == ("foo", "200")
+ assert utils.parse_anchor_spec("foo") is None
+
+
+def test_data_path():
+ tutils.raises(ValueError, utils.data.path, "nonexistent")
+
+
+def test_inner_repr():
+ assert utils.inner_repr("\x66") == "\x66"
+ assert utils.inner_repr(u"foo") == "foo"
+
+
+def test_escape_unprintables():
+ s = "".join([chr(i) for i in range(255)])
+ e = utils.escape_unprintables(s)
+ assert e.encode('ascii')
+ assert not "PATHOD_MARKER" in e
diff --git a/test/pathod/tutils.py b/test/pathod/tutils.py
new file mode 100644
index 00000000..664cdd52
--- /dev/null
+++ b/test/pathod/tutils.py
@@ -0,0 +1,128 @@
+import tempfile
+import os
+import re
+import shutil
+import cStringIO
+from contextlib import contextmanager
+
+import netlib
+from libpathod import utils, test, pathoc, pathod, language
+from netlib import tcp
+import requests
+
+def treader(bytes):
+ """
+ Construct a tcp.Read object from bytes.
+ """
+ fp = cStringIO.StringIO(bytes)
+ return tcp.Reader(fp)
+
+
+class DaemonTests(object):
+ noweb = False
+ noapi = False
+ nohang = False
+ ssl = False
+ timeout = None
+ hexdump = False
+ ssloptions = None
+ nocraft = False
+
+ @classmethod
+ def setup_class(cls):
+ opts = cls.ssloptions or {}
+ cls.confdir = tempfile.mkdtemp()
+ opts["confdir"] = cls.confdir
+ so = pathod.SSLOptions(**opts)
+ cls.d = test.Daemon(
+ staticdir=test_data.path("data"),
+ anchors=[
+ (re.compile("/anchor/.*"), "202:da")
+ ],
+ ssl=cls.ssl,
+ ssloptions=so,
+ sizelimit=1 * 1024 * 1024,
+ noweb=cls.noweb,
+ noapi=cls.noapi,
+ nohang=cls.nohang,
+ timeout=cls.timeout,
+ hexdump=cls.hexdump,
+ nocraft=cls.nocraft,
+ logreq=True,
+ logresp=True,
+ explain=True
+ )
+
+ @classmethod
+ def teardown_class(cls):
+ cls.d.shutdown()
+ shutil.rmtree(cls.confdir)
+
+ def teardown(self):
+ if not (self.noweb or self.noapi):
+ self.d.clear_log()
+
+ def getpath(self, path, params=None):
+ scheme = "https" if self.ssl else "http"
+ resp = requests.get(
+ "%s://localhost:%s/%s" % (
+ scheme,
+ self.d.port,
+ path
+ ),
+ verify=False,
+ params=params
+ )
+ return resp
+
+ def get(self, spec):
+ resp = requests.get(self.d.p(spec), verify=False)
+ return resp
+
+ def pathoc(
+ self,
+ specs,
+ timeout=None,
+ connect_to=None,
+ ssl=None,
+ ws_read_limit=None,
+ use_http2=False,
+ ):
+ """
+ Returns a (messages, text log) tuple.
+ """
+ if ssl is None:
+ ssl = self.ssl
+ logfp = cStringIO.StringIO()
+ c = pathoc.Pathoc(
+ ("localhost", self.d.port),
+ ssl=ssl,
+ ws_read_limit=ws_read_limit,
+ timeout=timeout,
+ fp=logfp,
+ use_http2=use_http2,
+ )
+ c.connect(connect_to)
+ ret = []
+ for i in specs:
+ resp = c.request(i)
+ if resp:
+ ret.append(resp)
+ for frm in c.wait():
+ ret.append(frm)
+ c.stop()
+ return ret, logfp.getvalue()
+
+
+tmpdir = netlib.tutils.tmpdir
+
+raises = netlib.tutils.raises
+
+test_data = utils.Data(__name__)
+
+
+def render(r, settings=language.Settings()):
+ r = r.resolve(settings)
+ s = cStringIO.StringIO()
+ assert language.serve(r, s, settings)
+ return s.getvalue()