aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/data/clientcert/.gitignore3
-rw-r--r--test/data/clientcert/client.cnf5
-rw-r--r--test/data/clientcert/client.pem42
-rwxr-xr-xtest/data/clientcert/make8
-rw-r--r--test/data/file1
-rw-r--r--test/data/request1
-rw-r--r--test/data/response1
-rw-r--r--test/data/testkey.pem68
-rwxr-xr-xtest/scripts/generate.sh17
-rw-r--r--test/scripts/openssl.cnf39
-rw-r--r--test/test_app.py85
-rw-r--r--test/test_language_actions.py135
-rw-r--r--test/test_language_base.py352
-rw-r--r--test/test_language_generators.py42
-rw-r--r--test/test_language_http.py358
-rw-r--r--test/test_language_http2.py233
-rw-r--r--test/test_language_websocket.py142
-rw-r--r--test/test_language_writer.py91
-rw-r--r--test/test_log.py25
-rw-r--r--test/test_pathoc.py308
-rw-r--r--test/test_pathoc_cmdline.py59
-rw-r--r--test/test_pathod.py289
-rw-r--r--test/test_pathod_cmdline.py85
-rw-r--r--test/test_test.py45
-rw-r--r--test/test_utils.py39
-rw-r--r--test/tutils.py128
26 files changed, 0 insertions, 2601 deletions
diff --git a/test/data/clientcert/.gitignore b/test/data/clientcert/.gitignore
deleted file mode 100644
index 07bc53d2..00000000
--- a/test/data/clientcert/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-client.crt
-client.key
-client.req
diff --git a/test/data/clientcert/client.cnf b/test/data/clientcert/client.cnf
deleted file mode 100644
index 5046a944..00000000
--- a/test/data/clientcert/client.cnf
+++ /dev/null
@@ -1,5 +0,0 @@
-[ ssl_client ]
-basicConstraints = CA:FALSE
-nsCertType = client
-keyUsage = digitalSignature, keyEncipherment
-extendedKeyUsage = clientAuth
diff --git a/test/data/clientcert/client.pem b/test/data/clientcert/client.pem
deleted file mode 100644
index 4927bca2..00000000
--- a/test/data/clientcert/client.pem
+++ /dev/null
@@ -1,42 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIEpAIBAAKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0
-EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+
-ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G
-3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/
-SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP
-G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABAoIBAFE3FV/IDltbmHEP
-iky93hbJm+6QgKepFReKpRVTyqb7LaygUvueQyPWQMIriKTsy675nxo8DQr7tQsO
-y3YlSZgra/xNMikIB6e82c7K8DgyrDQw/rCqjZB3Xt4VCqsWJDLXnQMSn98lx0g7
-d7Lbf8soUpKWXqfdVpSDTi4fibSX6kshXyfSTpcz4AdoncEpViUfU1xkEEmZrjT8
-1GcCsDC41xdNmzCpqRuZX7DKSFRoB+0hUzsC1oiqM7FD5kixonRd4F5PbRXImIzt
-6YCsT2okxTA04jX7yByis7LlOLTlkmLtKQYuc3erOFvwx89s4vW+AeFei+GGNitn
-tHfSwbECgYEA7SzV+nN62hAERHlg8cEQT4TxnsWvbronYWcc/ev44eHSPDWL5tPi
-GHfSbW6YAq5Wa0I9jMWfXyhOYEC3MZTC5EEeLOB71qVrTwcy/sY66rOrcgjFI76Q
-5JFHQ4wy3SWU50KxE0oWJO9LIowprG+pW1vzqC3VF0T7q0FqESrY4LUCgYEA3F7Z
-80ndnCUlooJAb+Hfotv7peFf1o6+m1PTRcz1lLnVt5R5lXj86kn+tXEpYZo1RiGR
-2rE2N0seeznWCooakHcsBN7/qmFIhhooJNF7yW+JP2I4P2UV5+tJ+8bcs/voUkQD
-1x+rGOuMn8nvHBd2+Vharft8eGL2mgooPVI2XusCgYEAlMZpO3+w8pTVeHaDP2MR
-7i/AuQ3cbCLNjSX3Y7jgGCFllWspZRRIYXzYPNkA9b2SbBnTLjjRLgnEkFBIGgvs
-7O2EFjaCuDRvydUEQhjq4ErwIsopj7B8h0QyZcbOKTbn3uFQ3n68wVJx2Sv/ADHT
-FIHrp/WIE96r19Niy34LKXkCgYB2W59VsuOKnMz01l5DeR5C+0HSWxS9SReIl2IO
-yEFSKullWyJeLIgyUaGy0990430feKI8whcrZXYumuah7IDN/KOwzhCk8vEfzWao
-N7bzfqtJVrh9HA7C7DVlO+6H4JFrtcoWPZUIomJ549w/yz6EN3ckoMC+a/Ck1TW9
-ka1QFwKBgQCywG6TrZz0UmOjyLQZ+8Q4uvZklSW5NAKBkNnyuQ2kd5rzyYgMPE8C
-Er8T88fdVIKvkhDyHhwcI7n58xE5Gr7wkwsrk/Hbd9/ZB2GgAPY3cATskK1v1McU
-YeX38CU0fUS4aoy26hWQXkViB47IGQ3jWo3ZCtzIJl8DI9/RsBWTnw==
------END RSA PRIVATE KEY-----
------BEGIN CERTIFICATE-----
-MIICYDCCAckCAQEwDQYJKoZIhvcNAQEFBQAwKDESMBAGA1UEAxMJbWl0bXByb3h5
-MRIwEAYDVQQKEwltaXRtcHJveHkwHhcNMTMwMTIwMDEwODEzWhcNMTUxMDE3MDEw
-ODEzWjBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UE
-ChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOC
-AQ8AMIIBCgKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0
-EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+
-ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G
-3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/
-SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP
-G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABMA0GCSqGSIb3DQEBBQUA
-A4GBAFvI+cd47B85PQ970n2dU/PlA2/Hb1ldrrXh2guR4hX6vYx/uuk5yRI/n0Rd
-KOXJ3czO0bd2Fpe3ZoNpkW0pOSDej/Q+58ScuJd0gWCT/Sh1eRk6ZdC0kusOuWoY
-bPOPMkG45LPgUMFOnZEsfJP6P5mZIxlbCvSMFC25nPHWlct7
------END CERTIFICATE-----
diff --git a/test/data/clientcert/make b/test/data/clientcert/make
deleted file mode 100755
index d1caea81..00000000
--- a/test/data/clientcert/make
+++ /dev/null
@@ -1,8 +0,0 @@
-#!/bin/sh
-
-openssl genrsa -out client.key 2048
-openssl req -key client.key -new -out client.req
-openssl x509 -req -days 365 -in client.req -signkey client.key -out client.crt -extfile client.cnf -extensions ssl_client
-openssl x509 -req -days 1000 -in client.req -CA ~/.mitmproxy/mitmproxy-ca.pem -CAkey ~/.mitmproxy/mitmproxy-ca.pem -set_serial 00001 -out client.crt -extensions ssl_client
-cat client.key client.crt > client.pem
-openssl x509 -text -noout -in client.pem
diff --git a/test/data/file b/test/data/file
deleted file mode 100644
index 26918572..00000000
--- a/test/data/file
+++ /dev/null
@@ -1 +0,0 @@
-testfile
diff --git a/test/data/request b/test/data/request
deleted file mode 100644
index c4c90e76..00000000
--- a/test/data/request
+++ /dev/null
@@ -1 +0,0 @@
-get:/foo
diff --git a/test/data/response b/test/data/response
deleted file mode 100644
index 8f897c85..00000000
--- a/test/data/response
+++ /dev/null
@@ -1 +0,0 @@
-202
diff --git a/test/data/testkey.pem b/test/data/testkey.pem
deleted file mode 100644
index b804bd4c..00000000
--- a/test/data/testkey.pem
+++ /dev/null
@@ -1,68 +0,0 @@
------BEGIN RSA PRIVATE KEY-----
-MIIG5QIBAAKCAYEAwvtKxoZvBV2AxPAkCx8PXbuE7KeqK9bBvk8x+JchPMdf/KZj
-sdu2v6Gm8Hi053i7ZGxouFvonJxHAiK6cwk9OYQwa9fbOFf2mgWKEBO4fbCH93tW
-DCTdWVxFyNViAvxGHlJs3/IU03pIG29AgUnhRW8pGbabAfx8emcOZJZ3ykEuimaC
-4s7mRwdc63GXnbcjTtRkrJsBATI+xvPwuR2+4daX7sPCf0kel3bN2jMpwXfvk/Ww
-kJ2BIEeZCg0qIvyMjH9qrUirUnsmQnpPln0CGBbQEBsW9yMfGoFdREiMYls5jZeq
-NxjWNv1RTRIm/4RjMwyxnoTA9eDS9wwO2NnJS4vfXAnUTP4BYx8Pe4ZMA2Gm6YrC
-ysT6YA1xdHNpcuHXClxwmPj/cm8Z5kIg5clbNIK60ts9yFr/Ao3KPPYJ2GBv8/Oe
-ApPBJuubews+/9/13Ew/SJ1t2u28+sPbgXUG8dC2n4vWTvJwKf6Duqxgnm82zdzj
-SZoXRQsP984qiN7NAgMBAAECggGBALB6rqWdzCL5DLI0AQun40qdjaR95UKksNvF
-5p7we379nl2ZZKb5DSHJ+MWzG1pfJo2wqeAkIBiQQp0mPcgdVrMWeJVD3QHUbDng
-RaRjlRr+izJvCeUYANj+8ZLjwECfgf+z7yOLg1oeVeGvAp2C90jXYkYJx6c2lpxb
-ZuWYY3hHIw7V1iXfywIDIhFg0TBJMMYK68xmx7QDfFqrNPj4eWsDxqSvvv1iezPw
-rkWPBX49RjWPrW5XgSZsZ5J3c+oS1rZmIY7EAgopTWB/3wJjZR1Idz/9l9LIWlBP
-6zVC27CIZzSEeGguqNVeyzJ0TPWh5idYNRmSZr6eTUF0245LNO/gqvWKgRSNIZko
-HoBa2F1AvCiB67S1kxjwS5y3VkudZE4jkgGKcC2Ws/9QmOZ0HAsjI8VAMp2hj6iN
-0HdPMTNtsLgbhKyXsoZuW4YmwfSTPxGi2gvcI7GUozpTz84n1cOneJnz1ygx6Uru
-v8DpQg+VX6xTy4X6AK1F8OYNMZ/jaQKBwQDv30NevQStnGbTmcSr+0fd4rmWFklK
-V6B2X7zWynVpSGvCWkqVSp3mG6aiZItAltVMRL/9LT6zIheyClyd+vXIjR2+W210
-XMxrvz7A8qCXkvB2dyEhrMdCfZ7p8+kf+eD2c/Mnxb7VpmDfHYLx30JeQoBwjrwU
-Eul+dE1P+r8bWBaLTjlsipTya74yItWWAToXAo+s1BXBtXhEsLoe4FghlC0u724d
-ucjDaeICdLcerApdvg6Q6p4kVHaoF6ka6I8CgcEA0Bdc05ery9gLC6CclV+BhA5Q
-dfDq2P7qhc7e1ipwNRrQo2gy5HhgOkTL3dJWc+8rV6CBP/JfchnsW40tDOnPCTLT
-gg3n7vv3RHrtncApXuhIFR+B5xjohTPBzxRUMiAOre2d0F5b6eBXFjptf/1i2tQ+
-qdqJoyOGOZP0hKVslGIfz+CKc6WEkIqX7c91Msdr5myeaWDI5TsurfuKRBH395T3
-BMAi6oinAAEb1rdySenLO2A/0kVmBVlTpaN3TNjjAoHBAMvS4uQ1qSv8okNbfgrF
-UqPwa9JkzZImM2tinovFLU9xAl/7aTTCWrmU9Vs4JDuV71kHcjwnngeJCKl4tIpp
-HUB06Lk/5xnhYLKNpz087cjeSwXe5IBA2HBfXhFd+NH6+nVwwUUieq4A2n+8C/CK
-zVJbH9iE8Lv99fpFyQwU/R63EzD8Hz9j4ny7oLnpb6QvFrVGr98jt/kJwlBb+0sR
-RtIBnwMq4F7R5w5lgm6jzpZ5ibVuMeJh+k7Ulp7uu/rpcQKBwQDE3sWIvf7f7PaO
-OpbJz0CmYjCHVLWrNIlGrPAv6Jid9U+cuXEkrCpGFl5V77CxIH59+bEuga0BMztl
-ZkxP4khoqHhom6VpeWJ3nGGAFJRPYS0JJvTsYalilBPxSYdaoO+iZ6MdxpfozcE2
-m3KLW3uSEqlyYvpCqNJNWQhGEoeGXstADWyPevHPGgAhElwL/ZW8u9inU9Tc4sAI
-BGnMer+BsaJ+ERU3lK+Clony+z2aZiFLfIUE93lM6DT2CZBN2QcCgcAVk4L0bfA6
-HFnP/ZWNlnYWpOVFKcq57PX+J5/k7Tf34e2cYM2P0eqYggWZbzVd8qoCOQCHrAx0
-aZSSvEyKAVvzRNeqbm1oXaMojksMnrSX5henHjPbZlr1EmM7+zMnSTMkfVOx/6g1
-97sASej31XdOAgKCBJGymrwvYrCLW+P5cHqd+D8v/PvfpRIQM54p5ixRt3EYZvtR
-zGrzsr0OGyOLZtj1DB0a3kvajAAOCl3TawJSzviKo2mwc+/xj28MCQM=
------END RSA PRIVATE KEY-----
------BEGIN CERTIFICATE-----
-MIIE4TCCA0mgAwIBAgIJALONCAWZxPhUMA0GCSqGSIb3DQEBCwUAMEExCzAJBgNV
-BAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYDVQQD
-DAh0ZXN0LmNvbTAeFw0xNTA0MTgyMjA0NTNaFw00MjA5MDIyMjA0NTNaMEExCzAJ
-BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD
-VQQDDAh0ZXN0LmNvbTCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAML7
-SsaGbwVdgMTwJAsfD127hOynqivWwb5PMfiXITzHX/ymY7Hbtr+hpvB4tOd4u2Rs
-aLhb6JycRwIiunMJPTmEMGvX2zhX9poFihATuH2wh/d7Vgwk3VlcRcjVYgL8Rh5S
-bN/yFNN6SBtvQIFJ4UVvKRm2mwH8fHpnDmSWd8pBLopmguLO5kcHXOtxl523I07U
-ZKybAQEyPsbz8LkdvuHWl+7Dwn9JHpd2zdozKcF375P1sJCdgSBHmQoNKiL8jIx/
-aq1Iq1J7JkJ6T5Z9AhgW0BAbFvcjHxqBXURIjGJbOY2XqjcY1jb9UU0SJv+EYzMM
-sZ6EwPXg0vcMDtjZyUuL31wJ1Ez+AWMfD3uGTANhpumKwsrE+mANcXRzaXLh1wpc
-cJj4/3JvGeZCIOXJWzSCutLbPcha/wKNyjz2Cdhgb/PzngKTwSbrm3sLPv/f9dxM
-P0idbdrtvPrD24F1BvHQtp+L1k7ycCn+g7qsYJ5vNs3c40maF0ULD/fOKojezQID
-AQABo4HbMIHYMAsGA1UdDwQEAwIFoDAdBgNVHQ4EFgQUbEgfTauEqEP/bnBtby1K
-bihJvcswcQYDVR0jBGowaIAUbEgfTauEqEP/bnBtby1KbihJvcuhRaRDMEExCzAJ
-BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD
-VQQDDAh0ZXN0LmNvbYIJALONCAWZxPhUMAwGA1UdEwQFMAMBAf8wKQYDVR0RBCIw
-IIIIdGVzdC5jb22CCXRlc3QyLmNvbYIJdGVzdDMuY29tMA0GCSqGSIb3DQEBCwUA
-A4IBgQBcTedXtUb91DxQRtg73iomz7cQ4niZntUBW8iE5rpoA7prtQNGHMCbHwaX
-tbWFkzBmL5JTBWvd/6AQ2LtiB3rYB3W/iRhbpsNJ501xaoOguPEQ9720Ph8TEveM
-208gNzGsEOcNALwyXj2y9M19NGu9zMa8eu1Tc3IsQaVaGKHx8XZn5HTNUx8EdcwI
-Z/Ji9ETDCL7+e5INv0tqfFSazWaQUwxM4IzPMkKTYRcMuN/6eog609k9r9pp32Ut
-rKlzc6GIkAlgJJ0Wkoz1V46DmJNJdJG7eLu/mtsB85j6hytIQeWTf1fll5YnMZLF
-HgNZtfYn8Q0oTdBQ0ZOaZeQCfZ8emYBdLJf2YB83uGRMjQ1FoeIxzQqiRq8WHRdb
-9Q45i0DINMnNp0DbLMA4numZ7wT9SQb6sql9eUyuCNDw7nGIWTHUNfLtU1Er3h1d
-icJuApx9+//UN/pGh0yTXb3fZbiI4IehRmkpnIWonIAwaVGm6JZU04wiIn8CuBho
-/qQdlS8=
------END CERTIFICATE-----
diff --git a/test/scripts/generate.sh b/test/scripts/generate.sh
deleted file mode 100755
index eec3077d..00000000
--- a/test/scripts/generate.sh
+++ /dev/null
@@ -1,17 +0,0 @@
-#!/bin/bash
-
-if [ ! -f ./private.key ]
-then
- openssl genrsa -out private.key 3072
-fi
-openssl req \
- -batch \
- -new -x509 \
- -key private.key \
- -sha256 \
- -out cert.pem \
- -days 9999 \
- -config ./openssl.cnf
-openssl x509 -in cert.pem -text -noout
-cat ./private.key ./cert.pem > testcert.pem
-rm ./private.key ./cert.pem
diff --git a/test/scripts/openssl.cnf b/test/scripts/openssl.cnf
deleted file mode 100644
index 5c890354..00000000
--- a/test/scripts/openssl.cnf
+++ /dev/null
@@ -1,39 +0,0 @@
-[ req ]
-default_bits = 1024
-default_keyfile = privkey.pem
-distinguished_name = req_distinguished_name
-x509_extensions = v3_ca
-
-[ req_distinguished_name ]
-countryName = Country Name (2 letter code)
-countryName_default = NZ
-countryName_min = 2
-countryName_max = 2
-stateOrProvinceName = State or Province Name (full name)
-stateOrProvinceName_default = Otago
-localityName = Locality Name (eg, city)
-0.organizationName = Organization Name (eg, company)
-0.organizationName_default = Pathod
-commonName = Common Name (e.g. server FQDN or YOUR name)
-commonName_default = test.com
-commonName_max = 64
-
-[ v3_req ]
-
-basicConstraints = CA:FALSE
-keyUsage = nonRepudiation, digitalSignature, keyEncipherment
-
-[ v3_ca ]
-
-keyUsage = digitalSignature, keyEncipherment
-subjectKeyIdentifier=hash
-authorityKeyIdentifier=keyid:always,issuer:always
-basicConstraints = CA:true
-subjectAltName = @alternate_names
-
-
-[ alternate_names ]
-
-DNS.1 = test.com
-DNS.2 = test2.com
-DNS.3 = test3.com
diff --git a/test/test_app.py b/test/test_app.py
deleted file mode 100644
index 4536db8e..00000000
--- a/test/test_app.py
+++ /dev/null
@@ -1,85 +0,0 @@
-import tutils
-
-
-class TestApp(tutils.DaemonTests):
- SSL = False
-
- def test_index(self):
- r = self.getpath("/")
- assert r.status_code == 200
- assert r.content
-
- def test_about(self):
- r = self.getpath("/about")
- assert r.ok
-
- def test_download(self):
- r = self.getpath("/download")
- assert r.ok
-
- def test_docs(self):
- assert self.getpath("/docs/pathod").status_code == 200
- assert self.getpath("/docs/pathoc").status_code == 200
- assert self.getpath("/docs/language").status_code == 200
- assert self.getpath("/docs/libpathod").status_code == 200
- assert self.getpath("/docs/test").status_code == 200
-
- def test_log(self):
- assert self.getpath("/log").status_code == 200
- assert self.get("200:da").status_code == 200
- id = self.d.log()[0]["id"]
- assert self.getpath("/log").status_code == 200
- assert self.getpath("/log/%s" % id).status_code == 200
- assert self.getpath("/log/9999999").status_code == 404
-
- def test_log_binary(self):
- assert self.get("200:h@10b=@10b:da")
-
- def test_response_preview(self):
- r = self.getpath("/response_preview", params=dict(spec="200"))
- assert r.status_code == 200
- assert 'Response' in r.content
-
- r = self.getpath("/response_preview", params=dict(spec="foo"))
- assert r.status_code == 200
- assert 'Error' in r.content
-
- r = self.getpath("/response_preview", params=dict(spec="200:b@100m"))
- assert r.status_code == 200
- assert "too large" in r.content
-
- r = self.getpath("/response_preview", params=dict(spec="200:b@5k"))
- assert r.status_code == 200
- assert 'Response' in r.content
-
- r = self.getpath(
- "/response_preview",
- params=dict(
- spec="200:b<nonexistent"))
- assert r.status_code == 200
- assert 'File access denied' in r.content
-
- r = self.getpath("/response_preview", params=dict(spec="200:b<file"))
- assert r.status_code == 200
- assert 'testfile' in r.content
-
- def test_request_preview(self):
- r = self.getpath("/request_preview", params=dict(spec="get:/"))
- assert r.status_code == 200
- assert 'Request' in r.content
-
- r = self.getpath("/request_preview", params=dict(spec="foo"))
- assert r.status_code == 200
- assert 'Error' in r.content
-
- r = self.getpath("/request_preview", params=dict(spec="get:/:b@100m"))
- assert r.status_code == 200
- assert "too large" in r.content
-
- r = self.getpath("/request_preview", params=dict(spec="get:/:b@5k"))
- assert r.status_code == 200
- assert 'Request' in r.content
-
- r = self.getpath("/request_preview", params=dict(spec=""))
- assert r.status_code == 200
- assert 'empty spec' in r.content
diff --git a/test/test_language_actions.py b/test/test_language_actions.py
deleted file mode 100644
index 755f0d85..00000000
--- a/test/test_language_actions.py
+++ /dev/null
@@ -1,135 +0,0 @@
-import cStringIO
-
-from libpathod.language import actions
-from libpathod import language
-
-
-def parse_request(s):
- return language.parse_pathoc(s).next()
-
-
-def test_unique_name():
- assert not actions.PauseAt(0, "f").unique_name
- assert actions.DisconnectAt(0).unique_name
-
-
-class TestDisconnects:
-
- def test_parse_pathod(self):
- a = language.parse_pathod("400:d0").next().actions[0]
- assert a.spec() == "d0"
- a = language.parse_pathod("400:dr").next().actions[0]
- assert a.spec() == "dr"
-
- def test_at(self):
- e = actions.DisconnectAt.expr()
- v = e.parseString("d0")[0]
- assert isinstance(v, actions.DisconnectAt)
- assert v.offset == 0
-
- v = e.parseString("d100")[0]
- assert v.offset == 100
-
- e = actions.DisconnectAt.expr()
- v = e.parseString("dr")[0]
- assert v.offset == "r"
-
- def test_spec(self):
- assert actions.DisconnectAt("r").spec() == "dr"
- assert actions.DisconnectAt(10).spec() == "d10"
-
-
-class TestInject:
-
- def test_parse_pathod(self):
- a = language.parse_pathod("400:ir,@100").next().actions[0]
- assert a.offset == "r"
- assert a.value.datatype == "bytes"
- assert a.value.usize == 100
-
- a = language.parse_pathod("400:ia,@100").next().actions[0]
- assert a.offset == "a"
-
- def test_at(self):
- e = actions.InjectAt.expr()
- v = e.parseString("i0,'foo'")[0]
- assert v.value.val == "foo"
- assert v.offset == 0
- assert isinstance(v, actions.InjectAt)
-
- v = e.parseString("ir,'foo'")[0]
- assert v.offset == "r"
-
- def test_serve(self):
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:i0,'foo'").next()
- assert language.serve(r, s, {})
-
- def test_spec(self):
- e = actions.InjectAt.expr()
- v = e.parseString("i0,'foo'")[0]
- assert v.spec() == 'i0,"foo"'
-
- def test_spec(self):
- e = actions.InjectAt.expr()
- v = e.parseString("i0,@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
-
-class TestPauses:
-
- def test_parse_pathod(self):
- e = actions.PauseAt.expr()
- v = e.parseString("p10,10")[0]
- assert v.seconds == 10
- assert v.offset == 10
-
- v = e.parseString("p10,f")[0]
- assert v.seconds == "f"
-
- v = e.parseString("pr,f")[0]
- assert v.offset == "r"
-
- v = e.parseString("pa,f")[0]
- assert v.offset == "a"
-
- def test_request(self):
- r = language.parse_pathod('400:p10,10').next()
- assert r.actions[0].spec() == "p10,10"
-
- def test_spec(self):
- assert actions.PauseAt("r", 5).spec() == "pr,5"
- assert actions.PauseAt(0, 5).spec() == "p0,5"
- assert actions.PauseAt(0, "f").spec() == "p0,f"
-
- def test_freeze(self):
- l = actions.PauseAt("r", 5)
- assert l.freeze({}).spec() == l.spec()
-
-
-class Test_Action:
-
- def test_cmp(self):
- a = actions.DisconnectAt(0)
- b = actions.DisconnectAt(1)
- c = actions.DisconnectAt(0)
- assert a < b
- assert a == c
- l = sorted([b, a])
- assert l[0].offset == 0
-
- def test_resolve(self):
- r = parse_request('GET:"/foo"')
- e = actions.DisconnectAt("r")
- ret = e.resolve({}, r)
- assert isinstance(ret.offset, int)
-
- def test_repr(self):
- e = actions.DisconnectAt("r")
- assert repr(e)
-
- def test_freeze(self):
- l = actions.DisconnectAt(5)
- assert l.freeze({}).spec() == l.spec()
diff --git a/test/test_language_base.py b/test/test_language_base.py
deleted file mode 100644
index b18ee5b2..00000000
--- a/test/test_language_base.py
+++ /dev/null
@@ -1,352 +0,0 @@
-import os
-from libpathod import language
-from libpathod.language import base, exceptions
-import tutils
-
-
-def parse_request(s):
- return language.parse_pathoc(s).next()
-
-
-def test_times():
- reqs = list(language.parse_pathoc("get:/:x5"))
- assert len(reqs) == 5
- assert not reqs[0].times
-
-
-def test_caseless_literal():
- class CL(base.CaselessLiteral):
- TOK = "foo"
- v = CL("foo")
- assert v.expr()
- assert v.values(language.Settings())
-
-
-class TestTokValueNakedLiteral:
-
- def test_expr(self):
- v = base.TokValueNakedLiteral("foo")
- assert v.expr()
-
- def test_spec(self):
- v = base.TokValueNakedLiteral("foo")
- assert v.spec() == repr(v) == "foo"
-
- v = base.TokValueNakedLiteral("f\x00oo")
- assert v.spec() == repr(v) == r"f\x00oo"
-
-
-class TestTokValueLiteral:
-
- def test_espr(self):
- v = base.TokValueLiteral("foo")
- assert v.expr()
- assert v.val == "foo"
-
- v = base.TokValueLiteral("foo\n")
- assert v.expr()
- assert v.val == "foo\n"
- assert repr(v)
-
- def test_spec(self):
- v = base.TokValueLiteral("foo")
- assert v.spec() == r"'foo'"
-
- v = base.TokValueLiteral("f\x00oo")
- assert v.spec() == repr(v) == r"'f\x00oo'"
-
- v = base.TokValueLiteral("\"")
- assert v.spec() == repr(v) == '\'"\''
-
- def roundtrip(self, spec):
- e = base.TokValueLiteral.expr()
- v = base.TokValueLiteral(spec)
- v2 = e.parseString(v.spec())
- assert v.val == v2[0].val
- assert v.spec() == v2[0].spec()
-
- def test_roundtrip(self):
- self.roundtrip("'")
- self.roundtrip('\'')
- self.roundtrip("a")
- self.roundtrip("\"")
- # self.roundtrip("\\")
- self.roundtrip("200:b'foo':i23,'\\''")
- self.roundtrip("\a")
-
-
-class TestTokValueGenerate:
-
- def test_basic(self):
- v = base.TokValue.parseString("@10b")[0]
- assert v.usize == 10
- assert v.unit == "b"
- assert v.bytes() == 10
- v = base.TokValue.parseString("@10")[0]
- assert v.unit == "b"
- v = base.TokValue.parseString("@10k")[0]
- assert v.bytes() == 10240
- v = base.TokValue.parseString("@10g")[0]
- assert v.bytes() == 1024 ** 3 * 10
-
- v = base.TokValue.parseString("@10g,digits")[0]
- assert v.datatype == "digits"
- g = v.get_generator({})
- assert g[:100]
-
- v = base.TokValue.parseString("@10,digits")[0]
- assert v.unit == "b"
- assert v.datatype == "digits"
-
- def test_spec(self):
- v = base.TokValueGenerate(1, "b", "bytes")
- assert v.spec() == repr(v) == "@1"
-
- v = base.TokValueGenerate(1, "k", "bytes")
- assert v.spec() == repr(v) == "@1k"
-
- v = base.TokValueGenerate(1, "k", "ascii")
- assert v.spec() == repr(v) == "@1k,ascii"
-
- v = base.TokValueGenerate(1, "b", "ascii")
- assert v.spec() == repr(v) == "@1,ascii"
-
- def test_freeze(self):
- v = base.TokValueGenerate(100, "b", "ascii")
- f = v.freeze(language.Settings())
- assert len(f.val) == 100
-
-
-class TestTokValueFile:
-
- def test_file_value(self):
- v = base.TokValue.parseString("<'one two'")[0]
- assert str(v)
- assert v.path == "one two"
-
- v = base.TokValue.parseString("<path")[0]
- assert v.path == "path"
-
- def test_access_control(self):
- v = base.TokValue.parseString("<path")[0]
- with tutils.tmpdir() as t:
- p = os.path.join(t, "path")
- with open(p, "wb") as f:
- f.write("x" * 10000)
-
- assert v.get_generator(language.Settings(staticdir=t))
-
- v = base.TokValue.parseString("<path2")[0]
- tutils.raises(
- exceptions.FileAccessDenied,
- v.get_generator,
- language.Settings(staticdir=t)
- )
- tutils.raises(
- "access disabled",
- v.get_generator,
- language.Settings()
- )
-
- v = base.TokValue.parseString("</outside")[0]
- tutils.raises(
- "outside",
- v.get_generator,
- language.Settings(staticdir=t)
- )
-
- def test_spec(self):
- v = base.TokValue.parseString("<'one two'")[0]
- v2 = base.TokValue.parseString(v.spec())[0]
- assert v2.path == "one two"
-
- def test_freeze(self):
- v = base.TokValue.parseString("<'one two'")[0]
- v2 = v.freeze({})
- assert v2.path == v.path
-
-
-class TestMisc:
-
- def test_generators(self):
- v = base.TokValue.parseString("'val'")[0]
- g = v.get_generator({})
- assert g[:] == "val"
-
- def test_value(self):
- assert base.TokValue.parseString("'val'")[0].val == "val"
- assert base.TokValue.parseString('"val"')[0].val == "val"
- assert base.TokValue.parseString('"\'val\'"')[0].val == "'val'"
-
- def test_value(self):
- class TT(base.Value):
- preamble = "m"
- e = TT.expr()
- v = e.parseString("m'msg'")[0]
- assert v.value.val == "msg"
-
- s = v.spec()
- assert s == e.parseString(s)[0].spec()
-
- v = e.parseString("m@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
- def test_fixedlengthvalue(self):
- class TT(base.FixedLengthValue):
- preamble = "m"
- length = 4
-
- e = TT.expr()
- assert e.parseString("m@4")
- tutils.raises("invalid value length", e.parseString, "m@100")
- tutils.raises("invalid value length", e.parseString, "m@1")
-
- with tutils.tmpdir() as t:
- p = os.path.join(t, "path")
- s = base.Settings(staticdir=t)
- with open(p, "wb") as f:
- f.write("a" * 20)
- v = e.parseString("m<path")[0]
- tutils.raises("invalid value length", v.values, s)
-
- p = os.path.join(t, "path")
- with open(p, "wb") as f:
- f.write("a" * 4)
- v = e.parseString("m<path")[0]
- assert v.values(s)
-
-
-class TKeyValue(base.KeyValue):
- preamble = "h"
-
- def values(self, settings):
- return [
- self.key.get_generator(settings),
- ": ",
- self.value.get_generator(settings),
- "\r\n",
- ]
-
-
-class TestKeyValue:
-
- def test_simple(self):
- e = TKeyValue.expr()
- v = e.parseString("h'foo'='bar'")[0]
- assert v.key.val == "foo"
- assert v.value.val == "bar"
-
- v2 = e.parseString(v.spec())[0]
- assert v2.key.val == v.key.val
- assert v2.value.val == v.value.val
-
- s = v.spec()
- assert s == e.parseString(s)[0].spec()
-
- def test_freeze(self):
- e = TKeyValue.expr()
- v = e.parseString("h@10=@10'")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.key.val == v3.key.val
- assert v2.value.val == v3.value.val
-
-
-def test_intfield():
- class TT(base.IntField):
- preamble = "t"
- names = {
- "one": 1,
- "two": 2,
- "three": 3
- }
- max = 4
- e = TT.expr()
-
- v = e.parseString("tone")[0]
- assert v.value == 1
- assert v.spec() == "tone"
- assert v.values(language.Settings())
-
- v = e.parseString("t1")[0]
- assert v.value == 1
- assert v.spec() == "t1"
-
- v = e.parseString("t4")[0]
- assert v.value == 4
- assert v.spec() == "t4"
-
- tutils.raises("can't exceed", e.parseString, "t5")
-
-
-def test_options_or_value():
- class TT(base.OptionsOrValue):
- options = [
- "one",
- "two",
- "three"
- ]
- e = TT.expr()
- assert e.parseString("one")[0].value.val == "one"
- assert e.parseString("'foo'")[0].value.val == "foo"
- assert e.parseString("'get'")[0].value.val == "get"
-
- assert e.parseString("one")[0].spec() == "one"
- assert e.parseString("'foo'")[0].spec() == "'foo'"
-
- s = e.parseString("one")[0].spec()
- assert s == e.parseString(s)[0].spec()
-
- s = e.parseString("'foo'")[0].spec()
- assert s == e.parseString(s)[0].spec()
-
- v = e.parseString("@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
-
-def test_integer():
- e = base.Integer.expr()
- v = e.parseString("200")[0]
- assert v.string() == "200"
- assert v.spec() == "200"
-
- assert v.freeze({}).value == v.value
-
- class BInt(base.Integer):
- bounds = (1, 5)
-
- tutils.raises("must be between", BInt, 0)
- tutils.raises("must be between", BInt, 6)
- assert BInt(5)
- assert BInt(1)
- assert BInt(3)
-
-
-class TBoolean(base.Boolean):
- name = "test"
-
-
-def test_unique_name():
- b = TBoolean(True)
- assert b.unique_name
-
-
-class test_boolean():
- e = TBoolean.expr()
- assert e.parseString("test")[0].value
- assert not e.parseString("-test")[0].value
-
- def roundtrip(s):
- e = TBoolean.expr()
- s2 = e.parseString(s)[0].spec()
- v1 = e.parseString(s)[0].value
- v2 = e.parseString(s2)[0].value
- assert s == s2
- assert v1 == v2
-
- roundtrip("test")
- roundtrip("-test")
diff --git a/test/test_language_generators.py b/test/test_language_generators.py
deleted file mode 100644
index 945560c3..00000000
--- a/test/test_language_generators.py
+++ /dev/null
@@ -1,42 +0,0 @@
-import os
-
-from libpathod.language import generators
-import tutils
-
-
-def test_randomgenerator():
- g = generators.RandomGenerator("bytes", 100)
- assert repr(g)
- assert len(g[:10]) == 10
- assert len(g[1:10]) == 9
- assert len(g[:1000]) == 100
- assert len(g[1000:1001]) == 0
- assert g[0]
-
-
-def test_filegenerator():
- with tutils.tmpdir() as t:
- path = os.path.join(t, "foo")
- f = open(path, "wb")
- f.write("x" * 10000)
- f.close()
- g = generators.FileGenerator(path)
- assert len(g) == 10000
- assert g[0] == "x"
- assert g[-1] == "x"
- assert g[0:5] == "xxxxx"
- assert repr(g)
- # remove all references to FileGenerator instance to close the file
- # handle.
- del g
-
-
-def test_transform_generator():
- def trans(offset, data):
- return "a" * len(data)
- g = "one"
- t = generators.TransformGenerator(g, trans)
- assert len(t) == len(g)
- assert t[0] == "a"
- assert t[:] == "a" * len(g)
- assert repr(t)
diff --git a/test/test_language_http.py b/test/test_language_http.py
deleted file mode 100644
index 26bb6a45..00000000
--- a/test/test_language_http.py
+++ /dev/null
@@ -1,358 +0,0 @@
-import cStringIO
-
-from libpathod import language
-from libpathod.language import http, base
-import tutils
-
-
-def parse_request(s):
- return language.parse_pathoc(s).next()
-
-
-def test_make_error_response():
- d = cStringIO.StringIO()
- s = http.make_error_response("foo")
- language.serve(s, d, {})
-
-
-class TestRequest:
-
- def test_nonascii(self):
- tutils.raises("ascii", parse_request, "get:\xf0")
-
- def test_err(self):
- tutils.raises(language.ParseException, parse_request, 'GET')
-
- def test_simple(self):
- r = parse_request('GET:"/foo"')
- assert r.method.string() == "GET"
- assert r.path.string() == "/foo"
- r = parse_request('GET:/foo')
- assert r.path.string() == "/foo"
- r = parse_request('GET:@1k')
- assert len(r.path.string()) == 1024
-
- def test_multiple(self):
- r = list(language.parse_pathoc("GET:/ PUT:/"))
- assert r[0].method.string() == "GET"
- assert r[1].method.string() == "PUT"
- assert len(r) == 2
-
- l = """
- GET
- "/foo"
- ir,@1
-
- PUT
-
- "/foo
-
-
-
- bar"
-
- ir,@1
- """
- r = list(language.parse_pathoc(l))
- assert len(r) == 2
- assert r[0].method.string() == "GET"
- assert r[1].method.string() == "PUT"
-
- l = """
- get:"http://localhost:9999/p/200":ir,@1
- get:"http://localhost:9999/p/200":ir,@2
- """
- r = list(language.parse_pathoc(l))
- assert len(r) == 2
- assert r[0].method.string() == "GET"
- assert r[1].method.string() == "GET"
-
- def test_nested_response(self):
- l = "get:/p:s'200'"
- r = list(language.parse_pathoc(l))
- assert len(r) == 1
- assert len(r[0].tokens) == 3
- assert isinstance(r[0].tokens[2], http.NestedResponse)
- assert r[0].values({})
-
- def test_render(self):
- s = cStringIO.StringIO()
- r = parse_request("GET:'/foo'")
- assert language.serve(
- r,
- s,
- language.Settings(request_host="foo.com")
- )
-
- def test_multiline(self):
- l = """
- GET
- "/foo"
- ir,@1
- """
- r = parse_request(l)
- assert r.method.string() == "GET"
- assert r.path.string() == "/foo"
- assert r.actions
-
- l = """
- GET
-
- "/foo
-
-
-
- bar"
-
- ir,@1
- """
- r = parse_request(l)
- assert r.method.string() == "GET"
- assert r.path.string().endswith("bar")
- assert r.actions
-
- def test_spec(self):
- def rt(s):
- s = parse_request(s).spec()
- assert parse_request(s).spec() == s
- rt("get:/foo")
- rt("get:/foo:da")
-
- def test_freeze(self):
- r = parse_request("GET:/:b@100").freeze(language.Settings())
- assert len(r.spec()) > 100
-
- def test_path_generator(self):
- r = parse_request("GET:@100").freeze(language.Settings())
- assert len(r.spec()) > 100
-
- def test_websocket(self):
- r = parse_request('ws:/path/')
- res = r.resolve(language.Settings())
- assert res.method.string().lower() == "get"
- assert res.tok(http.Path).value.val == "/path/"
- assert res.tok(http.Method).value.val.lower() == "get"
- assert http.get_header("Upgrade", res.headers).value.val == "websocket"
-
- r = parse_request('ws:put:/path/')
- res = r.resolve(language.Settings())
- assert r.method.string().lower() == "put"
- assert res.tok(http.Path).value.val == "/path/"
- assert res.tok(http.Method).value.val.lower() == "put"
- assert http.get_header("Upgrade", res.headers).value.val == "websocket"
-
-
-class TestResponse:
-
- def dummy_response(self):
- return language.parse_pathod("400'msg'").next()
-
- def test_response(self):
- r = language.parse_pathod("400:m'msg'").next()
- assert r.status_code.string() == "400"
- assert r.reason.string() == "msg"
-
- r = language.parse_pathod("400:m'msg':b@100b").next()
- assert r.reason.string() == "msg"
- assert r.body.values({})
- assert str(r)
-
- r = language.parse_pathod("200").next()
- assert r.status_code.string() == "200"
- assert not r.reason
- assert "OK" in [i[:] for i in r.preamble({})]
-
- def test_render(self):
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:m'msg'").next()
- assert language.serve(r, s, {})
-
- r = language.parse_pathod("400:p0,100:dr").next()
- assert "p0" in r.spec()
- s = r.preview_safe()
- assert "p0" not in s.spec()
-
- def test_raw(self):
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:b'foo'").next()
- language.serve(r, s, {})
- v = s.getvalue()
- assert "Content-Length" in v
-
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:b'foo':r").next()
- language.serve(r, s, {})
- v = s.getvalue()
- assert "Content-Length" not in v
-
- def test_length(self):
- def testlen(x):
- s = cStringIO.StringIO()
- x = x.next()
- language.serve(x, s, language.Settings())
- assert x.length(language.Settings()) == len(s.getvalue())
- testlen(language.parse_pathod("400:m'msg':r"))
- testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r"))
- testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r"))
-
- def test_maximum_length(self):
- def testlen(x):
- x = x.next()
- s = cStringIO.StringIO()
- m = x.maximum_length({})
- language.serve(x, s, {})
- assert m >= len(s.getvalue())
-
- r = language.parse_pathod("400:m'msg':b@100:d0")
- testlen(r)
-
- r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'")
- testlen(r)
-
- r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'")
- testlen(r)
-
- def test_parse_err(self):
- tutils.raises(
- language.ParseException, language.parse_pathod, "400:msg,b:"
- )
- try:
- language.parse_pathod("400'msg':b:")
- except language.ParseException as v:
- assert v.marked()
- assert str(v)
-
- def test_nonascii(self):
- tutils.raises("ascii", language.parse_pathod, "foo:b\xf0")
-
- def test_parse_header(self):
- r = language.parse_pathod('400:h"foo"="bar"').next()
- assert http.get_header("foo", r.headers)
-
- def test_parse_pause_before(self):
- r = language.parse_pathod("400:p0,10").next()
- assert r.actions[0].spec() == "p0,10"
-
- def test_parse_pause_after(self):
- r = language.parse_pathod("400:pa,10").next()
- assert r.actions[0].spec() == "pa,10"
-
- def test_parse_pause_random(self):
- r = language.parse_pathod("400:pr,10").next()
- assert r.actions[0].spec() == "pr,10"
-
- def test_parse_stress(self):
- # While larger values are known to work on linux, len() technically
- # returns an int and a python 2.7 int on windows has 32bit precision.
- # Therefore, we should keep the body length < 2147483647 bytes in our
- # tests.
- r = language.parse_pathod("400:b@1g").next()
- assert r.length({})
-
- def test_spec(self):
- def rt(s):
- s = language.parse_pathod(s).next().spec()
- assert language.parse_pathod(s).next().spec() == s
- rt("400:b@100g")
- rt("400")
- rt("400:da")
-
- def test_websockets(self):
- r = language.parse_pathod("ws").next()
- tutils.raises("no websocket key", r.resolve, language.Settings())
- res = r.resolve(language.Settings(websocket_key="foo"))
- assert res.status_code.string() == "101"
-
-
-def test_ctype_shortcut():
- e = http.ShortcutContentType.expr()
- v = e.parseString("c'foo'")[0]
- assert v.key.val == "Content-Type"
- assert v.value.val == "foo"
-
- s = v.spec()
- assert s == e.parseString(s)[0].spec()
-
- e = http.ShortcutContentType.expr()
- v = e.parseString("c@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
-
-def test_location_shortcut():
- e = http.ShortcutLocation.expr()
- v = e.parseString("l'foo'")[0]
- assert v.key.val == "Location"
- assert v.value.val == "foo"
-
- s = v.spec()
- assert s == e.parseString(s)[0].spec()
-
- e = http.ShortcutLocation.expr()
- v = e.parseString("l@100")[0]
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
-
-def test_shortcuts():
- assert language.parse_pathod(
- "400:c'foo'").next().headers[0].key.val == "Content-Type"
- assert language.parse_pathod(
- "400:l'foo'").next().headers[0].key.val == "Location"
-
- assert "Android" in tutils.render(parse_request("get:/:ua"))
- assert "User-Agent" in tutils.render(parse_request("get:/:ua"))
-
-
-def test_user_agent():
- e = http.ShortcutUserAgent.expr()
- v = e.parseString("ua")[0]
- assert "Android" in v.string()
-
- e = http.ShortcutUserAgent.expr()
- v = e.parseString("u'a'")[0]
- assert "Android" not in v.string()
-
- v = e.parseString("u@100'")[0]
- assert len(str(v.freeze({}).value)) > 100
- v2 = v.freeze({})
- v3 = v2.freeze({})
- assert v2.value.val == v3.value.val
-
-
-def test_nested_response():
- e = http.NestedResponse.expr()
- v = e.parseString("s'200'")[0]
- assert v.value.val == "200"
- tutils.raises(
- language.ParseException,
- e.parseString,
- "s'foo'"
- )
-
- v = e.parseString('s"200:b@1"')[0]
- assert "@1" in v.spec()
- f = v.freeze({})
- assert "@1" not in f.spec()
-
-
-def test_nested_response_freeze():
- e = http.NestedResponse(
- base.TokValueLiteral(
- "200:b'foo':i10,'\\x27'".encode(
- "string_escape"
- )
- )
- )
- assert e.freeze({})
- assert e.values({})
-
-
-def test_unique_components():
- tutils.raises(
- "multiple body clauses",
- language.parse_pathod,
- "400:b@1:b@1"
- )
diff --git a/test/test_language_http2.py b/test/test_language_http2.py
deleted file mode 100644
index 9be49452..00000000
--- a/test/test_language_http2.py
+++ /dev/null
@@ -1,233 +0,0 @@
-import cStringIO
-
-import netlib
-from netlib import tcp
-from netlib.http import user_agents
-
-from libpathod import language
-from libpathod.language import http2, base
-import tutils
-
-
-def parse_request(s):
- return language.parse_pathoc(s, True).next()
-
-
-def parse_response(s):
- return language.parse_pathod(s, True).next()
-
-
-def default_settings():
- return language.Settings(
- request_host="foo.com",
- protocol=netlib.http.http2.HTTP2Protocol(tcp.TCPClient(('localhost', 1234)))
- )
-
-
-def test_make_error_response():
- d = cStringIO.StringIO()
- s = http2.make_error_response("foo", "bar")
- language.serve(s, d, default_settings())
-
-
-class TestRequest:
-
- def test_cached_values(self):
- req = parse_request("get:/")
- req_id = id(req)
- assert req_id == id(req.resolve(default_settings()))
- assert req.values(default_settings()) == req.values(default_settings())
-
- def test_nonascii(self):
- tutils.raises("ascii", parse_request, "get:\xf0")
-
- def test_err(self):
- tutils.raises(language.ParseException, parse_request, 'GET')
-
- def test_simple(self):
- r = parse_request('GET:"/foo"')
- assert r.method.string() == "GET"
- assert r.path.string() == "/foo"
- r = parse_request('GET:/foo')
- assert r.path.string() == "/foo"
-
- def test_multiple(self):
- r = list(language.parse_pathoc("GET:/ PUT:/"))
- assert r[0].method.string() == "GET"
- assert r[1].method.string() == "PUT"
- assert len(r) == 2
-
- l = """
- GET
- "/foo"
-
- PUT
-
- "/foo
-
-
-
- bar"
- """
- r = list(language.parse_pathoc(l, True))
- assert len(r) == 2
- assert r[0].method.string() == "GET"
- assert r[1].method.string() == "PUT"
-
- l = """
- get:"http://localhost:9999/p/200"
- get:"http://localhost:9999/p/200"
- """
- r = list(language.parse_pathoc(l, True))
- assert len(r) == 2
- assert r[0].method.string() == "GET"
- assert r[1].method.string() == "GET"
-
- def test_render_simple(self):
- s = cStringIO.StringIO()
- r = parse_request("GET:'/foo'")
- assert language.serve(
- r,
- s,
- default_settings(),
- )
-
- def test_raw_content_length(self):
- r = parse_request('GET:/:r')
- assert len(r.headers) == 0
-
- r = parse_request('GET:/:r:b"foobar"')
- assert len(r.headers) == 0
-
- r = parse_request('GET:/')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-length", "0")
-
- r = parse_request('GET:/:b"foobar"')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-length", "6")
-
- r = parse_request('GET:/:b"foobar":h"content-length"="42"')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-length", "42")
-
- r = parse_request('GET:/:r:b"foobar":h"content-length"="42"')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-length", "42")
-
- def test_content_type(self):
- r = parse_request('GET:/:r:c"foobar"')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-type", "foobar")
-
- def test_user_agent(self):
- r = parse_request('GET:/:r:ua')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("user-agent", user_agents.get_by_shortcut('a')[2])
-
- def test_render_with_headers(self):
- s = cStringIO.StringIO()
- r = parse_request('GET:/foo:h"foo"="bar"')
- assert language.serve(
- r,
- s,
- default_settings(),
- )
-
- def test_nested_response(self):
- l = "get:/p/:s'200'"
- r = parse_request(l)
- assert len(r.tokens) == 3
- assert isinstance(r.tokens[2], http2.NestedResponse)
- assert r.values(default_settings())
-
-
- def test_render_with_body(self):
- s = cStringIO.StringIO()
- r = parse_request("GET:'/foo':bfoobar")
- assert language.serve(
- r,
- s,
- default_settings(),
- )
-
- def test_spec(self):
- def rt(s):
- s = parse_request(s).spec()
- assert parse_request(s).spec() == s
- rt("get:/foo")
-
-
-class TestResponse:
-
- def test_cached_values(self):
- res = parse_response("200")
- res_id = id(res)
- assert res_id == id(res.resolve(default_settings()))
- assert res.values(default_settings()) == res.values(default_settings())
-
- def test_nonascii(self):
- tutils.raises("ascii", parse_response, "200:\xf0")
-
- def test_err(self):
- tutils.raises(language.ParseException, parse_response, 'GET:/')
-
- def test_raw_content_length(self):
- r = parse_response('200:r')
- assert len(r.headers) == 0
-
- r = parse_response('200')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-length", "0")
-
- def test_content_type(self):
- r = parse_response('200:r:c"foobar"')
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("content-type", "foobar")
-
- def test_simple(self):
- r = parse_response('200:r:h"foo"="bar"')
- assert r.status_code.string() == "200"
- assert len(r.headers) == 1
- assert r.headers[0].values(default_settings()) == ("foo", "bar")
- assert r.body is None
-
- r = parse_response('200:r:h"foo"="bar":bfoobar:h"bla"="fasel"')
- assert r.status_code.string() == "200"
- assert len(r.headers) == 2
- assert r.headers[0].values(default_settings()) == ("foo", "bar")
- assert r.headers[1].values(default_settings()) == ("bla", "fasel")
- assert r.body.string() == "foobar"
-
- def test_render_simple(self):
- s = cStringIO.StringIO()
- r = parse_response('200')
- assert language.serve(
- r,
- s,
- default_settings(),
- )
-
- def test_render_with_headers(self):
- s = cStringIO.StringIO()
- r = parse_response('200:h"foo"="bar"')
- assert language.serve(
- r,
- s,
- default_settings(),
- )
-
- def test_render_with_body(self):
- s = cStringIO.StringIO()
- r = parse_response('200:bfoobar')
- assert language.serve(
- r,
- s,
- default_settings(),
- )
-
- def test_spec(self):
- def rt(s):
- s = parse_response(s).spec()
- assert parse_response(s).spec() == s
- rt("200:bfoobar")
diff --git a/test/test_language_websocket.py b/test/test_language_websocket.py
deleted file mode 100644
index d98fd33e..00000000
--- a/test/test_language_websocket.py
+++ /dev/null
@@ -1,142 +0,0 @@
-
-from libpathod import language
-from libpathod.language import websockets
-import netlib.websockets
-import tutils
-
-
-def parse_request(s):
- return language.parse_pathoc(s).next()
-
-
-class TestWebsocketFrame:
-
- def _test_messages(self, specs, message_klass):
- for i in specs:
- wf = parse_request(i)
- assert isinstance(wf, message_klass)
- assert wf
- assert wf.values(language.Settings())
- assert wf.resolve(language.Settings())
-
- spec = wf.spec()
- wf2 = parse_request(spec)
- assert wf2.spec() == spec
-
- def test_server_values(self):
- specs = [
- "wf",
- "wf:dr",
- "wf:b'foo'",
- "wf:mask:r'foo'",
- "wf:l1024:b'foo'",
- "wf:cbinary",
- "wf:c1",
- "wf:mask:knone",
- "wf:fin",
- "wf:fin:rsv1:rsv2:rsv3:mask",
- "wf:-fin:-rsv1:-rsv2:-rsv3:-mask",
- "wf:k@4",
- "wf:x10",
- ]
- self._test_messages(specs, websockets.WebsocketFrame)
-
- def test_parse_websocket_frames(self):
- wf = language.parse_websocket_frame("wf:x10")
- assert len(list(wf)) == 10
- tutils.raises(
- language.ParseException,
- language.parse_websocket_frame,
- "wf:x"
- )
-
- def test_client_values(self):
- specs = [
- "wf:f'wf'",
- ]
- self._test_messages(specs, websockets.WebsocketClientFrame)
-
- def test_nested_frame(self):
- wf = parse_request("wf:f'wf'")
- assert wf.nested_frame
-
- def test_flags(self):
- wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3")
- frm = netlib.websockets.Frame.from_bytes(tutils.render(wf))
- assert frm.header.fin
- assert frm.header.mask
- assert frm.header.rsv1
- assert frm.header.rsv2
- assert frm.header.rsv3
-
- wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3")
- frm = netlib.websockets.Frame.from_bytes(tutils.render(wf))
- assert not frm.header.fin
- assert not frm.header.mask
- assert not frm.header.rsv1
- assert not frm.header.rsv2
- assert not frm.header.rsv3
-
- def fr(self, spec, **kwargs):
- settings = language.base.Settings(**kwargs)
- wf = parse_request(spec)
- return netlib.websockets.Frame.from_bytes(tutils.render(wf, settings))
-
- def test_construction(self):
- assert self.fr("wf:c1").header.opcode == 1
- assert self.fr("wf:c0").header.opcode == 0
- assert self.fr("wf:cbinary").header.opcode ==\
- netlib.websockets.OPCODE.BINARY
- assert self.fr("wf:ctext").header.opcode ==\
- netlib.websockets.OPCODE.TEXT
-
- def test_rawbody(self):
- frm = self.fr("wf:mask:r'foo'")
- assert len(frm.payload) == 3
- assert frm.payload != "foo"
-
- assert self.fr("wf:r'foo'").payload == "foo"
-
- def test_construction(self):
- # Simple server frame
- frm = self.fr("wf:b'foo'")
- assert not frm.header.mask
- assert not frm.header.masking_key
-
- # Simple client frame
- frm = self.fr("wf:b'foo'", is_client=True)
- assert frm.header.mask
- assert frm.header.masking_key
- frm = self.fr("wf:b'foo':k'abcd'", is_client=True)
- assert frm.header.mask
- assert frm.header.masking_key == 'abcd'
-
- # Server frame, mask explicitly set
- frm = self.fr("wf:b'foo':mask")
- assert frm.header.mask
- assert frm.header.masking_key
- frm = self.fr("wf:b'foo':k'abcd'")
- assert frm.header.mask
- assert frm.header.masking_key == 'abcd'
-
- # Client frame, mask explicitly unset
- frm = self.fr("wf:b'foo':-mask", is_client=True)
- assert not frm.header.mask
- assert not frm.header.masking_key
-
- frm = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True)
- assert not frm.header.mask
- # We're reading back a corrupted frame - the first 3 characters of the
- # mask is mis-interpreted as the payload
- assert frm.payload == "abc"
-
- def test_knone(self):
- with tutils.raises("expected 4 bytes"):
- self.fr("wf:b'foo':mask:knone")
-
- def test_length(self):
- assert self.fr("wf:l3:b'foo'").header.payload_length == 3
- frm = self.fr("wf:l2:b'foo'")
- assert frm.header.payload_length == 2
- assert frm.payload == "fo"
- tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'")
diff --git a/test/test_language_writer.py b/test/test_language_writer.py
deleted file mode 100644
index 1a532903..00000000
--- a/test/test_language_writer.py
+++ /dev/null
@@ -1,91 +0,0 @@
-import cStringIO
-
-from libpathod import language
-from libpathod.language import writer
-
-
-def test_send_chunk():
- v = "foobarfoobar"
- for bs in range(1, len(v) + 2):
- s = cStringIO.StringIO()
- writer.send_chunk(s, v, bs, 0, len(v))
- assert s.getvalue() == v
- for start in range(len(v)):
- for end in range(len(v)):
- s = cStringIO.StringIO()
- writer.send_chunk(s, v, bs, start, end)
- assert s.getvalue() == v[start:end]
-
-
-def test_write_values_inject():
- tst = "foo"
-
- s = cStringIO.StringIO()
- writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5)
- assert s.getvalue() == "aaafoo"
-
- s = cStringIO.StringIO()
- writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
- assert s.getvalue() == "faaaoo"
-
- s = cStringIO.StringIO()
- writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5)
- assert s.getvalue() == "faaaoo"
-
-
-def test_write_values_disconnects():
- s = cStringIO.StringIO()
- tst = "foo" * 100
- writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
- assert not s.getvalue()
-
-
-def test_write_values():
- tst = "foobarvoing"
- s = cStringIO.StringIO()
- writer.write_values(s, [tst], [])
- assert s.getvalue() == tst
-
- for bs in range(1, len(tst) + 2):
- for off in range(len(tst)):
- s = cStringIO.StringIO()
- writer.write_values(
- s, [tst], [(off, "disconnect")], blocksize=bs
- )
- assert s.getvalue() == tst[:off]
-
-
-def test_write_values_pauses():
- tst = "".join(str(i) for i in range(10))
- for i in range(2, 10):
- s = cStringIO.StringIO()
- writer.write_values(
- s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
- )
- assert s.getvalue() == tst
-
- for i in range(2, 10):
- s = cStringIO.StringIO()
- writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
- assert s.getvalue() == tst
-
- tst = ["".join(str(i) for i in range(10))] * 5
- for i in range(2, 10):
- s = cStringIO.StringIO()
- writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
- assert s.getvalue() == "".join(tst)
-
-
-def test_write_values_after():
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:da").next()
- language.serve(r, s, {})
-
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:pa,0").next()
- language.serve(r, s, {})
-
- s = cStringIO.StringIO()
- r = language.parse_pathod("400:ia,'xx'").next()
- language.serve(r, s, {})
- assert s.getvalue().endswith('xx')
diff --git a/test/test_log.py b/test/test_log.py
deleted file mode 100644
index 8f38c040..00000000
--- a/test/test_log.py
+++ /dev/null
@@ -1,25 +0,0 @@
-import StringIO
-from libpathod import log
-from netlib.exceptions import TcpDisconnect
-import netlib.tcp
-
-
-class DummyIO(StringIO.StringIO):
-
- def start_log(self, *args, **kwargs):
- pass
-
- def get_log(self, *args, **kwargs):
- return ""
-
-
-def test_disconnect():
- outf = DummyIO()
- rw = DummyIO()
- l = log.ConnectionLogger(outf, False, rw, rw)
- try:
- with l.ctx() as lg:
- lg("Test")
- except TcpDisconnect:
- pass
- assert "Test" in outf.getvalue()
diff --git a/test/test_pathoc.py b/test/test_pathoc.py
deleted file mode 100644
index 62696a64..00000000
--- a/test/test_pathoc.py
+++ /dev/null
@@ -1,308 +0,0 @@
-import json
-import cStringIO
-import re
-import OpenSSL
-from mock import Mock
-
-from netlib import tcp, http, socks
-from netlib.exceptions import HttpException, TcpException, NetlibException
-from netlib.http import http1, http2
-
-from libpathod import pathoc, test, version, pathod, language
-from netlib.tutils import raises
-import tutils
-
-
-def test_response():
- r = http.Response("HTTP/1.1", 200, "Message", {}, None, None)
- assert repr(r)
-
-
-class _TestDaemon:
- ssloptions = pathod.SSLOptions()
-
- @classmethod
- def setup_class(cls):
- cls.d = test.Daemon(
- ssl=cls.ssl,
- ssloptions=cls.ssloptions,
- staticdir=tutils.test_data.path("data"),
- anchors=[
- (re.compile("/anchor/.*"), "202")
- ]
- )
-
- @classmethod
- def teardown_class(cls):
- cls.d.shutdown()
-
- def setUp(self):
- self.d.clear_log()
-
- def test_info(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- ssl=self.ssl,
- fp=None
- )
- c.connect()
- resp = c.request("get:/api/info")
- assert tuple(json.loads(resp.content)["version"]) == version.IVERSION
-
- def tval(
- self,
- requests,
- showreq=False,
- showresp=False,
- explain=False,
- showssl=False,
- hexdump=False,
- timeout=None,
- ignorecodes=(),
- ignoretimeout=None,
- showsummary=True
- ):
- s = cStringIO.StringIO()
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- ssl=self.ssl,
- showreq=showreq,
- showresp=showresp,
- explain=explain,
- hexdump=hexdump,
- ignorecodes=ignorecodes,
- ignoretimeout=ignoretimeout,
- showsummary=showsummary,
- fp=s
- )
- c.connect(showssl=showssl, fp=s)
- if timeout:
- c.settimeout(timeout)
- for i in requests:
- r = language.parse_pathoc(i).next()
- if explain:
- r = r.freeze(language.Settings())
- try:
- c.request(r)
- except NetlibException:
- pass
- return s.getvalue()
-
-
-class TestDaemonSSL(_TestDaemon):
- ssl = True
- ssloptions = pathod.SSLOptions(
- request_client_cert=True,
- sans=["test1.com", "test2.com"],
- alpn_select=b'h2',
- )
-
- def test_sni(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- ssl=True,
- sni="foobar.com",
- fp=None
- )
- c.connect()
- c.request("get:/p/200")
- r = c.request("get:/api/log")
- d = json.loads(r.content)
- assert d["log"][0]["request"]["sni"] == "foobar.com"
-
- def test_showssl(self):
- assert "certificate chain" in self.tval(["get:/p/200"], showssl=True)
-
- def test_clientcert(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- ssl=True,
- clientcert=tutils.test_data.path("data/clientcert/client.pem"),
- fp=None
- )
- c.connect()
- c.request("get:/p/200")
- r = c.request("get:/api/log")
- d = json.loads(r.content)
- assert d["log"][0]["request"]["clientcert"]["keyinfo"]
-
- def test_http2_without_ssl(self):
- fp = cStringIO.StringIO()
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- use_http2=True,
- ssl=False,
- fp = fp
- )
- tutils.raises(NotImplementedError, c.connect)
-
-
-class TestDaemon(_TestDaemon):
- ssl = False
-
- def test_ssl_error(self):
- c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None)
- tutils.raises("ssl handshake", c.connect)
-
- def test_showssl(self):
- assert not "certificate chain" in self.tval(
- ["get:/p/200"],
- showssl=True)
-
- def test_ignorecodes(self):
- assert "200" in self.tval(["get:'/p/200:b@1'"])
- assert "200" in self.tval(["get:'/p/200:b@1'"])
- assert "200" in self.tval(["get:'/p/200:b@1'"])
- assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200])
- assert "200" not in self.tval(
- ["get:'/p/200:b@1'"],
- ignorecodes=[
- 200,
- 201])
- assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201])
-
- def test_timeout(self):
- assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01)
- assert "HTTP" in self.tval(
- ["get:'/p/200:p5,100'"],
- showresp=True,
- timeout=1
- )
- assert not "HTTP" in self.tval(
- ["get:'/p/200:p3,100'"],
- showresp=True,
- timeout=1,
- ignoretimeout=True
- )
-
- def test_showresp(self):
- reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]
- assert self.tval(reqs).count("200") == 2
- assert self.tval(reqs, showresp=True).count("HTTP/1.1 200 OK") == 2
- assert self.tval(
- reqs, showresp=True, hexdump=True
- ).count("0000000000") == 2
-
- def test_showresp_httperr(self):
- v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True)
- assert "Invalid headers" in v
- assert "HTTP/" in v
-
- def test_explain(self):
- reqs = ["get:/p/200:b@100"]
- assert "b@100" not in self.tval(reqs, explain=True)
-
- def test_showreq(self):
- reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"]
- assert self.tval(reqs, showreq=True).count("GET /api") == 2
- assert self.tval(
- reqs, showreq=True, hexdump=True
- ).count("0000000000") == 2
-
- def test_conn_err(self):
- assert "Invalid server response" in self.tval(["get:'/p/200:d2'"])
-
- def test_websocket_shutdown(self):
- c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
- c.connect()
- c.request("ws:/")
- c.stop()
-
- def test_wait_finish(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- fp=None,
- ws_read_limit=1
- )
- c.connect()
- c.request("ws:/")
- c.request("wf:f'wf:x100'")
- [i for i in c.wait(timeout=0, finish=False)]
- [i for i in c.wait(timeout=0)]
-
- def test_connect_fail(self):
- to = ("foobar", 80)
- c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
- c.rfile, c.wfile = cStringIO.StringIO(), cStringIO.StringIO()
- with raises("connect failed"):
- c.http_connect(to)
- c.rfile = cStringIO.StringIO(
- "HTTP/1.1 500 OK\r\n"
- )
- with raises("connect failed"):
- c.http_connect(to)
- c.rfile = cStringIO.StringIO(
- "HTTP/1.1 200 OK\r\n"
- )
- c.http_connect(to)
-
- def test_socks_connect(self):
- to = ("foobar", 80)
- c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
- c.rfile, c.wfile = tutils.treader(""), cStringIO.StringIO()
- tutils.raises(pathoc.PathocError, c.socks_connect, to)
-
- c.rfile = tutils.treader(
- "\x05\xEE"
- )
- tutils.raises("SOCKS without authentication", c.socks_connect, ("example.com", 0xDEAD))
-
- c.rfile = tutils.treader(
- "\x05\x00" +
- "\x05\xEE\x00\x03\x0bexample.com\xDE\xAD"
- )
- tutils.raises("SOCKS server error", c.socks_connect, ("example.com", 0xDEAD))
-
- c.rfile = tutils.treader(
- "\x05\x00" +
- "\x05\x00\x00\x03\x0bexample.com\xDE\xAD"
- )
- c.socks_connect(("example.com", 0xDEAD))
-
-
-class TestDaemonHTTP2(_TestDaemon):
- ssl = True
-
- if OpenSSL._util.lib.Cryptography_HAS_ALPN:
-
- def test_http2(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- fp=None,
- ssl=True,
- use_http2=True,
- )
- assert isinstance(c.protocol, http2.HTTP2Protocol)
-
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- )
- assert c.protocol == http1
-
- def test_http2_alpn(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- fp=None,
- ssl=True,
- use_http2=True,
- http2_skip_connection_preface=True,
- )
-
- tmp_convert_to_ssl = c.convert_to_ssl
- c.convert_to_ssl = Mock()
- c.convert_to_ssl.side_effect = tmp_convert_to_ssl
- c.connect()
-
- _, kwargs = c.convert_to_ssl.call_args
- assert set(kwargs['alpn_protos']) == set([b'http/1.1', b'h2'])
-
- def test_request(self):
- c = pathoc.Pathoc(
- ("127.0.0.1", self.d.port),
- fp=None,
- ssl=True,
- use_http2=True,
- )
- c.connect()
- resp = c.request("get:/p/200")
- assert resp.status_code == 200
diff --git a/test/test_pathoc_cmdline.py b/test/test_pathoc_cmdline.py
deleted file mode 100644
index 74dfef57..00000000
--- a/test/test_pathoc_cmdline.py
+++ /dev/null
@@ -1,59 +0,0 @@
-from libpathod import pathoc_cmdline as cmdline
-import tutils
-import cStringIO
-import mock
-
-
-@mock.patch("argparse.ArgumentParser.error")
-def test_pathoc(perror):
- assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"])
- s = cStringIO.StringIO()
- with tutils.raises(SystemExit):
- cmdline.args_pathoc(["pathoc", "--show-uas"], s, s)
-
- a = cmdline.args_pathoc(["pathoc", "foo.com:8888", "get:/"])
- assert a.port == 8888
-
- a = cmdline.args_pathoc(["pathoc", "foo.com:xxx", "get:/"])
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathoc(["pathoc", "-I", "10, 20", "foo.com:8888", "get:/"])
- assert a.ignorecodes == [10, 20]
-
- a = cmdline.args_pathoc(["pathoc", "-I", "xx, 20", "foo.com:8888", "get:/"])
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathoc(["pathoc", "-c", "foo:10", "foo.com:8888", "get:/"])
- assert a.connect_to == ["foo", 10]
-
- a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2"])
- assert a.use_http2 == True
- assert a.ssl == True
-
- a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2-skip-connection-preface"])
- assert a.use_http2 == True
- assert a.ssl == True
- assert a.http2_skip_connection_preface == True
-
- a = cmdline.args_pathoc(["pathoc", "-c", "foo", "foo.com:8888", "get:/"])
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathoc(
- ["pathoc", "-c", "foo:bar", "foo.com:8888", "get:/"])
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathoc(
- [
- "pathoc",
- "foo.com:8888",
- tutils.test_data.path("data/request")
- ]
- )
- assert len(list(a.requests)) == 1
-
- with tutils.raises(SystemExit):
- cmdline.args_pathoc(["pathoc", "foo.com", "invalid"], s, s)
diff --git a/test/test_pathod.py b/test/test_pathod.py
deleted file mode 100644
index 98da7d28..00000000
--- a/test/test_pathod.py
+++ /dev/null
@@ -1,289 +0,0 @@
-import sys
-import cStringIO
-import OpenSSL
-
-from libpathod import pathod, version
-from netlib import tcp, http
-from netlib.exceptions import HttpException, TlsException
-import tutils
-
-
-class TestPathod(object):
-
- def test_logging(self):
- s = cStringIO.StringIO()
- p = pathod.Pathod(("127.0.0.1", 0), logfp=s)
- assert len(p.get_log()) == 0
- id = p.add_log(dict(s="foo"))
- assert p.log_by_id(id)
- assert len(p.get_log()) == 1
- p.clear_log()
- assert len(p.get_log()) == 0
-
- for _ in range(p.LOGBUF + 1):
- p.add_log(dict(s="foo"))
- assert len(p.get_log()) <= p.LOGBUF
-
-
-class TestNoWeb(tutils.DaemonTests):
- noweb = True
-
- def test_noweb(self):
- assert self.get("200:da").status_code == 200
- assert self.getpath("/").status_code == 800
-
-
-class TestTimeout(tutils.DaemonTests):
- timeout = 0.01
-
- def test_noweb(self):
- # FIXME: Add float values to spec language, reduce test timeout to
- # increase test performance
- # This is a bodge - we have some platform difference that causes
- # different exceptions to be raised here.
- tutils.raises(Exception, self.pathoc, ["get:/:p1,1"])
- assert self.d.last_log()["type"] == "timeout"
-
-
-class TestNoApi(tutils.DaemonTests):
- noapi = True
-
- def test_noapi(self):
- assert self.getpath("/log").status_code == 404
- r = self.getpath("/")
- assert r.status_code == 200
- assert not "Log" in r.content
-
-
-class TestNotAfterConnect(tutils.DaemonTests):
- ssl = False
- ssloptions = dict(
- not_after_connect=True
- )
-
- def test_connect(self):
- r, _ = self.pathoc(
- [r"get:'http://foo.com/p/202':da"],
- connect_to=("localhost", self.d.port)
- )
- assert r[0].status_code == 202
-
-
-class TestCustomCert(tutils.DaemonTests):
- ssl = True
- ssloptions = dict(
- certs=[("*", tutils.test_data.path("data/testkey.pem"))],
- )
-
- def test_connect(self):
- r, _ = self.pathoc([r"get:/p/202"])
- r = r[0]
- assert r.status_code == 202
- assert r.sslinfo
- assert "test.com" in str(r.sslinfo.certchain[0].get_subject())
-
-
-class TestSSLCN(tutils.DaemonTests):
- ssl = True
- ssloptions = dict(
- cn="foo.com"
- )
-
- def test_connect(self):
- r, _ = self.pathoc([r"get:/p/202"])
- r = r[0]
- assert r.status_code == 202
- assert r.sslinfo
- assert r.sslinfo.certchain[0].get_subject().CN == "foo.com"
-
-
-class TestNohang(tutils.DaemonTests):
- nohang = True
-
- def test_nohang(self):
- r = self.get("200:p0,0")
- assert r.status_code == 800
- l = self.d.last_log()
- assert "Pauses have been disabled" in l["response"]["msg"]
-
-
-class TestHexdump(tutils.DaemonTests):
- hexdump = True
-
- def test_hexdump(self):
- r = self.get(r"200:b'\xf0'")
-
-
-class TestNocraft(tutils.DaemonTests):
- nocraft = True
-
- def test_nocraft(self):
- r = self.get(r"200:b'\xf0'")
- assert r.status_code == 800
- assert "Crafting disabled" in r.content
-
-
-class CommonTests(tutils.DaemonTests):
-
- def test_binarydata(self):
- r = self.get(r"200:b'\xf0'")
- l = self.d.last_log()
- # FIXME: Other binary data elements
-
- def test_sizelimit(self):
- r = self.get("200:b@1g")
- assert r.status_code == 800
- l = self.d.last_log()
- assert "too large" in l["response"]["msg"]
-
- def test_preline(self):
- r, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"])
- assert r[0].status_code == 200
-
- def test_info(self):
- assert tuple(self.d.info()["version"]) == version.IVERSION
-
- def test_logs(self):
- assert self.d.clear_log()
- assert not self.d.last_log()
- rsp = self.get("202:da")
- assert len(self.d.log()) == 1
- assert self.d.clear_log()
- assert len(self.d.log()) == 0
-
- def test_disconnect(self):
- rsp = self.get("202:b@100k:d200")
- assert len(rsp.content) < 200
-
- def test_parserr(self):
- rsp = self.get("400:msg,b:")
- assert rsp.status_code == 800
-
- def test_static(self):
- rsp = self.get("200:b<file")
- assert rsp.status_code == 200
- assert rsp.content.strip() == "testfile"
-
- def test_anchor(self):
- rsp = self.getpath("anchor/foo")
- assert rsp.status_code == 202
-
- def test_invalid_first_line(self):
- c = tcp.TCPClient(("localhost", self.d.port))
- c.connect()
- if self.ssl:
- c.convert_to_ssl()
- c.wfile.write("foo\n\n\n")
- c.wfile.flush()
- l = self.d.last_log()
- assert l["type"] == "error"
- assert "foo" in l["msg"]
-
- def test_invalid_content_length(self):
- tutils.raises(
- HttpException,
- self.pathoc,
- ["get:/:h'content-length'='foo'"]
- )
- l = self.d.last_log()
- assert l["type"] == "error"
- assert "Unparseable Content Length" in l["msg"]
-
- def test_invalid_headers(self):
- tutils.raises(HttpException, self.pathoc, ["get:/:h'\t'='foo'"])
- l = self.d.last_log()
- assert l["type"] == "error"
- assert "Invalid headers" in l["msg"]
-
- def test_access_denied(self):
- rsp = self.get("=nonexistent")
- assert rsp.status_code == 800
-
- def test_source_access_denied(self):
- rsp = self.get("200:b</foo")
- assert rsp.status_code == 800
- assert "File access denied" in rsp.content
-
- def test_proxy(self):
- r, _ = self.pathoc([r"get:'http://foo.com/p/202':da"])
- assert r[0].status_code == 202
-
- def test_websocket(self):
- r, _ = self.pathoc(["ws:/p/"], ws_read_limit=0)
- assert r[0].status_code == 101
-
- r, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0)
- assert r[0].status_code == 101
-
- def test_websocket_frame(self):
- r, _ = self.pathoc(
- ["ws:/p/", "wf:f'wf:b\"test\"':pa,1"],
- ws_read_limit=1
- )
- assert r[1].payload == "test"
-
- def test_websocket_frame_reflect_error(self):
- r, _ = self.pathoc(
- ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"],
- ws_read_limit=1,
- timeout=1
- )
- # FIXME: Race Condition?
- assert "Parse error" in self.d.text_log()
-
- def test_websocket_frame_disconnect_error(self):
- self.pathoc(["ws:/p/", "wf:b@10:d3"], ws_read_limit=0)
- assert self.d.last_log()
-
-
-class TestDaemon(CommonTests):
- ssl = False
-
- def test_connect(self):
- r, _ = self.pathoc(
- [r"get:'http://foo.com/p/202':da"],
- connect_to=("localhost", self.d.port),
- ssl=True
- )
- assert r[0].status_code == 202
-
- def test_connect_err(self):
- tutils.raises(
- HttpException,
- self.pathoc,
- [r"get:'http://foo.com/p/202':da"],
- connect_to=("localhost", self.d.port)
- )
-
-
-class TestDaemonSSL(CommonTests):
- ssl = True
-
- def test_ssl_conn_failure(self):
- c = tcp.TCPClient(("localhost", self.d.port))
- c.rbufsize = 0
- c.wbufsize = 0
- c.connect()
- c.wfile.write("\0\0\0\0")
- tutils.raises(TlsException, c.convert_to_ssl)
- l = self.d.last_log()
- assert l["type"] == "error"
- assert "SSL" in l["msg"]
-
- def test_ssl_cipher(self):
- r, _ = self.pathoc([r"get:/p/202"])
- assert r[0].status_code == 202
- assert self.d.last_log()["cipher"][1] > 0
-
-
-class TestHTTP2(tutils.DaemonTests):
- ssl = True
- noweb = True
- noapi = True
- nohang = True
-
- if OpenSSL._util.lib.Cryptography_HAS_ALPN:
-
- def test_http2(self):
- r, _ = self.pathoc(["GET:/"], ssl=True, use_http2=True)
- assert r[0].status_code == 800
diff --git a/test/test_pathod_cmdline.py b/test/test_pathod_cmdline.py
deleted file mode 100644
index 829c4b32..00000000
--- a/test/test_pathod_cmdline.py
+++ /dev/null
@@ -1,85 +0,0 @@
-from libpathod import pathod_cmdline as cmdline
-import tutils
-import cStringIO
-import mock
-
-
-@mock.patch("argparse.ArgumentParser.error")
-def test_pathod(perror):
- assert cmdline.args_pathod(["pathod"])
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "--cert",
- tutils.test_data.path("data/testkey.pem")
- ]
- )
- assert a.ssl_certs
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "--cert",
- "nonexistent"
- ]
- )
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "-a",
- "foo=200"
- ]
- )
- assert a.anchors
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "-a",
- "foo=" + tutils.test_data.path("data/response")
- ]
- )
- assert a.anchors
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "-a",
- "?=200"
- ]
- )
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "-a",
- "foo"
- ]
- )
- assert perror.called
- perror.reset_mock()
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "--limit-size",
- "200k"
- ]
- )
- assert a.sizelimit
-
- a = cmdline.args_pathod(
- [
- "pathod",
- "--limit-size",
- "q"
- ]
- )
- assert perror.called
- perror.reset_mock()
diff --git a/test/test_test.py b/test/test_test.py
deleted file mode 100644
index bd92d864..00000000
--- a/test/test_test.py
+++ /dev/null
@@ -1,45 +0,0 @@
-import logging
-import requests
-from libpathod import test
-import tutils
-logging.disable(logging.CRITICAL)
-
-
-class TestDaemonManual:
-
- def test_simple(self):
- with test.Daemon() as d:
- rsp = requests.get("http://localhost:%s/p/202:da" % d.port)
- assert rsp.ok
- assert rsp.status_code == 202
- with tutils.raises(requests.ConnectionError):
- requests.get("http://localhost:%s/p/202:da" % d.port)
-
- def test_startstop_ssl(self):
- d = test.Daemon(ssl=True)
- rsp = requests.get(
- "https://localhost:%s/p/202:da" %
- d.port,
- verify=False)
- assert rsp.ok
- assert rsp.status_code == 202
- d.shutdown()
- with tutils.raises(requests.ConnectionError):
- requests.get("http://localhost:%s/p/202:da" % d.port)
-
- def test_startstop_ssl_explicit(self):
- ssloptions = dict(
- certfile=tutils.test_data.path("data/testkey.pem"),
- cacert=tutils.test_data.path("data/testkey.pem"),
- ssl_after_connect=False
- )
- d = test.Daemon(ssl=ssloptions)
- rsp = requests.get(
- "https://localhost:%s/p/202:da" %
- d.port,
- verify=False)
- assert rsp.ok
- assert rsp.status_code == 202
- d.shutdown()
- with tutils.raises(requests.ConnectionError):
- requests.get("http://localhost:%s/p/202:da" % d.port)
diff --git a/test/test_utils.py b/test/test_utils.py
deleted file mode 100644
index 7d24e9e4..00000000
--- a/test/test_utils.py
+++ /dev/null
@@ -1,39 +0,0 @@
-from libpathod import utils
-import tutils
-
-
-def test_membool():
- m = utils.MemBool()
- assert not m.v
- assert m(1)
- assert m.v == 1
- assert m(2)
- assert m.v == 2
-
-
-def test_parse_size():
- assert utils.parse_size("100") == 100
- assert utils.parse_size("100k") == 100 * 1024
- tutils.raises("invalid size spec", utils.parse_size, "foo")
- tutils.raises("invalid size spec", utils.parse_size, "100kk")
-
-
-def test_parse_anchor_spec():
- assert utils.parse_anchor_spec("foo=200") == ("foo", "200")
- assert utils.parse_anchor_spec("foo") is None
-
-
-def test_data_path():
- tutils.raises(ValueError, utils.data.path, "nonexistent")
-
-
-def test_inner_repr():
- assert utils.inner_repr("\x66") == "\x66"
- assert utils.inner_repr(u"foo") == "foo"
-
-
-def test_escape_unprintables():
- s = "".join([chr(i) for i in range(255)])
- e = utils.escape_unprintables(s)
- assert e.encode('ascii')
- assert not "PATHOD_MARKER" in e
diff --git a/test/tutils.py b/test/tutils.py
deleted file mode 100644
index 664cdd52..00000000
--- a/test/tutils.py
+++ /dev/null
@@ -1,128 +0,0 @@
-import tempfile
-import os
-import re
-import shutil
-import cStringIO
-from contextlib import contextmanager
-
-import netlib
-from libpathod import utils, test, pathoc, pathod, language
-from netlib import tcp
-import requests
-
-def treader(bytes):
- """
- Construct a tcp.Read object from bytes.
- """
- fp = cStringIO.StringIO(bytes)
- return tcp.Reader(fp)
-
-
-class DaemonTests(object):
- noweb = False
- noapi = False
- nohang = False
- ssl = False
- timeout = None
- hexdump = False
- ssloptions = None
- nocraft = False
-
- @classmethod
- def setup_class(cls):
- opts = cls.ssloptions or {}
- cls.confdir = tempfile.mkdtemp()
- opts["confdir"] = cls.confdir
- so = pathod.SSLOptions(**opts)
- cls.d = test.Daemon(
- staticdir=test_data.path("data"),
- anchors=[
- (re.compile("/anchor/.*"), "202:da")
- ],
- ssl=cls.ssl,
- ssloptions=so,
- sizelimit=1 * 1024 * 1024,
- noweb=cls.noweb,
- noapi=cls.noapi,
- nohang=cls.nohang,
- timeout=cls.timeout,
- hexdump=cls.hexdump,
- nocraft=cls.nocraft,
- logreq=True,
- logresp=True,
- explain=True
- )
-
- @classmethod
- def teardown_class(cls):
- cls.d.shutdown()
- shutil.rmtree(cls.confdir)
-
- def teardown(self):
- if not (self.noweb or self.noapi):
- self.d.clear_log()
-
- def getpath(self, path, params=None):
- scheme = "https" if self.ssl else "http"
- resp = requests.get(
- "%s://localhost:%s/%s" % (
- scheme,
- self.d.port,
- path
- ),
- verify=False,
- params=params
- )
- return resp
-
- def get(self, spec):
- resp = requests.get(self.d.p(spec), verify=False)
- return resp
-
- def pathoc(
- self,
- specs,
- timeout=None,
- connect_to=None,
- ssl=None,
- ws_read_limit=None,
- use_http2=False,
- ):
- """
- Returns a (messages, text log) tuple.
- """
- if ssl is None:
- ssl = self.ssl
- logfp = cStringIO.StringIO()
- c = pathoc.Pathoc(
- ("localhost", self.d.port),
- ssl=ssl,
- ws_read_limit=ws_read_limit,
- timeout=timeout,
- fp=logfp,
- use_http2=use_http2,
- )
- c.connect(connect_to)
- ret = []
- for i in specs:
- resp = c.request(i)
- if resp:
- ret.append(resp)
- for frm in c.wait():
- ret.append(frm)
- c.stop()
- return ret, logfp.getvalue()
-
-
-tmpdir = netlib.tutils.tmpdir
-
-raises = netlib.tutils.raises
-
-test_data = utils.Data(__name__)
-
-
-def render(r, settings=language.Settings()):
- r = r.resolve(settings)
- s = cStringIO.StringIO()
- assert language.serve(r, s, settings)
- return s.getvalue()