diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/data/clientcert/.gitignore | 3 | ||||
-rw-r--r-- | test/data/clientcert/client.cnf | 5 | ||||
-rw-r--r-- | test/data/clientcert/client.pem | 42 | ||||
-rwxr-xr-x | test/data/clientcert/make | 8 | ||||
-rw-r--r-- | test/data/file | 1 | ||||
-rw-r--r-- | test/data/request | 1 | ||||
-rw-r--r-- | test/data/response | 1 | ||||
-rw-r--r-- | test/data/testkey.pem | 68 | ||||
-rwxr-xr-x | test/scripts/generate.sh | 17 | ||||
-rw-r--r-- | test/scripts/openssl.cnf | 39 | ||||
-rw-r--r-- | test/test_app.py | 85 | ||||
-rw-r--r-- | test/test_language_actions.py | 135 | ||||
-rw-r--r-- | test/test_language_base.py | 352 | ||||
-rw-r--r-- | test/test_language_generators.py | 42 | ||||
-rw-r--r-- | test/test_language_http.py | 358 | ||||
-rw-r--r-- | test/test_language_http2.py | 233 | ||||
-rw-r--r-- | test/test_language_websocket.py | 142 | ||||
-rw-r--r-- | test/test_language_writer.py | 91 | ||||
-rw-r--r-- | test/test_log.py | 25 | ||||
-rw-r--r-- | test/test_pathoc.py | 308 | ||||
-rw-r--r-- | test/test_pathoc_cmdline.py | 59 | ||||
-rw-r--r-- | test/test_pathod.py | 289 | ||||
-rw-r--r-- | test/test_pathod_cmdline.py | 85 | ||||
-rw-r--r-- | test/test_test.py | 45 | ||||
-rw-r--r-- | test/test_utils.py | 39 | ||||
-rw-r--r-- | test/tutils.py | 128 |
26 files changed, 0 insertions, 2601 deletions
diff --git a/test/data/clientcert/.gitignore b/test/data/clientcert/.gitignore deleted file mode 100644 index 07bc53d2..00000000 --- a/test/data/clientcert/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -client.crt -client.key -client.req diff --git a/test/data/clientcert/client.cnf b/test/data/clientcert/client.cnf deleted file mode 100644 index 5046a944..00000000 --- a/test/data/clientcert/client.cnf +++ /dev/null @@ -1,5 +0,0 @@ -[ ssl_client ] -basicConstraints = CA:FALSE -nsCertType = client -keyUsage = digitalSignature, keyEncipherment -extendedKeyUsage = clientAuth diff --git a/test/data/clientcert/client.pem b/test/data/clientcert/client.pem deleted file mode 100644 index 4927bca2..00000000 --- a/test/data/clientcert/client.pem +++ /dev/null @@ -1,42 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIEpAIBAAKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0 -EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+ -ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G -3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/ -SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP -G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABAoIBAFE3FV/IDltbmHEP -iky93hbJm+6QgKepFReKpRVTyqb7LaygUvueQyPWQMIriKTsy675nxo8DQr7tQsO -y3YlSZgra/xNMikIB6e82c7K8DgyrDQw/rCqjZB3Xt4VCqsWJDLXnQMSn98lx0g7 -d7Lbf8soUpKWXqfdVpSDTi4fibSX6kshXyfSTpcz4AdoncEpViUfU1xkEEmZrjT8 -1GcCsDC41xdNmzCpqRuZX7DKSFRoB+0hUzsC1oiqM7FD5kixonRd4F5PbRXImIzt -6YCsT2okxTA04jX7yByis7LlOLTlkmLtKQYuc3erOFvwx89s4vW+AeFei+GGNitn -tHfSwbECgYEA7SzV+nN62hAERHlg8cEQT4TxnsWvbronYWcc/ev44eHSPDWL5tPi -GHfSbW6YAq5Wa0I9jMWfXyhOYEC3MZTC5EEeLOB71qVrTwcy/sY66rOrcgjFI76Q -5JFHQ4wy3SWU50KxE0oWJO9LIowprG+pW1vzqC3VF0T7q0FqESrY4LUCgYEA3F7Z -80ndnCUlooJAb+Hfotv7peFf1o6+m1PTRcz1lLnVt5R5lXj86kn+tXEpYZo1RiGR -2rE2N0seeznWCooakHcsBN7/qmFIhhooJNF7yW+JP2I4P2UV5+tJ+8bcs/voUkQD -1x+rGOuMn8nvHBd2+Vharft8eGL2mgooPVI2XusCgYEAlMZpO3+w8pTVeHaDP2MR -7i/AuQ3cbCLNjSX3Y7jgGCFllWspZRRIYXzYPNkA9b2SbBnTLjjRLgnEkFBIGgvs -7O2EFjaCuDRvydUEQhjq4ErwIsopj7B8h0QyZcbOKTbn3uFQ3n68wVJx2Sv/ADHT -FIHrp/WIE96r19Niy34LKXkCgYB2W59VsuOKnMz01l5DeR5C+0HSWxS9SReIl2IO -yEFSKullWyJeLIgyUaGy0990430feKI8whcrZXYumuah7IDN/KOwzhCk8vEfzWao -N7bzfqtJVrh9HA7C7DVlO+6H4JFrtcoWPZUIomJ549w/yz6EN3ckoMC+a/Ck1TW9 -ka1QFwKBgQCywG6TrZz0UmOjyLQZ+8Q4uvZklSW5NAKBkNnyuQ2kd5rzyYgMPE8C -Er8T88fdVIKvkhDyHhwcI7n58xE5Gr7wkwsrk/Hbd9/ZB2GgAPY3cATskK1v1McU -YeX38CU0fUS4aoy26hWQXkViB47IGQ3jWo3ZCtzIJl8DI9/RsBWTnw== ------END RSA PRIVATE KEY----- ------BEGIN CERTIFICATE----- -MIICYDCCAckCAQEwDQYJKoZIhvcNAQEFBQAwKDESMBAGA1UEAxMJbWl0bXByb3h5 -MRIwEAYDVQQKEwltaXRtcHJveHkwHhcNMTMwMTIwMDEwODEzWhcNMTUxMDE3MDEw -ODEzWjBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UE -ChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOC -AQ8AMIIBCgKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0 -EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+ -ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G -3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/ -SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP -G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABMA0GCSqGSIb3DQEBBQUA -A4GBAFvI+cd47B85PQ970n2dU/PlA2/Hb1ldrrXh2guR4hX6vYx/uuk5yRI/n0Rd -KOXJ3czO0bd2Fpe3ZoNpkW0pOSDej/Q+58ScuJd0gWCT/Sh1eRk6ZdC0kusOuWoY -bPOPMkG45LPgUMFOnZEsfJP6P5mZIxlbCvSMFC25nPHWlct7 ------END CERTIFICATE----- diff --git a/test/data/clientcert/make b/test/data/clientcert/make deleted file mode 100755 index d1caea81..00000000 --- a/test/data/clientcert/make +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh - -openssl genrsa -out client.key 2048 -openssl req -key client.key -new -out client.req -openssl x509 -req -days 365 -in client.req -signkey client.key -out client.crt -extfile client.cnf -extensions ssl_client -openssl x509 -req -days 1000 -in client.req -CA ~/.mitmproxy/mitmproxy-ca.pem -CAkey ~/.mitmproxy/mitmproxy-ca.pem -set_serial 00001 -out client.crt -extensions ssl_client -cat client.key client.crt > client.pem -openssl x509 -text -noout -in client.pem diff --git a/test/data/file b/test/data/file deleted file mode 100644 index 26918572..00000000 --- a/test/data/file +++ /dev/null @@ -1 +0,0 @@ -testfile diff --git a/test/data/request b/test/data/request deleted file mode 100644 index c4c90e76..00000000 --- a/test/data/request +++ /dev/null @@ -1 +0,0 @@ -get:/foo diff --git a/test/data/response b/test/data/response deleted file mode 100644 index 8f897c85..00000000 --- a/test/data/response +++ /dev/null @@ -1 +0,0 @@ -202 diff --git a/test/data/testkey.pem b/test/data/testkey.pem deleted file mode 100644 index b804bd4c..00000000 --- a/test/data/testkey.pem +++ /dev/null @@ -1,68 +0,0 @@ ------BEGIN RSA PRIVATE KEY----- -MIIG5QIBAAKCAYEAwvtKxoZvBV2AxPAkCx8PXbuE7KeqK9bBvk8x+JchPMdf/KZj -sdu2v6Gm8Hi053i7ZGxouFvonJxHAiK6cwk9OYQwa9fbOFf2mgWKEBO4fbCH93tW -DCTdWVxFyNViAvxGHlJs3/IU03pIG29AgUnhRW8pGbabAfx8emcOZJZ3ykEuimaC -4s7mRwdc63GXnbcjTtRkrJsBATI+xvPwuR2+4daX7sPCf0kel3bN2jMpwXfvk/Ww -kJ2BIEeZCg0qIvyMjH9qrUirUnsmQnpPln0CGBbQEBsW9yMfGoFdREiMYls5jZeq -NxjWNv1RTRIm/4RjMwyxnoTA9eDS9wwO2NnJS4vfXAnUTP4BYx8Pe4ZMA2Gm6YrC -ysT6YA1xdHNpcuHXClxwmPj/cm8Z5kIg5clbNIK60ts9yFr/Ao3KPPYJ2GBv8/Oe -ApPBJuubews+/9/13Ew/SJ1t2u28+sPbgXUG8dC2n4vWTvJwKf6Duqxgnm82zdzj -SZoXRQsP984qiN7NAgMBAAECggGBALB6rqWdzCL5DLI0AQun40qdjaR95UKksNvF -5p7we379nl2ZZKb5DSHJ+MWzG1pfJo2wqeAkIBiQQp0mPcgdVrMWeJVD3QHUbDng -RaRjlRr+izJvCeUYANj+8ZLjwECfgf+z7yOLg1oeVeGvAp2C90jXYkYJx6c2lpxb -ZuWYY3hHIw7V1iXfywIDIhFg0TBJMMYK68xmx7QDfFqrNPj4eWsDxqSvvv1iezPw -rkWPBX49RjWPrW5XgSZsZ5J3c+oS1rZmIY7EAgopTWB/3wJjZR1Idz/9l9LIWlBP -6zVC27CIZzSEeGguqNVeyzJ0TPWh5idYNRmSZr6eTUF0245LNO/gqvWKgRSNIZko -HoBa2F1AvCiB67S1kxjwS5y3VkudZE4jkgGKcC2Ws/9QmOZ0HAsjI8VAMp2hj6iN -0HdPMTNtsLgbhKyXsoZuW4YmwfSTPxGi2gvcI7GUozpTz84n1cOneJnz1ygx6Uru -v8DpQg+VX6xTy4X6AK1F8OYNMZ/jaQKBwQDv30NevQStnGbTmcSr+0fd4rmWFklK -V6B2X7zWynVpSGvCWkqVSp3mG6aiZItAltVMRL/9LT6zIheyClyd+vXIjR2+W210 -XMxrvz7A8qCXkvB2dyEhrMdCfZ7p8+kf+eD2c/Mnxb7VpmDfHYLx30JeQoBwjrwU -Eul+dE1P+r8bWBaLTjlsipTya74yItWWAToXAo+s1BXBtXhEsLoe4FghlC0u724d -ucjDaeICdLcerApdvg6Q6p4kVHaoF6ka6I8CgcEA0Bdc05ery9gLC6CclV+BhA5Q -dfDq2P7qhc7e1ipwNRrQo2gy5HhgOkTL3dJWc+8rV6CBP/JfchnsW40tDOnPCTLT -gg3n7vv3RHrtncApXuhIFR+B5xjohTPBzxRUMiAOre2d0F5b6eBXFjptf/1i2tQ+ -qdqJoyOGOZP0hKVslGIfz+CKc6WEkIqX7c91Msdr5myeaWDI5TsurfuKRBH395T3 -BMAi6oinAAEb1rdySenLO2A/0kVmBVlTpaN3TNjjAoHBAMvS4uQ1qSv8okNbfgrF -UqPwa9JkzZImM2tinovFLU9xAl/7aTTCWrmU9Vs4JDuV71kHcjwnngeJCKl4tIpp -HUB06Lk/5xnhYLKNpz087cjeSwXe5IBA2HBfXhFd+NH6+nVwwUUieq4A2n+8C/CK -zVJbH9iE8Lv99fpFyQwU/R63EzD8Hz9j4ny7oLnpb6QvFrVGr98jt/kJwlBb+0sR -RtIBnwMq4F7R5w5lgm6jzpZ5ibVuMeJh+k7Ulp7uu/rpcQKBwQDE3sWIvf7f7PaO -OpbJz0CmYjCHVLWrNIlGrPAv6Jid9U+cuXEkrCpGFl5V77CxIH59+bEuga0BMztl -ZkxP4khoqHhom6VpeWJ3nGGAFJRPYS0JJvTsYalilBPxSYdaoO+iZ6MdxpfozcE2 -m3KLW3uSEqlyYvpCqNJNWQhGEoeGXstADWyPevHPGgAhElwL/ZW8u9inU9Tc4sAI -BGnMer+BsaJ+ERU3lK+Clony+z2aZiFLfIUE93lM6DT2CZBN2QcCgcAVk4L0bfA6 -HFnP/ZWNlnYWpOVFKcq57PX+J5/k7Tf34e2cYM2P0eqYggWZbzVd8qoCOQCHrAx0 -aZSSvEyKAVvzRNeqbm1oXaMojksMnrSX5henHjPbZlr1EmM7+zMnSTMkfVOx/6g1 -97sASej31XdOAgKCBJGymrwvYrCLW+P5cHqd+D8v/PvfpRIQM54p5ixRt3EYZvtR -zGrzsr0OGyOLZtj1DB0a3kvajAAOCl3TawJSzviKo2mwc+/xj28MCQM= ------END RSA PRIVATE KEY----- ------BEGIN CERTIFICATE----- -MIIE4TCCA0mgAwIBAgIJALONCAWZxPhUMA0GCSqGSIb3DQEBCwUAMEExCzAJBgNV -BAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYDVQQD -DAh0ZXN0LmNvbTAeFw0xNTA0MTgyMjA0NTNaFw00MjA5MDIyMjA0NTNaMEExCzAJ -BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD -VQQDDAh0ZXN0LmNvbTCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAML7 -SsaGbwVdgMTwJAsfD127hOynqivWwb5PMfiXITzHX/ymY7Hbtr+hpvB4tOd4u2Rs -aLhb6JycRwIiunMJPTmEMGvX2zhX9poFihATuH2wh/d7Vgwk3VlcRcjVYgL8Rh5S -bN/yFNN6SBtvQIFJ4UVvKRm2mwH8fHpnDmSWd8pBLopmguLO5kcHXOtxl523I07U -ZKybAQEyPsbz8LkdvuHWl+7Dwn9JHpd2zdozKcF375P1sJCdgSBHmQoNKiL8jIx/ -aq1Iq1J7JkJ6T5Z9AhgW0BAbFvcjHxqBXURIjGJbOY2XqjcY1jb9UU0SJv+EYzMM -sZ6EwPXg0vcMDtjZyUuL31wJ1Ez+AWMfD3uGTANhpumKwsrE+mANcXRzaXLh1wpc -cJj4/3JvGeZCIOXJWzSCutLbPcha/wKNyjz2Cdhgb/PzngKTwSbrm3sLPv/f9dxM -P0idbdrtvPrD24F1BvHQtp+L1k7ycCn+g7qsYJ5vNs3c40maF0ULD/fOKojezQID -AQABo4HbMIHYMAsGA1UdDwQEAwIFoDAdBgNVHQ4EFgQUbEgfTauEqEP/bnBtby1K -bihJvcswcQYDVR0jBGowaIAUbEgfTauEqEP/bnBtby1KbihJvcuhRaRDMEExCzAJ -BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD -VQQDDAh0ZXN0LmNvbYIJALONCAWZxPhUMAwGA1UdEwQFMAMBAf8wKQYDVR0RBCIw -IIIIdGVzdC5jb22CCXRlc3QyLmNvbYIJdGVzdDMuY29tMA0GCSqGSIb3DQEBCwUA -A4IBgQBcTedXtUb91DxQRtg73iomz7cQ4niZntUBW8iE5rpoA7prtQNGHMCbHwaX -tbWFkzBmL5JTBWvd/6AQ2LtiB3rYB3W/iRhbpsNJ501xaoOguPEQ9720Ph8TEveM -208gNzGsEOcNALwyXj2y9M19NGu9zMa8eu1Tc3IsQaVaGKHx8XZn5HTNUx8EdcwI -Z/Ji9ETDCL7+e5INv0tqfFSazWaQUwxM4IzPMkKTYRcMuN/6eog609k9r9pp32Ut -rKlzc6GIkAlgJJ0Wkoz1V46DmJNJdJG7eLu/mtsB85j6hytIQeWTf1fll5YnMZLF -HgNZtfYn8Q0oTdBQ0ZOaZeQCfZ8emYBdLJf2YB83uGRMjQ1FoeIxzQqiRq8WHRdb -9Q45i0DINMnNp0DbLMA4numZ7wT9SQb6sql9eUyuCNDw7nGIWTHUNfLtU1Er3h1d -icJuApx9+//UN/pGh0yTXb3fZbiI4IehRmkpnIWonIAwaVGm6JZU04wiIn8CuBho -/qQdlS8= ------END CERTIFICATE----- diff --git a/test/scripts/generate.sh b/test/scripts/generate.sh deleted file mode 100755 index eec3077d..00000000 --- a/test/scripts/generate.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -if [ ! -f ./private.key ] -then - openssl genrsa -out private.key 3072 -fi -openssl req \ - -batch \ - -new -x509 \ - -key private.key \ - -sha256 \ - -out cert.pem \ - -days 9999 \ - -config ./openssl.cnf -openssl x509 -in cert.pem -text -noout -cat ./private.key ./cert.pem > testcert.pem -rm ./private.key ./cert.pem diff --git a/test/scripts/openssl.cnf b/test/scripts/openssl.cnf deleted file mode 100644 index 5c890354..00000000 --- a/test/scripts/openssl.cnf +++ /dev/null @@ -1,39 +0,0 @@ -[ req ] -default_bits = 1024 -default_keyfile = privkey.pem -distinguished_name = req_distinguished_name -x509_extensions = v3_ca - -[ req_distinguished_name ] -countryName = Country Name (2 letter code) -countryName_default = NZ -countryName_min = 2 -countryName_max = 2 -stateOrProvinceName = State or Province Name (full name) -stateOrProvinceName_default = Otago -localityName = Locality Name (eg, city) -0.organizationName = Organization Name (eg, company) -0.organizationName_default = Pathod -commonName = Common Name (e.g. server FQDN or YOUR name) -commonName_default = test.com -commonName_max = 64 - -[ v3_req ] - -basicConstraints = CA:FALSE -keyUsage = nonRepudiation, digitalSignature, keyEncipherment - -[ v3_ca ] - -keyUsage = digitalSignature, keyEncipherment -subjectKeyIdentifier=hash -authorityKeyIdentifier=keyid:always,issuer:always -basicConstraints = CA:true -subjectAltName = @alternate_names - - -[ alternate_names ] - -DNS.1 = test.com -DNS.2 = test2.com -DNS.3 = test3.com diff --git a/test/test_app.py b/test/test_app.py deleted file mode 100644 index 4536db8e..00000000 --- a/test/test_app.py +++ /dev/null @@ -1,85 +0,0 @@ -import tutils - - -class TestApp(tutils.DaemonTests): - SSL = False - - def test_index(self): - r = self.getpath("/") - assert r.status_code == 200 - assert r.content - - def test_about(self): - r = self.getpath("/about") - assert r.ok - - def test_download(self): - r = self.getpath("/download") - assert r.ok - - def test_docs(self): - assert self.getpath("/docs/pathod").status_code == 200 - assert self.getpath("/docs/pathoc").status_code == 200 - assert self.getpath("/docs/language").status_code == 200 - assert self.getpath("/docs/libpathod").status_code == 200 - assert self.getpath("/docs/test").status_code == 200 - - def test_log(self): - assert self.getpath("/log").status_code == 200 - assert self.get("200:da").status_code == 200 - id = self.d.log()[0]["id"] - assert self.getpath("/log").status_code == 200 - assert self.getpath("/log/%s" % id).status_code == 200 - assert self.getpath("/log/9999999").status_code == 404 - - def test_log_binary(self): - assert self.get("200:h@10b=@10b:da") - - def test_response_preview(self): - r = self.getpath("/response_preview", params=dict(spec="200")) - assert r.status_code == 200 - assert 'Response' in r.content - - r = self.getpath("/response_preview", params=dict(spec="foo")) - assert r.status_code == 200 - assert 'Error' in r.content - - r = self.getpath("/response_preview", params=dict(spec="200:b@100m")) - assert r.status_code == 200 - assert "too large" in r.content - - r = self.getpath("/response_preview", params=dict(spec="200:b@5k")) - assert r.status_code == 200 - assert 'Response' in r.content - - r = self.getpath( - "/response_preview", - params=dict( - spec="200:b<nonexistent")) - assert r.status_code == 200 - assert 'File access denied' in r.content - - r = self.getpath("/response_preview", params=dict(spec="200:b<file")) - assert r.status_code == 200 - assert 'testfile' in r.content - - def test_request_preview(self): - r = self.getpath("/request_preview", params=dict(spec="get:/")) - assert r.status_code == 200 - assert 'Request' in r.content - - r = self.getpath("/request_preview", params=dict(spec="foo")) - assert r.status_code == 200 - assert 'Error' in r.content - - r = self.getpath("/request_preview", params=dict(spec="get:/:b@100m")) - assert r.status_code == 200 - assert "too large" in r.content - - r = self.getpath("/request_preview", params=dict(spec="get:/:b@5k")) - assert r.status_code == 200 - assert 'Request' in r.content - - r = self.getpath("/request_preview", params=dict(spec="")) - assert r.status_code == 200 - assert 'empty spec' in r.content diff --git a/test/test_language_actions.py b/test/test_language_actions.py deleted file mode 100644 index 755f0d85..00000000 --- a/test/test_language_actions.py +++ /dev/null @@ -1,135 +0,0 @@ -import cStringIO - -from libpathod.language import actions -from libpathod import language - - -def parse_request(s): - return language.parse_pathoc(s).next() - - -def test_unique_name(): - assert not actions.PauseAt(0, "f").unique_name - assert actions.DisconnectAt(0).unique_name - - -class TestDisconnects: - - def test_parse_pathod(self): - a = language.parse_pathod("400:d0").next().actions[0] - assert a.spec() == "d0" - a = language.parse_pathod("400:dr").next().actions[0] - assert a.spec() == "dr" - - def test_at(self): - e = actions.DisconnectAt.expr() - v = e.parseString("d0")[0] - assert isinstance(v, actions.DisconnectAt) - assert v.offset == 0 - - v = e.parseString("d100")[0] - assert v.offset == 100 - - e = actions.DisconnectAt.expr() - v = e.parseString("dr")[0] - assert v.offset == "r" - - def test_spec(self): - assert actions.DisconnectAt("r").spec() == "dr" - assert actions.DisconnectAt(10).spec() == "d10" - - -class TestInject: - - def test_parse_pathod(self): - a = language.parse_pathod("400:ir,@100").next().actions[0] - assert a.offset == "r" - assert a.value.datatype == "bytes" - assert a.value.usize == 100 - - a = language.parse_pathod("400:ia,@100").next().actions[0] - assert a.offset == "a" - - def test_at(self): - e = actions.InjectAt.expr() - v = e.parseString("i0,'foo'")[0] - assert v.value.val == "foo" - assert v.offset == 0 - assert isinstance(v, actions.InjectAt) - - v = e.parseString("ir,'foo'")[0] - assert v.offset == "r" - - def test_serve(self): - s = cStringIO.StringIO() - r = language.parse_pathod("400:i0,'foo'").next() - assert language.serve(r, s, {}) - - def test_spec(self): - e = actions.InjectAt.expr() - v = e.parseString("i0,'foo'")[0] - assert v.spec() == 'i0,"foo"' - - def test_spec(self): - e = actions.InjectAt.expr() - v = e.parseString("i0,@100")[0] - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.value.val == v3.value.val - - -class TestPauses: - - def test_parse_pathod(self): - e = actions.PauseAt.expr() - v = e.parseString("p10,10")[0] - assert v.seconds == 10 - assert v.offset == 10 - - v = e.parseString("p10,f")[0] - assert v.seconds == "f" - - v = e.parseString("pr,f")[0] - assert v.offset == "r" - - v = e.parseString("pa,f")[0] - assert v.offset == "a" - - def test_request(self): - r = language.parse_pathod('400:p10,10').next() - assert r.actions[0].spec() == "p10,10" - - def test_spec(self): - assert actions.PauseAt("r", 5).spec() == "pr,5" - assert actions.PauseAt(0, 5).spec() == "p0,5" - assert actions.PauseAt(0, "f").spec() == "p0,f" - - def test_freeze(self): - l = actions.PauseAt("r", 5) - assert l.freeze({}).spec() == l.spec() - - -class Test_Action: - - def test_cmp(self): - a = actions.DisconnectAt(0) - b = actions.DisconnectAt(1) - c = actions.DisconnectAt(0) - assert a < b - assert a == c - l = sorted([b, a]) - assert l[0].offset == 0 - - def test_resolve(self): - r = parse_request('GET:"/foo"') - e = actions.DisconnectAt("r") - ret = e.resolve({}, r) - assert isinstance(ret.offset, int) - - def test_repr(self): - e = actions.DisconnectAt("r") - assert repr(e) - - def test_freeze(self): - l = actions.DisconnectAt(5) - assert l.freeze({}).spec() == l.spec() diff --git a/test/test_language_base.py b/test/test_language_base.py deleted file mode 100644 index b18ee5b2..00000000 --- a/test/test_language_base.py +++ /dev/null @@ -1,352 +0,0 @@ -import os -from libpathod import language -from libpathod.language import base, exceptions -import tutils - - -def parse_request(s): - return language.parse_pathoc(s).next() - - -def test_times(): - reqs = list(language.parse_pathoc("get:/:x5")) - assert len(reqs) == 5 - assert not reqs[0].times - - -def test_caseless_literal(): - class CL(base.CaselessLiteral): - TOK = "foo" - v = CL("foo") - assert v.expr() - assert v.values(language.Settings()) - - -class TestTokValueNakedLiteral: - - def test_expr(self): - v = base.TokValueNakedLiteral("foo") - assert v.expr() - - def test_spec(self): - v = base.TokValueNakedLiteral("foo") - assert v.spec() == repr(v) == "foo" - - v = base.TokValueNakedLiteral("f\x00oo") - assert v.spec() == repr(v) == r"f\x00oo" - - -class TestTokValueLiteral: - - def test_espr(self): - v = base.TokValueLiteral("foo") - assert v.expr() - assert v.val == "foo" - - v = base.TokValueLiteral("foo\n") - assert v.expr() - assert v.val == "foo\n" - assert repr(v) - - def test_spec(self): - v = base.TokValueLiteral("foo") - assert v.spec() == r"'foo'" - - v = base.TokValueLiteral("f\x00oo") - assert v.spec() == repr(v) == r"'f\x00oo'" - - v = base.TokValueLiteral("\"") - assert v.spec() == repr(v) == '\'"\'' - - def roundtrip(self, spec): - e = base.TokValueLiteral.expr() - v = base.TokValueLiteral(spec) - v2 = e.parseString(v.spec()) - assert v.val == v2[0].val - assert v.spec() == v2[0].spec() - - def test_roundtrip(self): - self.roundtrip("'") - self.roundtrip('\'') - self.roundtrip("a") - self.roundtrip("\"") - # self.roundtrip("\\") - self.roundtrip("200:b'foo':i23,'\\''") - self.roundtrip("\a") - - -class TestTokValueGenerate: - - def test_basic(self): - v = base.TokValue.parseString("@10b")[0] - assert v.usize == 10 - assert v.unit == "b" - assert v.bytes() == 10 - v = base.TokValue.parseString("@10")[0] - assert v.unit == "b" - v = base.TokValue.parseString("@10k")[0] - assert v.bytes() == 10240 - v = base.TokValue.parseString("@10g")[0] - assert v.bytes() == 1024 ** 3 * 10 - - v = base.TokValue.parseString("@10g,digits")[0] - assert v.datatype == "digits" - g = v.get_generator({}) - assert g[:100] - - v = base.TokValue.parseString("@10,digits")[0] - assert v.unit == "b" - assert v.datatype == "digits" - - def test_spec(self): - v = base.TokValueGenerate(1, "b", "bytes") - assert v.spec() == repr(v) == "@1" - - v = base.TokValueGenerate(1, "k", "bytes") - assert v.spec() == repr(v) == "@1k" - - v = base.TokValueGenerate(1, "k", "ascii") - assert v.spec() == repr(v) == "@1k,ascii" - - v = base.TokValueGenerate(1, "b", "ascii") - assert v.spec() == repr(v) == "@1,ascii" - - def test_freeze(self): - v = base.TokValueGenerate(100, "b", "ascii") - f = v.freeze(language.Settings()) - assert len(f.val) == 100 - - -class TestTokValueFile: - - def test_file_value(self): - v = base.TokValue.parseString("<'one two'")[0] - assert str(v) - assert v.path == "one two" - - v = base.TokValue.parseString("<path")[0] - assert v.path == "path" - - def test_access_control(self): - v = base.TokValue.parseString("<path")[0] - with tutils.tmpdir() as t: - p = os.path.join(t, "path") - with open(p, "wb") as f: - f.write("x" * 10000) - - assert v.get_generator(language.Settings(staticdir=t)) - - v = base.TokValue.parseString("<path2")[0] - tutils.raises( - exceptions.FileAccessDenied, - v.get_generator, - language.Settings(staticdir=t) - ) - tutils.raises( - "access disabled", - v.get_generator, - language.Settings() - ) - - v = base.TokValue.parseString("</outside")[0] - tutils.raises( - "outside", - v.get_generator, - language.Settings(staticdir=t) - ) - - def test_spec(self): - v = base.TokValue.parseString("<'one two'")[0] - v2 = base.TokValue.parseString(v.spec())[0] - assert v2.path == "one two" - - def test_freeze(self): - v = base.TokValue.parseString("<'one two'")[0] - v2 = v.freeze({}) - assert v2.path == v.path - - -class TestMisc: - - def test_generators(self): - v = base.TokValue.parseString("'val'")[0] - g = v.get_generator({}) - assert g[:] == "val" - - def test_value(self): - assert base.TokValue.parseString("'val'")[0].val == "val" - assert base.TokValue.parseString('"val"')[0].val == "val" - assert base.TokValue.parseString('"\'val\'"')[0].val == "'val'" - - def test_value(self): - class TT(base.Value): - preamble = "m" - e = TT.expr() - v = e.parseString("m'msg'")[0] - assert v.value.val == "msg" - - s = v.spec() - assert s == e.parseString(s)[0].spec() - - v = e.parseString("m@100")[0] - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.value.val == v3.value.val - - def test_fixedlengthvalue(self): - class TT(base.FixedLengthValue): - preamble = "m" - length = 4 - - e = TT.expr() - assert e.parseString("m@4") - tutils.raises("invalid value length", e.parseString, "m@100") - tutils.raises("invalid value length", e.parseString, "m@1") - - with tutils.tmpdir() as t: - p = os.path.join(t, "path") - s = base.Settings(staticdir=t) - with open(p, "wb") as f: - f.write("a" * 20) - v = e.parseString("m<path")[0] - tutils.raises("invalid value length", v.values, s) - - p = os.path.join(t, "path") - with open(p, "wb") as f: - f.write("a" * 4) - v = e.parseString("m<path")[0] - assert v.values(s) - - -class TKeyValue(base.KeyValue): - preamble = "h" - - def values(self, settings): - return [ - self.key.get_generator(settings), - ": ", - self.value.get_generator(settings), - "\r\n", - ] - - -class TestKeyValue: - - def test_simple(self): - e = TKeyValue.expr() - v = e.parseString("h'foo'='bar'")[0] - assert v.key.val == "foo" - assert v.value.val == "bar" - - v2 = e.parseString(v.spec())[0] - assert v2.key.val == v.key.val - assert v2.value.val == v.value.val - - s = v.spec() - assert s == e.parseString(s)[0].spec() - - def test_freeze(self): - e = TKeyValue.expr() - v = e.parseString("h@10=@10'")[0] - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.key.val == v3.key.val - assert v2.value.val == v3.value.val - - -def test_intfield(): - class TT(base.IntField): - preamble = "t" - names = { - "one": 1, - "two": 2, - "three": 3 - } - max = 4 - e = TT.expr() - - v = e.parseString("tone")[0] - assert v.value == 1 - assert v.spec() == "tone" - assert v.values(language.Settings()) - - v = e.parseString("t1")[0] - assert v.value == 1 - assert v.spec() == "t1" - - v = e.parseString("t4")[0] - assert v.value == 4 - assert v.spec() == "t4" - - tutils.raises("can't exceed", e.parseString, "t5") - - -def test_options_or_value(): - class TT(base.OptionsOrValue): - options = [ - "one", - "two", - "three" - ] - e = TT.expr() - assert e.parseString("one")[0].value.val == "one" - assert e.parseString("'foo'")[0].value.val == "foo" - assert e.parseString("'get'")[0].value.val == "get" - - assert e.parseString("one")[0].spec() == "one" - assert e.parseString("'foo'")[0].spec() == "'foo'" - - s = e.parseString("one")[0].spec() - assert s == e.parseString(s)[0].spec() - - s = e.parseString("'foo'")[0].spec() - assert s == e.parseString(s)[0].spec() - - v = e.parseString("@100")[0] - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.value.val == v3.value.val - - -def test_integer(): - e = base.Integer.expr() - v = e.parseString("200")[0] - assert v.string() == "200" - assert v.spec() == "200" - - assert v.freeze({}).value == v.value - - class BInt(base.Integer): - bounds = (1, 5) - - tutils.raises("must be between", BInt, 0) - tutils.raises("must be between", BInt, 6) - assert BInt(5) - assert BInt(1) - assert BInt(3) - - -class TBoolean(base.Boolean): - name = "test" - - -def test_unique_name(): - b = TBoolean(True) - assert b.unique_name - - -class test_boolean(): - e = TBoolean.expr() - assert e.parseString("test")[0].value - assert not e.parseString("-test")[0].value - - def roundtrip(s): - e = TBoolean.expr() - s2 = e.parseString(s)[0].spec() - v1 = e.parseString(s)[0].value - v2 = e.parseString(s2)[0].value - assert s == s2 - assert v1 == v2 - - roundtrip("test") - roundtrip("-test") diff --git a/test/test_language_generators.py b/test/test_language_generators.py deleted file mode 100644 index 945560c3..00000000 --- a/test/test_language_generators.py +++ /dev/null @@ -1,42 +0,0 @@ -import os - -from libpathod.language import generators -import tutils - - -def test_randomgenerator(): - g = generators.RandomGenerator("bytes", 100) - assert repr(g) - assert len(g[:10]) == 10 - assert len(g[1:10]) == 9 - assert len(g[:1000]) == 100 - assert len(g[1000:1001]) == 0 - assert g[0] - - -def test_filegenerator(): - with tutils.tmpdir() as t: - path = os.path.join(t, "foo") - f = open(path, "wb") - f.write("x" * 10000) - f.close() - g = generators.FileGenerator(path) - assert len(g) == 10000 - assert g[0] == "x" - assert g[-1] == "x" - assert g[0:5] == "xxxxx" - assert repr(g) - # remove all references to FileGenerator instance to close the file - # handle. - del g - - -def test_transform_generator(): - def trans(offset, data): - return "a" * len(data) - g = "one" - t = generators.TransformGenerator(g, trans) - assert len(t) == len(g) - assert t[0] == "a" - assert t[:] == "a" * len(g) - assert repr(t) diff --git a/test/test_language_http.py b/test/test_language_http.py deleted file mode 100644 index 26bb6a45..00000000 --- a/test/test_language_http.py +++ /dev/null @@ -1,358 +0,0 @@ -import cStringIO - -from libpathod import language -from libpathod.language import http, base -import tutils - - -def parse_request(s): - return language.parse_pathoc(s).next() - - -def test_make_error_response(): - d = cStringIO.StringIO() - s = http.make_error_response("foo") - language.serve(s, d, {}) - - -class TestRequest: - - def test_nonascii(self): - tutils.raises("ascii", parse_request, "get:\xf0") - - def test_err(self): - tutils.raises(language.ParseException, parse_request, 'GET') - - def test_simple(self): - r = parse_request('GET:"/foo"') - assert r.method.string() == "GET" - assert r.path.string() == "/foo" - r = parse_request('GET:/foo') - assert r.path.string() == "/foo" - r = parse_request('GET:@1k') - assert len(r.path.string()) == 1024 - - def test_multiple(self): - r = list(language.parse_pathoc("GET:/ PUT:/")) - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" - assert len(r) == 2 - - l = """ - GET - "/foo" - ir,@1 - - PUT - - "/foo - - - - bar" - - ir,@1 - """ - r = list(language.parse_pathoc(l)) - assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" - - l = """ - get:"http://localhost:9999/p/200":ir,@1 - get:"http://localhost:9999/p/200":ir,@2 - """ - r = list(language.parse_pathoc(l)) - assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "GET" - - def test_nested_response(self): - l = "get:/p:s'200'" - r = list(language.parse_pathoc(l)) - assert len(r) == 1 - assert len(r[0].tokens) == 3 - assert isinstance(r[0].tokens[2], http.NestedResponse) - assert r[0].values({}) - - def test_render(self): - s = cStringIO.StringIO() - r = parse_request("GET:'/foo'") - assert language.serve( - r, - s, - language.Settings(request_host="foo.com") - ) - - def test_multiline(self): - l = """ - GET - "/foo" - ir,@1 - """ - r = parse_request(l) - assert r.method.string() == "GET" - assert r.path.string() == "/foo" - assert r.actions - - l = """ - GET - - "/foo - - - - bar" - - ir,@1 - """ - r = parse_request(l) - assert r.method.string() == "GET" - assert r.path.string().endswith("bar") - assert r.actions - - def test_spec(self): - def rt(s): - s = parse_request(s).spec() - assert parse_request(s).spec() == s - rt("get:/foo") - rt("get:/foo:da") - - def test_freeze(self): - r = parse_request("GET:/:b@100").freeze(language.Settings()) - assert len(r.spec()) > 100 - - def test_path_generator(self): - r = parse_request("GET:@100").freeze(language.Settings()) - assert len(r.spec()) > 100 - - def test_websocket(self): - r = parse_request('ws:/path/') - res = r.resolve(language.Settings()) - assert res.method.string().lower() == "get" - assert res.tok(http.Path).value.val == "/path/" - assert res.tok(http.Method).value.val.lower() == "get" - assert http.get_header("Upgrade", res.headers).value.val == "websocket" - - r = parse_request('ws:put:/path/') - res = r.resolve(language.Settings()) - assert r.method.string().lower() == "put" - assert res.tok(http.Path).value.val == "/path/" - assert res.tok(http.Method).value.val.lower() == "put" - assert http.get_header("Upgrade", res.headers).value.val == "websocket" - - -class TestResponse: - - def dummy_response(self): - return language.parse_pathod("400'msg'").next() - - def test_response(self): - r = language.parse_pathod("400:m'msg'").next() - assert r.status_code.string() == "400" - assert r.reason.string() == "msg" - - r = language.parse_pathod("400:m'msg':b@100b").next() - assert r.reason.string() == "msg" - assert r.body.values({}) - assert str(r) - - r = language.parse_pathod("200").next() - assert r.status_code.string() == "200" - assert not r.reason - assert "OK" in [i[:] for i in r.preamble({})] - - def test_render(self): - s = cStringIO.StringIO() - r = language.parse_pathod("400:m'msg'").next() - assert language.serve(r, s, {}) - - r = language.parse_pathod("400:p0,100:dr").next() - assert "p0" in r.spec() - s = r.preview_safe() - assert "p0" not in s.spec() - - def test_raw(self): - s = cStringIO.StringIO() - r = language.parse_pathod("400:b'foo'").next() - language.serve(r, s, {}) - v = s.getvalue() - assert "Content-Length" in v - - s = cStringIO.StringIO() - r = language.parse_pathod("400:b'foo':r").next() - language.serve(r, s, {}) - v = s.getvalue() - assert "Content-Length" not in v - - def test_length(self): - def testlen(x): - s = cStringIO.StringIO() - x = x.next() - language.serve(x, s, language.Settings()) - assert x.length(language.Settings()) == len(s.getvalue()) - testlen(language.parse_pathod("400:m'msg':r")) - testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r")) - testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r")) - - def test_maximum_length(self): - def testlen(x): - x = x.next() - s = cStringIO.StringIO() - m = x.maximum_length({}) - language.serve(x, s, {}) - assert m >= len(s.getvalue()) - - r = language.parse_pathod("400:m'msg':b@100:d0") - testlen(r) - - r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") - testlen(r) - - r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") - testlen(r) - - def test_parse_err(self): - tutils.raises( - language.ParseException, language.parse_pathod, "400:msg,b:" - ) - try: - language.parse_pathod("400'msg':b:") - except language.ParseException as v: - assert v.marked() - assert str(v) - - def test_nonascii(self): - tutils.raises("ascii", language.parse_pathod, "foo:b\xf0") - - def test_parse_header(self): - r = language.parse_pathod('400:h"foo"="bar"').next() - assert http.get_header("foo", r.headers) - - def test_parse_pause_before(self): - r = language.parse_pathod("400:p0,10").next() - assert r.actions[0].spec() == "p0,10" - - def test_parse_pause_after(self): - r = language.parse_pathod("400:pa,10").next() - assert r.actions[0].spec() == "pa,10" - - def test_parse_pause_random(self): - r = language.parse_pathod("400:pr,10").next() - assert r.actions[0].spec() == "pr,10" - - def test_parse_stress(self): - # While larger values are known to work on linux, len() technically - # returns an int and a python 2.7 int on windows has 32bit precision. - # Therefore, we should keep the body length < 2147483647 bytes in our - # tests. - r = language.parse_pathod("400:b@1g").next() - assert r.length({}) - - def test_spec(self): - def rt(s): - s = language.parse_pathod(s).next().spec() - assert language.parse_pathod(s).next().spec() == s - rt("400:b@100g") - rt("400") - rt("400:da") - - def test_websockets(self): - r = language.parse_pathod("ws").next() - tutils.raises("no websocket key", r.resolve, language.Settings()) - res = r.resolve(language.Settings(websocket_key="foo")) - assert res.status_code.string() == "101" - - -def test_ctype_shortcut(): - e = http.ShortcutContentType.expr() - v = e.parseString("c'foo'")[0] - assert v.key.val == "Content-Type" - assert v.value.val == "foo" - - s = v.spec() - assert s == e.parseString(s)[0].spec() - - e = http.ShortcutContentType.expr() - v = e.parseString("c@100")[0] - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.value.val == v3.value.val - - -def test_location_shortcut(): - e = http.ShortcutLocation.expr() - v = e.parseString("l'foo'")[0] - assert v.key.val == "Location" - assert v.value.val == "foo" - - s = v.spec() - assert s == e.parseString(s)[0].spec() - - e = http.ShortcutLocation.expr() - v = e.parseString("l@100")[0] - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.value.val == v3.value.val - - -def test_shortcuts(): - assert language.parse_pathod( - "400:c'foo'").next().headers[0].key.val == "Content-Type" - assert language.parse_pathod( - "400:l'foo'").next().headers[0].key.val == "Location" - - assert "Android" in tutils.render(parse_request("get:/:ua")) - assert "User-Agent" in tutils.render(parse_request("get:/:ua")) - - -def test_user_agent(): - e = http.ShortcutUserAgent.expr() - v = e.parseString("ua")[0] - assert "Android" in v.string() - - e = http.ShortcutUserAgent.expr() - v = e.parseString("u'a'")[0] - assert "Android" not in v.string() - - v = e.parseString("u@100'")[0] - assert len(str(v.freeze({}).value)) > 100 - v2 = v.freeze({}) - v3 = v2.freeze({}) - assert v2.value.val == v3.value.val - - -def test_nested_response(): - e = http.NestedResponse.expr() - v = e.parseString("s'200'")[0] - assert v.value.val == "200" - tutils.raises( - language.ParseException, - e.parseString, - "s'foo'" - ) - - v = e.parseString('s"200:b@1"')[0] - assert "@1" in v.spec() - f = v.freeze({}) - assert "@1" not in f.spec() - - -def test_nested_response_freeze(): - e = http.NestedResponse( - base.TokValueLiteral( - "200:b'foo':i10,'\\x27'".encode( - "string_escape" - ) - ) - ) - assert e.freeze({}) - assert e.values({}) - - -def test_unique_components(): - tutils.raises( - "multiple body clauses", - language.parse_pathod, - "400:b@1:b@1" - ) diff --git a/test/test_language_http2.py b/test/test_language_http2.py deleted file mode 100644 index 9be49452..00000000 --- a/test/test_language_http2.py +++ /dev/null @@ -1,233 +0,0 @@ -import cStringIO - -import netlib -from netlib import tcp -from netlib.http import user_agents - -from libpathod import language -from libpathod.language import http2, base -import tutils - - -def parse_request(s): - return language.parse_pathoc(s, True).next() - - -def parse_response(s): - return language.parse_pathod(s, True).next() - - -def default_settings(): - return language.Settings( - request_host="foo.com", - protocol=netlib.http.http2.HTTP2Protocol(tcp.TCPClient(('localhost', 1234))) - ) - - -def test_make_error_response(): - d = cStringIO.StringIO() - s = http2.make_error_response("foo", "bar") - language.serve(s, d, default_settings()) - - -class TestRequest: - - def test_cached_values(self): - req = parse_request("get:/") - req_id = id(req) - assert req_id == id(req.resolve(default_settings())) - assert req.values(default_settings()) == req.values(default_settings()) - - def test_nonascii(self): - tutils.raises("ascii", parse_request, "get:\xf0") - - def test_err(self): - tutils.raises(language.ParseException, parse_request, 'GET') - - def test_simple(self): - r = parse_request('GET:"/foo"') - assert r.method.string() == "GET" - assert r.path.string() == "/foo" - r = parse_request('GET:/foo') - assert r.path.string() == "/foo" - - def test_multiple(self): - r = list(language.parse_pathoc("GET:/ PUT:/")) - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" - assert len(r) == 2 - - l = """ - GET - "/foo" - - PUT - - "/foo - - - - bar" - """ - r = list(language.parse_pathoc(l, True)) - assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "PUT" - - l = """ - get:"http://localhost:9999/p/200" - get:"http://localhost:9999/p/200" - """ - r = list(language.parse_pathoc(l, True)) - assert len(r) == 2 - assert r[0].method.string() == "GET" - assert r[1].method.string() == "GET" - - def test_render_simple(self): - s = cStringIO.StringIO() - r = parse_request("GET:'/foo'") - assert language.serve( - r, - s, - default_settings(), - ) - - def test_raw_content_length(self): - r = parse_request('GET:/:r') - assert len(r.headers) == 0 - - r = parse_request('GET:/:r:b"foobar"') - assert len(r.headers) == 0 - - r = parse_request('GET:/') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-length", "0") - - r = parse_request('GET:/:b"foobar"') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-length", "6") - - r = parse_request('GET:/:b"foobar":h"content-length"="42"') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-length", "42") - - r = parse_request('GET:/:r:b"foobar":h"content-length"="42"') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-length", "42") - - def test_content_type(self): - r = parse_request('GET:/:r:c"foobar"') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-type", "foobar") - - def test_user_agent(self): - r = parse_request('GET:/:r:ua') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("user-agent", user_agents.get_by_shortcut('a')[2]) - - def test_render_with_headers(self): - s = cStringIO.StringIO() - r = parse_request('GET:/foo:h"foo"="bar"') - assert language.serve( - r, - s, - default_settings(), - ) - - def test_nested_response(self): - l = "get:/p/:s'200'" - r = parse_request(l) - assert len(r.tokens) == 3 - assert isinstance(r.tokens[2], http2.NestedResponse) - assert r.values(default_settings()) - - - def test_render_with_body(self): - s = cStringIO.StringIO() - r = parse_request("GET:'/foo':bfoobar") - assert language.serve( - r, - s, - default_settings(), - ) - - def test_spec(self): - def rt(s): - s = parse_request(s).spec() - assert parse_request(s).spec() == s - rt("get:/foo") - - -class TestResponse: - - def test_cached_values(self): - res = parse_response("200") - res_id = id(res) - assert res_id == id(res.resolve(default_settings())) - assert res.values(default_settings()) == res.values(default_settings()) - - def test_nonascii(self): - tutils.raises("ascii", parse_response, "200:\xf0") - - def test_err(self): - tutils.raises(language.ParseException, parse_response, 'GET:/') - - def test_raw_content_length(self): - r = parse_response('200:r') - assert len(r.headers) == 0 - - r = parse_response('200') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-length", "0") - - def test_content_type(self): - r = parse_response('200:r:c"foobar"') - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("content-type", "foobar") - - def test_simple(self): - r = parse_response('200:r:h"foo"="bar"') - assert r.status_code.string() == "200" - assert len(r.headers) == 1 - assert r.headers[0].values(default_settings()) == ("foo", "bar") - assert r.body is None - - r = parse_response('200:r:h"foo"="bar":bfoobar:h"bla"="fasel"') - assert r.status_code.string() == "200" - assert len(r.headers) == 2 - assert r.headers[0].values(default_settings()) == ("foo", "bar") - assert r.headers[1].values(default_settings()) == ("bla", "fasel") - assert r.body.string() == "foobar" - - def test_render_simple(self): - s = cStringIO.StringIO() - r = parse_response('200') - assert language.serve( - r, - s, - default_settings(), - ) - - def test_render_with_headers(self): - s = cStringIO.StringIO() - r = parse_response('200:h"foo"="bar"') - assert language.serve( - r, - s, - default_settings(), - ) - - def test_render_with_body(self): - s = cStringIO.StringIO() - r = parse_response('200:bfoobar') - assert language.serve( - r, - s, - default_settings(), - ) - - def test_spec(self): - def rt(s): - s = parse_response(s).spec() - assert parse_response(s).spec() == s - rt("200:bfoobar") diff --git a/test/test_language_websocket.py b/test/test_language_websocket.py deleted file mode 100644 index d98fd33e..00000000 --- a/test/test_language_websocket.py +++ /dev/null @@ -1,142 +0,0 @@ - -from libpathod import language -from libpathod.language import websockets -import netlib.websockets -import tutils - - -def parse_request(s): - return language.parse_pathoc(s).next() - - -class TestWebsocketFrame: - - def _test_messages(self, specs, message_klass): - for i in specs: - wf = parse_request(i) - assert isinstance(wf, message_klass) - assert wf - assert wf.values(language.Settings()) - assert wf.resolve(language.Settings()) - - spec = wf.spec() - wf2 = parse_request(spec) - assert wf2.spec() == spec - - def test_server_values(self): - specs = [ - "wf", - "wf:dr", - "wf:b'foo'", - "wf:mask:r'foo'", - "wf:l1024:b'foo'", - "wf:cbinary", - "wf:c1", - "wf:mask:knone", - "wf:fin", - "wf:fin:rsv1:rsv2:rsv3:mask", - "wf:-fin:-rsv1:-rsv2:-rsv3:-mask", - "wf:k@4", - "wf:x10", - ] - self._test_messages(specs, websockets.WebsocketFrame) - - def test_parse_websocket_frames(self): - wf = language.parse_websocket_frame("wf:x10") - assert len(list(wf)) == 10 - tutils.raises( - language.ParseException, - language.parse_websocket_frame, - "wf:x" - ) - - def test_client_values(self): - specs = [ - "wf:f'wf'", - ] - self._test_messages(specs, websockets.WebsocketClientFrame) - - def test_nested_frame(self): - wf = parse_request("wf:f'wf'") - assert wf.nested_frame - - def test_flags(self): - wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3") - frm = netlib.websockets.Frame.from_bytes(tutils.render(wf)) - assert frm.header.fin - assert frm.header.mask - assert frm.header.rsv1 - assert frm.header.rsv2 - assert frm.header.rsv3 - - wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3") - frm = netlib.websockets.Frame.from_bytes(tutils.render(wf)) - assert not frm.header.fin - assert not frm.header.mask - assert not frm.header.rsv1 - assert not frm.header.rsv2 - assert not frm.header.rsv3 - - def fr(self, spec, **kwargs): - settings = language.base.Settings(**kwargs) - wf = parse_request(spec) - return netlib.websockets.Frame.from_bytes(tutils.render(wf, settings)) - - def test_construction(self): - assert self.fr("wf:c1").header.opcode == 1 - assert self.fr("wf:c0").header.opcode == 0 - assert self.fr("wf:cbinary").header.opcode ==\ - netlib.websockets.OPCODE.BINARY - assert self.fr("wf:ctext").header.opcode ==\ - netlib.websockets.OPCODE.TEXT - - def test_rawbody(self): - frm = self.fr("wf:mask:r'foo'") - assert len(frm.payload) == 3 - assert frm.payload != "foo" - - assert self.fr("wf:r'foo'").payload == "foo" - - def test_construction(self): - # Simple server frame - frm = self.fr("wf:b'foo'") - assert not frm.header.mask - assert not frm.header.masking_key - - # Simple client frame - frm = self.fr("wf:b'foo'", is_client=True) - assert frm.header.mask - assert frm.header.masking_key - frm = self.fr("wf:b'foo':k'abcd'", is_client=True) - assert frm.header.mask - assert frm.header.masking_key == 'abcd' - - # Server frame, mask explicitly set - frm = self.fr("wf:b'foo':mask") - assert frm.header.mask - assert frm.header.masking_key - frm = self.fr("wf:b'foo':k'abcd'") - assert frm.header.mask - assert frm.header.masking_key == 'abcd' - - # Client frame, mask explicitly unset - frm = self.fr("wf:b'foo':-mask", is_client=True) - assert not frm.header.mask - assert not frm.header.masking_key - - frm = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True) - assert not frm.header.mask - # We're reading back a corrupted frame - the first 3 characters of the - # mask is mis-interpreted as the payload - assert frm.payload == "abc" - - def test_knone(self): - with tutils.raises("expected 4 bytes"): - self.fr("wf:b'foo':mask:knone") - - def test_length(self): - assert self.fr("wf:l3:b'foo'").header.payload_length == 3 - frm = self.fr("wf:l2:b'foo'") - assert frm.header.payload_length == 2 - assert frm.payload == "fo" - tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'") diff --git a/test/test_language_writer.py b/test/test_language_writer.py deleted file mode 100644 index 1a532903..00000000 --- a/test/test_language_writer.py +++ /dev/null @@ -1,91 +0,0 @@ -import cStringIO - -from libpathod import language -from libpathod.language import writer - - -def test_send_chunk(): - v = "foobarfoobar" - for bs in range(1, len(v) + 2): - s = cStringIO.StringIO() - writer.send_chunk(s, v, bs, 0, len(v)) - assert s.getvalue() == v - for start in range(len(v)): - for end in range(len(v)): - s = cStringIO.StringIO() - writer.send_chunk(s, v, bs, start, end) - assert s.getvalue() == v[start:end] - - -def test_write_values_inject(): - tst = "foo" - - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) - assert s.getvalue() == "aaafoo" - - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) - assert s.getvalue() == "faaaoo" - - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) - assert s.getvalue() == "faaaoo" - - -def test_write_values_disconnects(): - s = cStringIO.StringIO() - tst = "foo" * 100 - writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5) - assert not s.getvalue() - - -def test_write_values(): - tst = "foobarvoing" - s = cStringIO.StringIO() - writer.write_values(s, [tst], []) - assert s.getvalue() == tst - - for bs in range(1, len(tst) + 2): - for off in range(len(tst)): - s = cStringIO.StringIO() - writer.write_values( - s, [tst], [(off, "disconnect")], blocksize=bs - ) - assert s.getvalue() == tst[:off] - - -def test_write_values_pauses(): - tst = "".join(str(i) for i in range(10)) - for i in range(2, 10): - s = cStringIO.StringIO() - writer.write_values( - s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i - ) - assert s.getvalue() == tst - - for i in range(2, 10): - s = cStringIO.StringIO() - writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) - assert s.getvalue() == tst - - tst = ["".join(str(i) for i in range(10))] * 5 - for i in range(2, 10): - s = cStringIO.StringIO() - writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) - assert s.getvalue() == "".join(tst) - - -def test_write_values_after(): - s = cStringIO.StringIO() - r = language.parse_pathod("400:da").next() - language.serve(r, s, {}) - - s = cStringIO.StringIO() - r = language.parse_pathod("400:pa,0").next() - language.serve(r, s, {}) - - s = cStringIO.StringIO() - r = language.parse_pathod("400:ia,'xx'").next() - language.serve(r, s, {}) - assert s.getvalue().endswith('xx') diff --git a/test/test_log.py b/test/test_log.py deleted file mode 100644 index 8f38c040..00000000 --- a/test/test_log.py +++ /dev/null @@ -1,25 +0,0 @@ -import StringIO -from libpathod import log -from netlib.exceptions import TcpDisconnect -import netlib.tcp - - -class DummyIO(StringIO.StringIO): - - def start_log(self, *args, **kwargs): - pass - - def get_log(self, *args, **kwargs): - return "" - - -def test_disconnect(): - outf = DummyIO() - rw = DummyIO() - l = log.ConnectionLogger(outf, False, rw, rw) - try: - with l.ctx() as lg: - lg("Test") - except TcpDisconnect: - pass - assert "Test" in outf.getvalue() diff --git a/test/test_pathoc.py b/test/test_pathoc.py deleted file mode 100644 index 62696a64..00000000 --- a/test/test_pathoc.py +++ /dev/null @@ -1,308 +0,0 @@ -import json -import cStringIO -import re -import OpenSSL -from mock import Mock - -from netlib import tcp, http, socks -from netlib.exceptions import HttpException, TcpException, NetlibException -from netlib.http import http1, http2 - -from libpathod import pathoc, test, version, pathod, language -from netlib.tutils import raises -import tutils - - -def test_response(): - r = http.Response("HTTP/1.1", 200, "Message", {}, None, None) - assert repr(r) - - -class _TestDaemon: - ssloptions = pathod.SSLOptions() - - @classmethod - def setup_class(cls): - cls.d = test.Daemon( - ssl=cls.ssl, - ssloptions=cls.ssloptions, - staticdir=tutils.test_data.path("data"), - anchors=[ - (re.compile("/anchor/.*"), "202") - ] - ) - - @classmethod - def teardown_class(cls): - cls.d.shutdown() - - def setUp(self): - self.d.clear_log() - - def test_info(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ssl=self.ssl, - fp=None - ) - c.connect() - resp = c.request("get:/api/info") - assert tuple(json.loads(resp.content)["version"]) == version.IVERSION - - def tval( - self, - requests, - showreq=False, - showresp=False, - explain=False, - showssl=False, - hexdump=False, - timeout=None, - ignorecodes=(), - ignoretimeout=None, - showsummary=True - ): - s = cStringIO.StringIO() - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ssl=self.ssl, - showreq=showreq, - showresp=showresp, - explain=explain, - hexdump=hexdump, - ignorecodes=ignorecodes, - ignoretimeout=ignoretimeout, - showsummary=showsummary, - fp=s - ) - c.connect(showssl=showssl, fp=s) - if timeout: - c.settimeout(timeout) - for i in requests: - r = language.parse_pathoc(i).next() - if explain: - r = r.freeze(language.Settings()) - try: - c.request(r) - except NetlibException: - pass - return s.getvalue() - - -class TestDaemonSSL(_TestDaemon): - ssl = True - ssloptions = pathod.SSLOptions( - request_client_cert=True, - sans=["test1.com", "test2.com"], - alpn_select=b'h2', - ) - - def test_sni(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ssl=True, - sni="foobar.com", - fp=None - ) - c.connect() - c.request("get:/p/200") - r = c.request("get:/api/log") - d = json.loads(r.content) - assert d["log"][0]["request"]["sni"] == "foobar.com" - - def test_showssl(self): - assert "certificate chain" in self.tval(["get:/p/200"], showssl=True) - - def test_clientcert(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ssl=True, - clientcert=tutils.test_data.path("data/clientcert/client.pem"), - fp=None - ) - c.connect() - c.request("get:/p/200") - r = c.request("get:/api/log") - d = json.loads(r.content) - assert d["log"][0]["request"]["clientcert"]["keyinfo"] - - def test_http2_without_ssl(self): - fp = cStringIO.StringIO() - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - use_http2=True, - ssl=False, - fp = fp - ) - tutils.raises(NotImplementedError, c.connect) - - -class TestDaemon(_TestDaemon): - ssl = False - - def test_ssl_error(self): - c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None) - tutils.raises("ssl handshake", c.connect) - - def test_showssl(self): - assert not "certificate chain" in self.tval( - ["get:/p/200"], - showssl=True) - - def test_ignorecodes(self): - assert "200" in self.tval(["get:'/p/200:b@1'"]) - assert "200" in self.tval(["get:'/p/200:b@1'"]) - assert "200" in self.tval(["get:'/p/200:b@1'"]) - assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200]) - assert "200" not in self.tval( - ["get:'/p/200:b@1'"], - ignorecodes=[ - 200, - 201]) - assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201]) - - def test_timeout(self): - assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01) - assert "HTTP" in self.tval( - ["get:'/p/200:p5,100'"], - showresp=True, - timeout=1 - ) - assert not "HTTP" in self.tval( - ["get:'/p/200:p3,100'"], - showresp=True, - timeout=1, - ignoretimeout=True - ) - - def test_showresp(self): - reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"] - assert self.tval(reqs).count("200") == 2 - assert self.tval(reqs, showresp=True).count("HTTP/1.1 200 OK") == 2 - assert self.tval( - reqs, showresp=True, hexdump=True - ).count("0000000000") == 2 - - def test_showresp_httperr(self): - v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True) - assert "Invalid headers" in v - assert "HTTP/" in v - - def test_explain(self): - reqs = ["get:/p/200:b@100"] - assert "b@100" not in self.tval(reqs, explain=True) - - def test_showreq(self): - reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"] - assert self.tval(reqs, showreq=True).count("GET /api") == 2 - assert self.tval( - reqs, showreq=True, hexdump=True - ).count("0000000000") == 2 - - def test_conn_err(self): - assert "Invalid server response" in self.tval(["get:'/p/200:d2'"]) - - def test_websocket_shutdown(self): - c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) - c.connect() - c.request("ws:/") - c.stop() - - def test_wait_finish(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - fp=None, - ws_read_limit=1 - ) - c.connect() - c.request("ws:/") - c.request("wf:f'wf:x100'") - [i for i in c.wait(timeout=0, finish=False)] - [i for i in c.wait(timeout=0)] - - def test_connect_fail(self): - to = ("foobar", 80) - c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) - c.rfile, c.wfile = cStringIO.StringIO(), cStringIO.StringIO() - with raises("connect failed"): - c.http_connect(to) - c.rfile = cStringIO.StringIO( - "HTTP/1.1 500 OK\r\n" - ) - with raises("connect failed"): - c.http_connect(to) - c.rfile = cStringIO.StringIO( - "HTTP/1.1 200 OK\r\n" - ) - c.http_connect(to) - - def test_socks_connect(self): - to = ("foobar", 80) - c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) - c.rfile, c.wfile = tutils.treader(""), cStringIO.StringIO() - tutils.raises(pathoc.PathocError, c.socks_connect, to) - - c.rfile = tutils.treader( - "\x05\xEE" - ) - tutils.raises("SOCKS without authentication", c.socks_connect, ("example.com", 0xDEAD)) - - c.rfile = tutils.treader( - "\x05\x00" + - "\x05\xEE\x00\x03\x0bexample.com\xDE\xAD" - ) - tutils.raises("SOCKS server error", c.socks_connect, ("example.com", 0xDEAD)) - - c.rfile = tutils.treader( - "\x05\x00" + - "\x05\x00\x00\x03\x0bexample.com\xDE\xAD" - ) - c.socks_connect(("example.com", 0xDEAD)) - - -class TestDaemonHTTP2(_TestDaemon): - ssl = True - - if OpenSSL._util.lib.Cryptography_HAS_ALPN: - - def test_http2(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - fp=None, - ssl=True, - use_http2=True, - ) - assert isinstance(c.protocol, http2.HTTP2Protocol) - - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - ) - assert c.protocol == http1 - - def test_http2_alpn(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - fp=None, - ssl=True, - use_http2=True, - http2_skip_connection_preface=True, - ) - - tmp_convert_to_ssl = c.convert_to_ssl - c.convert_to_ssl = Mock() - c.convert_to_ssl.side_effect = tmp_convert_to_ssl - c.connect() - - _, kwargs = c.convert_to_ssl.call_args - assert set(kwargs['alpn_protos']) == set([b'http/1.1', b'h2']) - - def test_request(self): - c = pathoc.Pathoc( - ("127.0.0.1", self.d.port), - fp=None, - ssl=True, - use_http2=True, - ) - c.connect() - resp = c.request("get:/p/200") - assert resp.status_code == 200 diff --git a/test/test_pathoc_cmdline.py b/test/test_pathoc_cmdline.py deleted file mode 100644 index 74dfef57..00000000 --- a/test/test_pathoc_cmdline.py +++ /dev/null @@ -1,59 +0,0 @@ -from libpathod import pathoc_cmdline as cmdline -import tutils -import cStringIO -import mock - - -@mock.patch("argparse.ArgumentParser.error") -def test_pathoc(perror): - assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"]) - s = cStringIO.StringIO() - with tutils.raises(SystemExit): - cmdline.args_pathoc(["pathoc", "--show-uas"], s, s) - - a = cmdline.args_pathoc(["pathoc", "foo.com:8888", "get:/"]) - assert a.port == 8888 - - a = cmdline.args_pathoc(["pathoc", "foo.com:xxx", "get:/"]) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathoc(["pathoc", "-I", "10, 20", "foo.com:8888", "get:/"]) - assert a.ignorecodes == [10, 20] - - a = cmdline.args_pathoc(["pathoc", "-I", "xx, 20", "foo.com:8888", "get:/"]) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathoc(["pathoc", "-c", "foo:10", "foo.com:8888", "get:/"]) - assert a.connect_to == ["foo", 10] - - a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2"]) - assert a.use_http2 == True - assert a.ssl == True - - a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2-skip-connection-preface"]) - assert a.use_http2 == True - assert a.ssl == True - assert a.http2_skip_connection_preface == True - - a = cmdline.args_pathoc(["pathoc", "-c", "foo", "foo.com:8888", "get:/"]) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathoc( - ["pathoc", "-c", "foo:bar", "foo.com:8888", "get:/"]) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathoc( - [ - "pathoc", - "foo.com:8888", - tutils.test_data.path("data/request") - ] - ) - assert len(list(a.requests)) == 1 - - with tutils.raises(SystemExit): - cmdline.args_pathoc(["pathoc", "foo.com", "invalid"], s, s) diff --git a/test/test_pathod.py b/test/test_pathod.py deleted file mode 100644 index 98da7d28..00000000 --- a/test/test_pathod.py +++ /dev/null @@ -1,289 +0,0 @@ -import sys -import cStringIO -import OpenSSL - -from libpathod import pathod, version -from netlib import tcp, http -from netlib.exceptions import HttpException, TlsException -import tutils - - -class TestPathod(object): - - def test_logging(self): - s = cStringIO.StringIO() - p = pathod.Pathod(("127.0.0.1", 0), logfp=s) - assert len(p.get_log()) == 0 - id = p.add_log(dict(s="foo")) - assert p.log_by_id(id) - assert len(p.get_log()) == 1 - p.clear_log() - assert len(p.get_log()) == 0 - - for _ in range(p.LOGBUF + 1): - p.add_log(dict(s="foo")) - assert len(p.get_log()) <= p.LOGBUF - - -class TestNoWeb(tutils.DaemonTests): - noweb = True - - def test_noweb(self): - assert self.get("200:da").status_code == 200 - assert self.getpath("/").status_code == 800 - - -class TestTimeout(tutils.DaemonTests): - timeout = 0.01 - - def test_noweb(self): - # FIXME: Add float values to spec language, reduce test timeout to - # increase test performance - # This is a bodge - we have some platform difference that causes - # different exceptions to be raised here. - tutils.raises(Exception, self.pathoc, ["get:/:p1,1"]) - assert self.d.last_log()["type"] == "timeout" - - -class TestNoApi(tutils.DaemonTests): - noapi = True - - def test_noapi(self): - assert self.getpath("/log").status_code == 404 - r = self.getpath("/") - assert r.status_code == 200 - assert not "Log" in r.content - - -class TestNotAfterConnect(tutils.DaemonTests): - ssl = False - ssloptions = dict( - not_after_connect=True - ) - - def test_connect(self): - r, _ = self.pathoc( - [r"get:'http://foo.com/p/202':da"], - connect_to=("localhost", self.d.port) - ) - assert r[0].status_code == 202 - - -class TestCustomCert(tutils.DaemonTests): - ssl = True - ssloptions = dict( - certs=[("*", tutils.test_data.path("data/testkey.pem"))], - ) - - def test_connect(self): - r, _ = self.pathoc([r"get:/p/202"]) - r = r[0] - assert r.status_code == 202 - assert r.sslinfo - assert "test.com" in str(r.sslinfo.certchain[0].get_subject()) - - -class TestSSLCN(tutils.DaemonTests): - ssl = True - ssloptions = dict( - cn="foo.com" - ) - - def test_connect(self): - r, _ = self.pathoc([r"get:/p/202"]) - r = r[0] - assert r.status_code == 202 - assert r.sslinfo - assert r.sslinfo.certchain[0].get_subject().CN == "foo.com" - - -class TestNohang(tutils.DaemonTests): - nohang = True - - def test_nohang(self): - r = self.get("200:p0,0") - assert r.status_code == 800 - l = self.d.last_log() - assert "Pauses have been disabled" in l["response"]["msg"] - - -class TestHexdump(tutils.DaemonTests): - hexdump = True - - def test_hexdump(self): - r = self.get(r"200:b'\xf0'") - - -class TestNocraft(tutils.DaemonTests): - nocraft = True - - def test_nocraft(self): - r = self.get(r"200:b'\xf0'") - assert r.status_code == 800 - assert "Crafting disabled" in r.content - - -class CommonTests(tutils.DaemonTests): - - def test_binarydata(self): - r = self.get(r"200:b'\xf0'") - l = self.d.last_log() - # FIXME: Other binary data elements - - def test_sizelimit(self): - r = self.get("200:b@1g") - assert r.status_code == 800 - l = self.d.last_log() - assert "too large" in l["response"]["msg"] - - def test_preline(self): - r, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"]) - assert r[0].status_code == 200 - - def test_info(self): - assert tuple(self.d.info()["version"]) == version.IVERSION - - def test_logs(self): - assert self.d.clear_log() - assert not self.d.last_log() - rsp = self.get("202:da") - assert len(self.d.log()) == 1 - assert self.d.clear_log() - assert len(self.d.log()) == 0 - - def test_disconnect(self): - rsp = self.get("202:b@100k:d200") - assert len(rsp.content) < 200 - - def test_parserr(self): - rsp = self.get("400:msg,b:") - assert rsp.status_code == 800 - - def test_static(self): - rsp = self.get("200:b<file") - assert rsp.status_code == 200 - assert rsp.content.strip() == "testfile" - - def test_anchor(self): - rsp = self.getpath("anchor/foo") - assert rsp.status_code == 202 - - def test_invalid_first_line(self): - c = tcp.TCPClient(("localhost", self.d.port)) - c.connect() - if self.ssl: - c.convert_to_ssl() - c.wfile.write("foo\n\n\n") - c.wfile.flush() - l = self.d.last_log() - assert l["type"] == "error" - assert "foo" in l["msg"] - - def test_invalid_content_length(self): - tutils.raises( - HttpException, - self.pathoc, - ["get:/:h'content-length'='foo'"] - ) - l = self.d.last_log() - assert l["type"] == "error" - assert "Unparseable Content Length" in l["msg"] - - def test_invalid_headers(self): - tutils.raises(HttpException, self.pathoc, ["get:/:h'\t'='foo'"]) - l = self.d.last_log() - assert l["type"] == "error" - assert "Invalid headers" in l["msg"] - - def test_access_denied(self): - rsp = self.get("=nonexistent") - assert rsp.status_code == 800 - - def test_source_access_denied(self): - rsp = self.get("200:b</foo") - assert rsp.status_code == 800 - assert "File access denied" in rsp.content - - def test_proxy(self): - r, _ = self.pathoc([r"get:'http://foo.com/p/202':da"]) - assert r[0].status_code == 202 - - def test_websocket(self): - r, _ = self.pathoc(["ws:/p/"], ws_read_limit=0) - assert r[0].status_code == 101 - - r, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0) - assert r[0].status_code == 101 - - def test_websocket_frame(self): - r, _ = self.pathoc( - ["ws:/p/", "wf:f'wf:b\"test\"':pa,1"], - ws_read_limit=1 - ) - assert r[1].payload == "test" - - def test_websocket_frame_reflect_error(self): - r, _ = self.pathoc( - ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"], - ws_read_limit=1, - timeout=1 - ) - # FIXME: Race Condition? - assert "Parse error" in self.d.text_log() - - def test_websocket_frame_disconnect_error(self): - self.pathoc(["ws:/p/", "wf:b@10:d3"], ws_read_limit=0) - assert self.d.last_log() - - -class TestDaemon(CommonTests): - ssl = False - - def test_connect(self): - r, _ = self.pathoc( - [r"get:'http://foo.com/p/202':da"], - connect_to=("localhost", self.d.port), - ssl=True - ) - assert r[0].status_code == 202 - - def test_connect_err(self): - tutils.raises( - HttpException, - self.pathoc, - [r"get:'http://foo.com/p/202':da"], - connect_to=("localhost", self.d.port) - ) - - -class TestDaemonSSL(CommonTests): - ssl = True - - def test_ssl_conn_failure(self): - c = tcp.TCPClient(("localhost", self.d.port)) - c.rbufsize = 0 - c.wbufsize = 0 - c.connect() - c.wfile.write("\0\0\0\0") - tutils.raises(TlsException, c.convert_to_ssl) - l = self.d.last_log() - assert l["type"] == "error" - assert "SSL" in l["msg"] - - def test_ssl_cipher(self): - r, _ = self.pathoc([r"get:/p/202"]) - assert r[0].status_code == 202 - assert self.d.last_log()["cipher"][1] > 0 - - -class TestHTTP2(tutils.DaemonTests): - ssl = True - noweb = True - noapi = True - nohang = True - - if OpenSSL._util.lib.Cryptography_HAS_ALPN: - - def test_http2(self): - r, _ = self.pathoc(["GET:/"], ssl=True, use_http2=True) - assert r[0].status_code == 800 diff --git a/test/test_pathod_cmdline.py b/test/test_pathod_cmdline.py deleted file mode 100644 index 829c4b32..00000000 --- a/test/test_pathod_cmdline.py +++ /dev/null @@ -1,85 +0,0 @@ -from libpathod import pathod_cmdline as cmdline -import tutils -import cStringIO -import mock - - -@mock.patch("argparse.ArgumentParser.error") -def test_pathod(perror): - assert cmdline.args_pathod(["pathod"]) - - a = cmdline.args_pathod( - [ - "pathod", - "--cert", - tutils.test_data.path("data/testkey.pem") - ] - ) - assert a.ssl_certs - - a = cmdline.args_pathod( - [ - "pathod", - "--cert", - "nonexistent" - ] - ) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathod( - [ - "pathod", - "-a", - "foo=200" - ] - ) - assert a.anchors - - a = cmdline.args_pathod( - [ - "pathod", - "-a", - "foo=" + tutils.test_data.path("data/response") - ] - ) - assert a.anchors - - a = cmdline.args_pathod( - [ - "pathod", - "-a", - "?=200" - ] - ) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathod( - [ - "pathod", - "-a", - "foo" - ] - ) - assert perror.called - perror.reset_mock() - - a = cmdline.args_pathod( - [ - "pathod", - "--limit-size", - "200k" - ] - ) - assert a.sizelimit - - a = cmdline.args_pathod( - [ - "pathod", - "--limit-size", - "q" - ] - ) - assert perror.called - perror.reset_mock() diff --git a/test/test_test.py b/test/test_test.py deleted file mode 100644 index bd92d864..00000000 --- a/test/test_test.py +++ /dev/null @@ -1,45 +0,0 @@ -import logging -import requests -from libpathod import test -import tutils -logging.disable(logging.CRITICAL) - - -class TestDaemonManual: - - def test_simple(self): - with test.Daemon() as d: - rsp = requests.get("http://localhost:%s/p/202:da" % d.port) - assert rsp.ok - assert rsp.status_code == 202 - with tutils.raises(requests.ConnectionError): - requests.get("http://localhost:%s/p/202:da" % d.port) - - def test_startstop_ssl(self): - d = test.Daemon(ssl=True) - rsp = requests.get( - "https://localhost:%s/p/202:da" % - d.port, - verify=False) - assert rsp.ok - assert rsp.status_code == 202 - d.shutdown() - with tutils.raises(requests.ConnectionError): - requests.get("http://localhost:%s/p/202:da" % d.port) - - def test_startstop_ssl_explicit(self): - ssloptions = dict( - certfile=tutils.test_data.path("data/testkey.pem"), - cacert=tutils.test_data.path("data/testkey.pem"), - ssl_after_connect=False - ) - d = test.Daemon(ssl=ssloptions) - rsp = requests.get( - "https://localhost:%s/p/202:da" % - d.port, - verify=False) - assert rsp.ok - assert rsp.status_code == 202 - d.shutdown() - with tutils.raises(requests.ConnectionError): - requests.get("http://localhost:%s/p/202:da" % d.port) diff --git a/test/test_utils.py b/test/test_utils.py deleted file mode 100644 index 7d24e9e4..00000000 --- a/test/test_utils.py +++ /dev/null @@ -1,39 +0,0 @@ -from libpathod import utils -import tutils - - -def test_membool(): - m = utils.MemBool() - assert not m.v - assert m(1) - assert m.v == 1 - assert m(2) - assert m.v == 2 - - -def test_parse_size(): - assert utils.parse_size("100") == 100 - assert utils.parse_size("100k") == 100 * 1024 - tutils.raises("invalid size spec", utils.parse_size, "foo") - tutils.raises("invalid size spec", utils.parse_size, "100kk") - - -def test_parse_anchor_spec(): - assert utils.parse_anchor_spec("foo=200") == ("foo", "200") - assert utils.parse_anchor_spec("foo") is None - - -def test_data_path(): - tutils.raises(ValueError, utils.data.path, "nonexistent") - - -def test_inner_repr(): - assert utils.inner_repr("\x66") == "\x66" - assert utils.inner_repr(u"foo") == "foo" - - -def test_escape_unprintables(): - s = "".join([chr(i) for i in range(255)]) - e = utils.escape_unprintables(s) - assert e.encode('ascii') - assert not "PATHOD_MARKER" in e diff --git a/test/tutils.py b/test/tutils.py deleted file mode 100644 index 664cdd52..00000000 --- a/test/tutils.py +++ /dev/null @@ -1,128 +0,0 @@ -import tempfile -import os -import re -import shutil -import cStringIO -from contextlib import contextmanager - -import netlib -from libpathod import utils, test, pathoc, pathod, language -from netlib import tcp -import requests - -def treader(bytes): - """ - Construct a tcp.Read object from bytes. - """ - fp = cStringIO.StringIO(bytes) - return tcp.Reader(fp) - - -class DaemonTests(object): - noweb = False - noapi = False - nohang = False - ssl = False - timeout = None - hexdump = False - ssloptions = None - nocraft = False - - @classmethod - def setup_class(cls): - opts = cls.ssloptions or {} - cls.confdir = tempfile.mkdtemp() - opts["confdir"] = cls.confdir - so = pathod.SSLOptions(**opts) - cls.d = test.Daemon( - staticdir=test_data.path("data"), - anchors=[ - (re.compile("/anchor/.*"), "202:da") - ], - ssl=cls.ssl, - ssloptions=so, - sizelimit=1 * 1024 * 1024, - noweb=cls.noweb, - noapi=cls.noapi, - nohang=cls.nohang, - timeout=cls.timeout, - hexdump=cls.hexdump, - nocraft=cls.nocraft, - logreq=True, - logresp=True, - explain=True - ) - - @classmethod - def teardown_class(cls): - cls.d.shutdown() - shutil.rmtree(cls.confdir) - - def teardown(self): - if not (self.noweb or self.noapi): - self.d.clear_log() - - def getpath(self, path, params=None): - scheme = "https" if self.ssl else "http" - resp = requests.get( - "%s://localhost:%s/%s" % ( - scheme, - self.d.port, - path - ), - verify=False, - params=params - ) - return resp - - def get(self, spec): - resp = requests.get(self.d.p(spec), verify=False) - return resp - - def pathoc( - self, - specs, - timeout=None, - connect_to=None, - ssl=None, - ws_read_limit=None, - use_http2=False, - ): - """ - Returns a (messages, text log) tuple. - """ - if ssl is None: - ssl = self.ssl - logfp = cStringIO.StringIO() - c = pathoc.Pathoc( - ("localhost", self.d.port), - ssl=ssl, - ws_read_limit=ws_read_limit, - timeout=timeout, - fp=logfp, - use_http2=use_http2, - ) - c.connect(connect_to) - ret = [] - for i in specs: - resp = c.request(i) - if resp: - ret.append(resp) - for frm in c.wait(): - ret.append(frm) - c.stop() - return ret, logfp.getvalue() - - -tmpdir = netlib.tutils.tmpdir - -raises = netlib.tutils.raises - -test_data = utils.Data(__name__) - - -def render(r, settings=language.Settings()): - r = r.resolve(settings) - s = cStringIO.StringIO() - assert language.serve(r, s, settings) - return s.getvalue() |