diff options
author | Maximilian Hils <git@maximilianhils.com> | 2016-02-15 16:34:22 +0100 |
---|---|---|
committer | Maximilian Hils <git@maximilianhils.com> | 2016-02-15 16:34:22 +0100 |
commit | d7158f975e671b78f0a064dd873cfa7805667528 (patch) | |
tree | 9b40f263c4f613a0dc49f5d8628c371164afd546 /test/pathod | |
parent | 5fe473fb431699c71aa74bb715c2cb5b0500f044 (diff) | |
download | mitmproxy-d7158f975e671b78f0a064dd873cfa7805667528.tar.gz mitmproxy-d7158f975e671b78f0a064dd873cfa7805667528.tar.bz2 mitmproxy-d7158f975e671b78f0a064dd873cfa7805667528.zip |
move tests into shared folder
Diffstat (limited to 'test/pathod')
26 files changed, 2601 insertions, 0 deletions
diff --git a/test/pathod/data/clientcert/.gitignore b/test/pathod/data/clientcert/.gitignore new file mode 100644 index 00000000..07bc53d2 --- /dev/null +++ b/test/pathod/data/clientcert/.gitignore @@ -0,0 +1,3 @@ +client.crt +client.key +client.req diff --git a/test/pathod/data/clientcert/client.cnf b/test/pathod/data/clientcert/client.cnf new file mode 100644 index 00000000..5046a944 --- /dev/null +++ b/test/pathod/data/clientcert/client.cnf @@ -0,0 +1,5 @@ +[ ssl_client ] +basicConstraints = CA:FALSE +nsCertType = client +keyUsage = digitalSignature, keyEncipherment +extendedKeyUsage = clientAuth diff --git a/test/pathod/data/clientcert/client.pem b/test/pathod/data/clientcert/client.pem new file mode 100644 index 00000000..4927bca2 --- /dev/null +++ b/test/pathod/data/clientcert/client.pem @@ -0,0 +1,42 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0 +EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+ +ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G +3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/ +SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP +G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABAoIBAFE3FV/IDltbmHEP +iky93hbJm+6QgKepFReKpRVTyqb7LaygUvueQyPWQMIriKTsy675nxo8DQr7tQsO +y3YlSZgra/xNMikIB6e82c7K8DgyrDQw/rCqjZB3Xt4VCqsWJDLXnQMSn98lx0g7 +d7Lbf8soUpKWXqfdVpSDTi4fibSX6kshXyfSTpcz4AdoncEpViUfU1xkEEmZrjT8 +1GcCsDC41xdNmzCpqRuZX7DKSFRoB+0hUzsC1oiqM7FD5kixonRd4F5PbRXImIzt +6YCsT2okxTA04jX7yByis7LlOLTlkmLtKQYuc3erOFvwx89s4vW+AeFei+GGNitn +tHfSwbECgYEA7SzV+nN62hAERHlg8cEQT4TxnsWvbronYWcc/ev44eHSPDWL5tPi +GHfSbW6YAq5Wa0I9jMWfXyhOYEC3MZTC5EEeLOB71qVrTwcy/sY66rOrcgjFI76Q +5JFHQ4wy3SWU50KxE0oWJO9LIowprG+pW1vzqC3VF0T7q0FqESrY4LUCgYEA3F7Z +80ndnCUlooJAb+Hfotv7peFf1o6+m1PTRcz1lLnVt5R5lXj86kn+tXEpYZo1RiGR +2rE2N0seeznWCooakHcsBN7/qmFIhhooJNF7yW+JP2I4P2UV5+tJ+8bcs/voUkQD +1x+rGOuMn8nvHBd2+Vharft8eGL2mgooPVI2XusCgYEAlMZpO3+w8pTVeHaDP2MR +7i/AuQ3cbCLNjSX3Y7jgGCFllWspZRRIYXzYPNkA9b2SbBnTLjjRLgnEkFBIGgvs +7O2EFjaCuDRvydUEQhjq4ErwIsopj7B8h0QyZcbOKTbn3uFQ3n68wVJx2Sv/ADHT +FIHrp/WIE96r19Niy34LKXkCgYB2W59VsuOKnMz01l5DeR5C+0HSWxS9SReIl2IO +yEFSKullWyJeLIgyUaGy0990430feKI8whcrZXYumuah7IDN/KOwzhCk8vEfzWao +N7bzfqtJVrh9HA7C7DVlO+6H4JFrtcoWPZUIomJ549w/yz6EN3ckoMC+a/Ck1TW9 +ka1QFwKBgQCywG6TrZz0UmOjyLQZ+8Q4uvZklSW5NAKBkNnyuQ2kd5rzyYgMPE8C +Er8T88fdVIKvkhDyHhwcI7n58xE5Gr7wkwsrk/Hbd9/ZB2GgAPY3cATskK1v1McU +YeX38CU0fUS4aoy26hWQXkViB47IGQ3jWo3ZCtzIJl8DI9/RsBWTnw== +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIICYDCCAckCAQEwDQYJKoZIhvcNAQEFBQAwKDESMBAGA1UEAxMJbWl0bXByb3h5 +MRIwEAYDVQQKEwltaXRtcHJveHkwHhcNMTMwMTIwMDEwODEzWhcNMTUxMDE3MDEw +ODEzWjBFMQswCQYDVQQGEwJBVTETMBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UE +ChMYSW50ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOC +AQ8AMIIBCgKCAQEAzCpoRjSTfIN24kkNap/GYmP9zVWj0Gk8R5BB/PvvN0OB1Zk0 +EEYPsWCcuhEdK0ehiDZX030doF0DOncKKa6mop/d0x2o+ts42peDhZM6JNUrm6d+ +ZWQVtio33mpp77UMhR093vaA+ExDnmE26kBTVijJ1+fRAVDXG/cmQINEri91Kk/G +3YJ5e45UrohGI5seBZ4vV0xbHtmczFRhYFlGOvYsoIe4Lvz/eFS2pIrTIpYQ2VM/ +SQQl+JFy+NlQRsWG2NrxtKOzMnnDE7YN4I3z5D5eZFo1EtwZ48LNCeSwrEOdfuzP +G5q5qbs5KpE/x85H9umuRwSCIArbMwBYV8a8JwIDAQABMA0GCSqGSIb3DQEBBQUA +A4GBAFvI+cd47B85PQ970n2dU/PlA2/Hb1ldrrXh2guR4hX6vYx/uuk5yRI/n0Rd +KOXJ3czO0bd2Fpe3ZoNpkW0pOSDej/Q+58ScuJd0gWCT/Sh1eRk6ZdC0kusOuWoY +bPOPMkG45LPgUMFOnZEsfJP6P5mZIxlbCvSMFC25nPHWlct7 +-----END CERTIFICATE----- diff --git a/test/pathod/data/clientcert/make b/test/pathod/data/clientcert/make new file mode 100644 index 00000000..d1caea81 --- /dev/null +++ b/test/pathod/data/clientcert/make @@ -0,0 +1,8 @@ +#!/bin/sh + +openssl genrsa -out client.key 2048 +openssl req -key client.key -new -out client.req +openssl x509 -req -days 365 -in client.req -signkey client.key -out client.crt -extfile client.cnf -extensions ssl_client +openssl x509 -req -days 1000 -in client.req -CA ~/.mitmproxy/mitmproxy-ca.pem -CAkey ~/.mitmproxy/mitmproxy-ca.pem -set_serial 00001 -out client.crt -extensions ssl_client +cat client.key client.crt > client.pem +openssl x509 -text -noout -in client.pem diff --git a/test/pathod/data/file b/test/pathod/data/file new file mode 100644 index 00000000..26918572 --- /dev/null +++ b/test/pathod/data/file @@ -0,0 +1 @@ +testfile diff --git a/test/pathod/data/request b/test/pathod/data/request new file mode 100644 index 00000000..c4c90e76 --- /dev/null +++ b/test/pathod/data/request @@ -0,0 +1 @@ +get:/foo diff --git a/test/pathod/data/response b/test/pathod/data/response new file mode 100644 index 00000000..8f897c85 --- /dev/null +++ b/test/pathod/data/response @@ -0,0 +1 @@ +202 diff --git a/test/pathod/data/testkey.pem b/test/pathod/data/testkey.pem new file mode 100644 index 00000000..b804bd4c --- /dev/null +++ b/test/pathod/data/testkey.pem @@ -0,0 +1,68 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIG5QIBAAKCAYEAwvtKxoZvBV2AxPAkCx8PXbuE7KeqK9bBvk8x+JchPMdf/KZj +sdu2v6Gm8Hi053i7ZGxouFvonJxHAiK6cwk9OYQwa9fbOFf2mgWKEBO4fbCH93tW +DCTdWVxFyNViAvxGHlJs3/IU03pIG29AgUnhRW8pGbabAfx8emcOZJZ3ykEuimaC +4s7mRwdc63GXnbcjTtRkrJsBATI+xvPwuR2+4daX7sPCf0kel3bN2jMpwXfvk/Ww +kJ2BIEeZCg0qIvyMjH9qrUirUnsmQnpPln0CGBbQEBsW9yMfGoFdREiMYls5jZeq +NxjWNv1RTRIm/4RjMwyxnoTA9eDS9wwO2NnJS4vfXAnUTP4BYx8Pe4ZMA2Gm6YrC +ysT6YA1xdHNpcuHXClxwmPj/cm8Z5kIg5clbNIK60ts9yFr/Ao3KPPYJ2GBv8/Oe +ApPBJuubews+/9/13Ew/SJ1t2u28+sPbgXUG8dC2n4vWTvJwKf6Duqxgnm82zdzj +SZoXRQsP984qiN7NAgMBAAECggGBALB6rqWdzCL5DLI0AQun40qdjaR95UKksNvF +5p7we379nl2ZZKb5DSHJ+MWzG1pfJo2wqeAkIBiQQp0mPcgdVrMWeJVD3QHUbDng +RaRjlRr+izJvCeUYANj+8ZLjwECfgf+z7yOLg1oeVeGvAp2C90jXYkYJx6c2lpxb +ZuWYY3hHIw7V1iXfywIDIhFg0TBJMMYK68xmx7QDfFqrNPj4eWsDxqSvvv1iezPw +rkWPBX49RjWPrW5XgSZsZ5J3c+oS1rZmIY7EAgopTWB/3wJjZR1Idz/9l9LIWlBP +6zVC27CIZzSEeGguqNVeyzJ0TPWh5idYNRmSZr6eTUF0245LNO/gqvWKgRSNIZko +HoBa2F1AvCiB67S1kxjwS5y3VkudZE4jkgGKcC2Ws/9QmOZ0HAsjI8VAMp2hj6iN +0HdPMTNtsLgbhKyXsoZuW4YmwfSTPxGi2gvcI7GUozpTz84n1cOneJnz1ygx6Uru +v8DpQg+VX6xTy4X6AK1F8OYNMZ/jaQKBwQDv30NevQStnGbTmcSr+0fd4rmWFklK +V6B2X7zWynVpSGvCWkqVSp3mG6aiZItAltVMRL/9LT6zIheyClyd+vXIjR2+W210 +XMxrvz7A8qCXkvB2dyEhrMdCfZ7p8+kf+eD2c/Mnxb7VpmDfHYLx30JeQoBwjrwU +Eul+dE1P+r8bWBaLTjlsipTya74yItWWAToXAo+s1BXBtXhEsLoe4FghlC0u724d +ucjDaeICdLcerApdvg6Q6p4kVHaoF6ka6I8CgcEA0Bdc05ery9gLC6CclV+BhA5Q +dfDq2P7qhc7e1ipwNRrQo2gy5HhgOkTL3dJWc+8rV6CBP/JfchnsW40tDOnPCTLT +gg3n7vv3RHrtncApXuhIFR+B5xjohTPBzxRUMiAOre2d0F5b6eBXFjptf/1i2tQ+ +qdqJoyOGOZP0hKVslGIfz+CKc6WEkIqX7c91Msdr5myeaWDI5TsurfuKRBH395T3 +BMAi6oinAAEb1rdySenLO2A/0kVmBVlTpaN3TNjjAoHBAMvS4uQ1qSv8okNbfgrF +UqPwa9JkzZImM2tinovFLU9xAl/7aTTCWrmU9Vs4JDuV71kHcjwnngeJCKl4tIpp +HUB06Lk/5xnhYLKNpz087cjeSwXe5IBA2HBfXhFd+NH6+nVwwUUieq4A2n+8C/CK +zVJbH9iE8Lv99fpFyQwU/R63EzD8Hz9j4ny7oLnpb6QvFrVGr98jt/kJwlBb+0sR +RtIBnwMq4F7R5w5lgm6jzpZ5ibVuMeJh+k7Ulp7uu/rpcQKBwQDE3sWIvf7f7PaO +OpbJz0CmYjCHVLWrNIlGrPAv6Jid9U+cuXEkrCpGFl5V77CxIH59+bEuga0BMztl +ZkxP4khoqHhom6VpeWJ3nGGAFJRPYS0JJvTsYalilBPxSYdaoO+iZ6MdxpfozcE2 +m3KLW3uSEqlyYvpCqNJNWQhGEoeGXstADWyPevHPGgAhElwL/ZW8u9inU9Tc4sAI +BGnMer+BsaJ+ERU3lK+Clony+z2aZiFLfIUE93lM6DT2CZBN2QcCgcAVk4L0bfA6 +HFnP/ZWNlnYWpOVFKcq57PX+J5/k7Tf34e2cYM2P0eqYggWZbzVd8qoCOQCHrAx0 +aZSSvEyKAVvzRNeqbm1oXaMojksMnrSX5henHjPbZlr1EmM7+zMnSTMkfVOx/6g1 +97sASej31XdOAgKCBJGymrwvYrCLW+P5cHqd+D8v/PvfpRIQM54p5ixRt3EYZvtR +zGrzsr0OGyOLZtj1DB0a3kvajAAOCl3TawJSzviKo2mwc+/xj28MCQM= +-----END RSA PRIVATE KEY----- +-----BEGIN CERTIFICATE----- +MIIE4TCCA0mgAwIBAgIJALONCAWZxPhUMA0GCSqGSIb3DQEBCwUAMEExCzAJBgNV +BAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYDVQQD +DAh0ZXN0LmNvbTAeFw0xNTA0MTgyMjA0NTNaFw00MjA5MDIyMjA0NTNaMEExCzAJ +BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD +VQQDDAh0ZXN0LmNvbTCCAaIwDQYJKoZIhvcNAQEBBQADggGPADCCAYoCggGBAML7 +SsaGbwVdgMTwJAsfD127hOynqivWwb5PMfiXITzHX/ymY7Hbtr+hpvB4tOd4u2Rs +aLhb6JycRwIiunMJPTmEMGvX2zhX9poFihATuH2wh/d7Vgwk3VlcRcjVYgL8Rh5S +bN/yFNN6SBtvQIFJ4UVvKRm2mwH8fHpnDmSWd8pBLopmguLO5kcHXOtxl523I07U +ZKybAQEyPsbz8LkdvuHWl+7Dwn9JHpd2zdozKcF375P1sJCdgSBHmQoNKiL8jIx/ +aq1Iq1J7JkJ6T5Z9AhgW0BAbFvcjHxqBXURIjGJbOY2XqjcY1jb9UU0SJv+EYzMM +sZ6EwPXg0vcMDtjZyUuL31wJ1Ez+AWMfD3uGTANhpumKwsrE+mANcXRzaXLh1wpc +cJj4/3JvGeZCIOXJWzSCutLbPcha/wKNyjz2Cdhgb/PzngKTwSbrm3sLPv/f9dxM +P0idbdrtvPrD24F1BvHQtp+L1k7ycCn+g7qsYJ5vNs3c40maF0ULD/fOKojezQID +AQABo4HbMIHYMAsGA1UdDwQEAwIFoDAdBgNVHQ4EFgQUbEgfTauEqEP/bnBtby1K +bihJvcswcQYDVR0jBGowaIAUbEgfTauEqEP/bnBtby1KbihJvcuhRaRDMEExCzAJ +BgNVBAYTAk5aMQ4wDAYDVQQIDAVPdGFnbzEPMA0GA1UECgwGUGF0aG9kMREwDwYD +VQQDDAh0ZXN0LmNvbYIJALONCAWZxPhUMAwGA1UdEwQFMAMBAf8wKQYDVR0RBCIw +IIIIdGVzdC5jb22CCXRlc3QyLmNvbYIJdGVzdDMuY29tMA0GCSqGSIb3DQEBCwUA +A4IBgQBcTedXtUb91DxQRtg73iomz7cQ4niZntUBW8iE5rpoA7prtQNGHMCbHwaX +tbWFkzBmL5JTBWvd/6AQ2LtiB3rYB3W/iRhbpsNJ501xaoOguPEQ9720Ph8TEveM +208gNzGsEOcNALwyXj2y9M19NGu9zMa8eu1Tc3IsQaVaGKHx8XZn5HTNUx8EdcwI +Z/Ji9ETDCL7+e5INv0tqfFSazWaQUwxM4IzPMkKTYRcMuN/6eog609k9r9pp32Ut +rKlzc6GIkAlgJJ0Wkoz1V46DmJNJdJG7eLu/mtsB85j6hytIQeWTf1fll5YnMZLF +HgNZtfYn8Q0oTdBQ0ZOaZeQCfZ8emYBdLJf2YB83uGRMjQ1FoeIxzQqiRq8WHRdb +9Q45i0DINMnNp0DbLMA4numZ7wT9SQb6sql9eUyuCNDw7nGIWTHUNfLtU1Er3h1d +icJuApx9+//UN/pGh0yTXb3fZbiI4IehRmkpnIWonIAwaVGm6JZU04wiIn8CuBho +/qQdlS8= +-----END CERTIFICATE----- diff --git a/test/pathod/scripts/generate.sh b/test/pathod/scripts/generate.sh new file mode 100644 index 00000000..eec3077d --- /dev/null +++ b/test/pathod/scripts/generate.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +if [ ! -f ./private.key ] +then + openssl genrsa -out private.key 3072 +fi +openssl req \ + -batch \ + -new -x509 \ + -key private.key \ + -sha256 \ + -out cert.pem \ + -days 9999 \ + -config ./openssl.cnf +openssl x509 -in cert.pem -text -noout +cat ./private.key ./cert.pem > testcert.pem +rm ./private.key ./cert.pem diff --git a/test/pathod/scripts/openssl.cnf b/test/pathod/scripts/openssl.cnf new file mode 100644 index 00000000..5c890354 --- /dev/null +++ b/test/pathod/scripts/openssl.cnf @@ -0,0 +1,39 @@ +[ req ] +default_bits = 1024 +default_keyfile = privkey.pem +distinguished_name = req_distinguished_name +x509_extensions = v3_ca + +[ req_distinguished_name ] +countryName = Country Name (2 letter code) +countryName_default = NZ +countryName_min = 2 +countryName_max = 2 +stateOrProvinceName = State or Province Name (full name) +stateOrProvinceName_default = Otago +localityName = Locality Name (eg, city) +0.organizationName = Organization Name (eg, company) +0.organizationName_default = Pathod +commonName = Common Name (e.g. server FQDN or YOUR name) +commonName_default = test.com +commonName_max = 64 + +[ v3_req ] + +basicConstraints = CA:FALSE +keyUsage = nonRepudiation, digitalSignature, keyEncipherment + +[ v3_ca ] + +keyUsage = digitalSignature, keyEncipherment +subjectKeyIdentifier=hash +authorityKeyIdentifier=keyid:always,issuer:always +basicConstraints = CA:true +subjectAltName = @alternate_names + + +[ alternate_names ] + +DNS.1 = test.com +DNS.2 = test2.com +DNS.3 = test3.com diff --git a/test/pathod/test_app.py b/test/pathod/test_app.py new file mode 100644 index 00000000..4536db8e --- /dev/null +++ b/test/pathod/test_app.py @@ -0,0 +1,85 @@ +import tutils + + +class TestApp(tutils.DaemonTests): + SSL = False + + def test_index(self): + r = self.getpath("/") + assert r.status_code == 200 + assert r.content + + def test_about(self): + r = self.getpath("/about") + assert r.ok + + def test_download(self): + r = self.getpath("/download") + assert r.ok + + def test_docs(self): + assert self.getpath("/docs/pathod").status_code == 200 + assert self.getpath("/docs/pathoc").status_code == 200 + assert self.getpath("/docs/language").status_code == 200 + assert self.getpath("/docs/libpathod").status_code == 200 + assert self.getpath("/docs/test").status_code == 200 + + def test_log(self): + assert self.getpath("/log").status_code == 200 + assert self.get("200:da").status_code == 200 + id = self.d.log()[0]["id"] + assert self.getpath("/log").status_code == 200 + assert self.getpath("/log/%s" % id).status_code == 200 + assert self.getpath("/log/9999999").status_code == 404 + + def test_log_binary(self): + assert self.get("200:h@10b=@10b:da") + + def test_response_preview(self): + r = self.getpath("/response_preview", params=dict(spec="200")) + assert r.status_code == 200 + assert 'Response' in r.content + + r = self.getpath("/response_preview", params=dict(spec="foo")) + assert r.status_code == 200 + assert 'Error' in r.content + + r = self.getpath("/response_preview", params=dict(spec="200:b@100m")) + assert r.status_code == 200 + assert "too large" in r.content + + r = self.getpath("/response_preview", params=dict(spec="200:b@5k")) + assert r.status_code == 200 + assert 'Response' in r.content + + r = self.getpath( + "/response_preview", + params=dict( + spec="200:b<nonexistent")) + assert r.status_code == 200 + assert 'File access denied' in r.content + + r = self.getpath("/response_preview", params=dict(spec="200:b<file")) + assert r.status_code == 200 + assert 'testfile' in r.content + + def test_request_preview(self): + r = self.getpath("/request_preview", params=dict(spec="get:/")) + assert r.status_code == 200 + assert 'Request' in r.content + + r = self.getpath("/request_preview", params=dict(spec="foo")) + assert r.status_code == 200 + assert 'Error' in r.content + + r = self.getpath("/request_preview", params=dict(spec="get:/:b@100m")) + assert r.status_code == 200 + assert "too large" in r.content + + r = self.getpath("/request_preview", params=dict(spec="get:/:b@5k")) + assert r.status_code == 200 + assert 'Request' in r.content + + r = self.getpath("/request_preview", params=dict(spec="")) + assert r.status_code == 200 + assert 'empty spec' in r.content diff --git a/test/pathod/test_language_actions.py b/test/pathod/test_language_actions.py new file mode 100644 index 00000000..755f0d85 --- /dev/null +++ b/test/pathod/test_language_actions.py @@ -0,0 +1,135 @@ +import cStringIO + +from libpathod.language import actions +from libpathod import language + + +def parse_request(s): + return language.parse_pathoc(s).next() + + +def test_unique_name(): + assert not actions.PauseAt(0, "f").unique_name + assert actions.DisconnectAt(0).unique_name + + +class TestDisconnects: + + def test_parse_pathod(self): + a = language.parse_pathod("400:d0").next().actions[0] + assert a.spec() == "d0" + a = language.parse_pathod("400:dr").next().actions[0] + assert a.spec() == "dr" + + def test_at(self): + e = actions.DisconnectAt.expr() + v = e.parseString("d0")[0] + assert isinstance(v, actions.DisconnectAt) + assert v.offset == 0 + + v = e.parseString("d100")[0] + assert v.offset == 100 + + e = actions.DisconnectAt.expr() + v = e.parseString("dr")[0] + assert v.offset == "r" + + def test_spec(self): + assert actions.DisconnectAt("r").spec() == "dr" + assert actions.DisconnectAt(10).spec() == "d10" + + +class TestInject: + + def test_parse_pathod(self): + a = language.parse_pathod("400:ir,@100").next().actions[0] + assert a.offset == "r" + assert a.value.datatype == "bytes" + assert a.value.usize == 100 + + a = language.parse_pathod("400:ia,@100").next().actions[0] + assert a.offset == "a" + + def test_at(self): + e = actions.InjectAt.expr() + v = e.parseString("i0,'foo'")[0] + assert v.value.val == "foo" + assert v.offset == 0 + assert isinstance(v, actions.InjectAt) + + v = e.parseString("ir,'foo'")[0] + assert v.offset == "r" + + def test_serve(self): + s = cStringIO.StringIO() + r = language.parse_pathod("400:i0,'foo'").next() + assert language.serve(r, s, {}) + + def test_spec(self): + e = actions.InjectAt.expr() + v = e.parseString("i0,'foo'")[0] + assert v.spec() == 'i0,"foo"' + + def test_spec(self): + e = actions.InjectAt.expr() + v = e.parseString("i0,@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +class TestPauses: + + def test_parse_pathod(self): + e = actions.PauseAt.expr() + v = e.parseString("p10,10")[0] + assert v.seconds == 10 + assert v.offset == 10 + + v = e.parseString("p10,f")[0] + assert v.seconds == "f" + + v = e.parseString("pr,f")[0] + assert v.offset == "r" + + v = e.parseString("pa,f")[0] + assert v.offset == "a" + + def test_request(self): + r = language.parse_pathod('400:p10,10').next() + assert r.actions[0].spec() == "p10,10" + + def test_spec(self): + assert actions.PauseAt("r", 5).spec() == "pr,5" + assert actions.PauseAt(0, 5).spec() == "p0,5" + assert actions.PauseAt(0, "f").spec() == "p0,f" + + def test_freeze(self): + l = actions.PauseAt("r", 5) + assert l.freeze({}).spec() == l.spec() + + +class Test_Action: + + def test_cmp(self): + a = actions.DisconnectAt(0) + b = actions.DisconnectAt(1) + c = actions.DisconnectAt(0) + assert a < b + assert a == c + l = sorted([b, a]) + assert l[0].offset == 0 + + def test_resolve(self): + r = parse_request('GET:"/foo"') + e = actions.DisconnectAt("r") + ret = e.resolve({}, r) + assert isinstance(ret.offset, int) + + def test_repr(self): + e = actions.DisconnectAt("r") + assert repr(e) + + def test_freeze(self): + l = actions.DisconnectAt(5) + assert l.freeze({}).spec() == l.spec() diff --git a/test/pathod/test_language_base.py b/test/pathod/test_language_base.py new file mode 100644 index 00000000..b18ee5b2 --- /dev/null +++ b/test/pathod/test_language_base.py @@ -0,0 +1,352 @@ +import os +from libpathod import language +from libpathod.language import base, exceptions +import tutils + + +def parse_request(s): + return language.parse_pathoc(s).next() + + +def test_times(): + reqs = list(language.parse_pathoc("get:/:x5")) + assert len(reqs) == 5 + assert not reqs[0].times + + +def test_caseless_literal(): + class CL(base.CaselessLiteral): + TOK = "foo" + v = CL("foo") + assert v.expr() + assert v.values(language.Settings()) + + +class TestTokValueNakedLiteral: + + def test_expr(self): + v = base.TokValueNakedLiteral("foo") + assert v.expr() + + def test_spec(self): + v = base.TokValueNakedLiteral("foo") + assert v.spec() == repr(v) == "foo" + + v = base.TokValueNakedLiteral("f\x00oo") + assert v.spec() == repr(v) == r"f\x00oo" + + +class TestTokValueLiteral: + + def test_espr(self): + v = base.TokValueLiteral("foo") + assert v.expr() + assert v.val == "foo" + + v = base.TokValueLiteral("foo\n") + assert v.expr() + assert v.val == "foo\n" + assert repr(v) + + def test_spec(self): + v = base.TokValueLiteral("foo") + assert v.spec() == r"'foo'" + + v = base.TokValueLiteral("f\x00oo") + assert v.spec() == repr(v) == r"'f\x00oo'" + + v = base.TokValueLiteral("\"") + assert v.spec() == repr(v) == '\'"\'' + + def roundtrip(self, spec): + e = base.TokValueLiteral.expr() + v = base.TokValueLiteral(spec) + v2 = e.parseString(v.spec()) + assert v.val == v2[0].val + assert v.spec() == v2[0].spec() + + def test_roundtrip(self): + self.roundtrip("'") + self.roundtrip('\'') + self.roundtrip("a") + self.roundtrip("\"") + # self.roundtrip("\\") + self.roundtrip("200:b'foo':i23,'\\''") + self.roundtrip("\a") + + +class TestTokValueGenerate: + + def test_basic(self): + v = base.TokValue.parseString("@10b")[0] + assert v.usize == 10 + assert v.unit == "b" + assert v.bytes() == 10 + v = base.TokValue.parseString("@10")[0] + assert v.unit == "b" + v = base.TokValue.parseString("@10k")[0] + assert v.bytes() == 10240 + v = base.TokValue.parseString("@10g")[0] + assert v.bytes() == 1024 ** 3 * 10 + + v = base.TokValue.parseString("@10g,digits")[0] + assert v.datatype == "digits" + g = v.get_generator({}) + assert g[:100] + + v = base.TokValue.parseString("@10,digits")[0] + assert v.unit == "b" + assert v.datatype == "digits" + + def test_spec(self): + v = base.TokValueGenerate(1, "b", "bytes") + assert v.spec() == repr(v) == "@1" + + v = base.TokValueGenerate(1, "k", "bytes") + assert v.spec() == repr(v) == "@1k" + + v = base.TokValueGenerate(1, "k", "ascii") + assert v.spec() == repr(v) == "@1k,ascii" + + v = base.TokValueGenerate(1, "b", "ascii") + assert v.spec() == repr(v) == "@1,ascii" + + def test_freeze(self): + v = base.TokValueGenerate(100, "b", "ascii") + f = v.freeze(language.Settings()) + assert len(f.val) == 100 + + +class TestTokValueFile: + + def test_file_value(self): + v = base.TokValue.parseString("<'one two'")[0] + assert str(v) + assert v.path == "one two" + + v = base.TokValue.parseString("<path")[0] + assert v.path == "path" + + def test_access_control(self): + v = base.TokValue.parseString("<path")[0] + with tutils.tmpdir() as t: + p = os.path.join(t, "path") + with open(p, "wb") as f: + f.write("x" * 10000) + + assert v.get_generator(language.Settings(staticdir=t)) + + v = base.TokValue.parseString("<path2")[0] + tutils.raises( + exceptions.FileAccessDenied, + v.get_generator, + language.Settings(staticdir=t) + ) + tutils.raises( + "access disabled", + v.get_generator, + language.Settings() + ) + + v = base.TokValue.parseString("</outside")[0] + tutils.raises( + "outside", + v.get_generator, + language.Settings(staticdir=t) + ) + + def test_spec(self): + v = base.TokValue.parseString("<'one two'")[0] + v2 = base.TokValue.parseString(v.spec())[0] + assert v2.path == "one two" + + def test_freeze(self): + v = base.TokValue.parseString("<'one two'")[0] + v2 = v.freeze({}) + assert v2.path == v.path + + +class TestMisc: + + def test_generators(self): + v = base.TokValue.parseString("'val'")[0] + g = v.get_generator({}) + assert g[:] == "val" + + def test_value(self): + assert base.TokValue.parseString("'val'")[0].val == "val" + assert base.TokValue.parseString('"val"')[0].val == "val" + assert base.TokValue.parseString('"\'val\'"')[0].val == "'val'" + + def test_value(self): + class TT(base.Value): + preamble = "m" + e = TT.expr() + v = e.parseString("m'msg'")[0] + assert v.value.val == "msg" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + v = e.parseString("m@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + def test_fixedlengthvalue(self): + class TT(base.FixedLengthValue): + preamble = "m" + length = 4 + + e = TT.expr() + assert e.parseString("m@4") + tutils.raises("invalid value length", e.parseString, "m@100") + tutils.raises("invalid value length", e.parseString, "m@1") + + with tutils.tmpdir() as t: + p = os.path.join(t, "path") + s = base.Settings(staticdir=t) + with open(p, "wb") as f: + f.write("a" * 20) + v = e.parseString("m<path")[0] + tutils.raises("invalid value length", v.values, s) + + p = os.path.join(t, "path") + with open(p, "wb") as f: + f.write("a" * 4) + v = e.parseString("m<path")[0] + assert v.values(s) + + +class TKeyValue(base.KeyValue): + preamble = "h" + + def values(self, settings): + return [ + self.key.get_generator(settings), + ": ", + self.value.get_generator(settings), + "\r\n", + ] + + +class TestKeyValue: + + def test_simple(self): + e = TKeyValue.expr() + v = e.parseString("h'foo'='bar'")[0] + assert v.key.val == "foo" + assert v.value.val == "bar" + + v2 = e.parseString(v.spec())[0] + assert v2.key.val == v.key.val + assert v2.value.val == v.value.val + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + def test_freeze(self): + e = TKeyValue.expr() + v = e.parseString("h@10=@10'")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.key.val == v3.key.val + assert v2.value.val == v3.value.val + + +def test_intfield(): + class TT(base.IntField): + preamble = "t" + names = { + "one": 1, + "two": 2, + "three": 3 + } + max = 4 + e = TT.expr() + + v = e.parseString("tone")[0] + assert v.value == 1 + assert v.spec() == "tone" + assert v.values(language.Settings()) + + v = e.parseString("t1")[0] + assert v.value == 1 + assert v.spec() == "t1" + + v = e.parseString("t4")[0] + assert v.value == 4 + assert v.spec() == "t4" + + tutils.raises("can't exceed", e.parseString, "t5") + + +def test_options_or_value(): + class TT(base.OptionsOrValue): + options = [ + "one", + "two", + "three" + ] + e = TT.expr() + assert e.parseString("one")[0].value.val == "one" + assert e.parseString("'foo'")[0].value.val == "foo" + assert e.parseString("'get'")[0].value.val == "get" + + assert e.parseString("one")[0].spec() == "one" + assert e.parseString("'foo'")[0].spec() == "'foo'" + + s = e.parseString("one")[0].spec() + assert s == e.parseString(s)[0].spec() + + s = e.parseString("'foo'")[0].spec() + assert s == e.parseString(s)[0].spec() + + v = e.parseString("@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_integer(): + e = base.Integer.expr() + v = e.parseString("200")[0] + assert v.string() == "200" + assert v.spec() == "200" + + assert v.freeze({}).value == v.value + + class BInt(base.Integer): + bounds = (1, 5) + + tutils.raises("must be between", BInt, 0) + tutils.raises("must be between", BInt, 6) + assert BInt(5) + assert BInt(1) + assert BInt(3) + + +class TBoolean(base.Boolean): + name = "test" + + +def test_unique_name(): + b = TBoolean(True) + assert b.unique_name + + +class test_boolean(): + e = TBoolean.expr() + assert e.parseString("test")[0].value + assert not e.parseString("-test")[0].value + + def roundtrip(s): + e = TBoolean.expr() + s2 = e.parseString(s)[0].spec() + v1 = e.parseString(s)[0].value + v2 = e.parseString(s2)[0].value + assert s == s2 + assert v1 == v2 + + roundtrip("test") + roundtrip("-test") diff --git a/test/pathod/test_language_generators.py b/test/pathod/test_language_generators.py new file mode 100644 index 00000000..945560c3 --- /dev/null +++ b/test/pathod/test_language_generators.py @@ -0,0 +1,42 @@ +import os + +from libpathod.language import generators +import tutils + + +def test_randomgenerator(): + g = generators.RandomGenerator("bytes", 100) + assert repr(g) + assert len(g[:10]) == 10 + assert len(g[1:10]) == 9 + assert len(g[:1000]) == 100 + assert len(g[1000:1001]) == 0 + assert g[0] + + +def test_filegenerator(): + with tutils.tmpdir() as t: + path = os.path.join(t, "foo") + f = open(path, "wb") + f.write("x" * 10000) + f.close() + g = generators.FileGenerator(path) + assert len(g) == 10000 + assert g[0] == "x" + assert g[-1] == "x" + assert g[0:5] == "xxxxx" + assert repr(g) + # remove all references to FileGenerator instance to close the file + # handle. + del g + + +def test_transform_generator(): + def trans(offset, data): + return "a" * len(data) + g = "one" + t = generators.TransformGenerator(g, trans) + assert len(t) == len(g) + assert t[0] == "a" + assert t[:] == "a" * len(g) + assert repr(t) diff --git a/test/pathod/test_language_http.py b/test/pathod/test_language_http.py new file mode 100644 index 00000000..26bb6a45 --- /dev/null +++ b/test/pathod/test_language_http.py @@ -0,0 +1,358 @@ +import cStringIO + +from libpathod import language +from libpathod.language import http, base +import tutils + + +def parse_request(s): + return language.parse_pathoc(s).next() + + +def test_make_error_response(): + d = cStringIO.StringIO() + s = http.make_error_response("foo") + language.serve(s, d, {}) + + +class TestRequest: + + def test_nonascii(self): + tutils.raises("ascii", parse_request, "get:\xf0") + + def test_err(self): + tutils.raises(language.ParseException, parse_request, 'GET') + + def test_simple(self): + r = parse_request('GET:"/foo"') + assert r.method.string() == "GET" + assert r.path.string() == "/foo" + r = parse_request('GET:/foo') + assert r.path.string() == "/foo" + r = parse_request('GET:@1k') + assert len(r.path.string()) == 1024 + + def test_multiple(self): + r = list(language.parse_pathoc("GET:/ PUT:/")) + assert r[0].method.string() == "GET" + assert r[1].method.string() == "PUT" + assert len(r) == 2 + + l = """ + GET + "/foo" + ir,@1 + + PUT + + "/foo + + + + bar" + + ir,@1 + """ + r = list(language.parse_pathoc(l)) + assert len(r) == 2 + assert r[0].method.string() == "GET" + assert r[1].method.string() == "PUT" + + l = """ + get:"http://localhost:9999/p/200":ir,@1 + get:"http://localhost:9999/p/200":ir,@2 + """ + r = list(language.parse_pathoc(l)) + assert len(r) == 2 + assert r[0].method.string() == "GET" + assert r[1].method.string() == "GET" + + def test_nested_response(self): + l = "get:/p:s'200'" + r = list(language.parse_pathoc(l)) + assert len(r) == 1 + assert len(r[0].tokens) == 3 + assert isinstance(r[0].tokens[2], http.NestedResponse) + assert r[0].values({}) + + def test_render(self): + s = cStringIO.StringIO() + r = parse_request("GET:'/foo'") + assert language.serve( + r, + s, + language.Settings(request_host="foo.com") + ) + + def test_multiline(self): + l = """ + GET + "/foo" + ir,@1 + """ + r = parse_request(l) + assert r.method.string() == "GET" + assert r.path.string() == "/foo" + assert r.actions + + l = """ + GET + + "/foo + + + + bar" + + ir,@1 + """ + r = parse_request(l) + assert r.method.string() == "GET" + assert r.path.string().endswith("bar") + assert r.actions + + def test_spec(self): + def rt(s): + s = parse_request(s).spec() + assert parse_request(s).spec() == s + rt("get:/foo") + rt("get:/foo:da") + + def test_freeze(self): + r = parse_request("GET:/:b@100").freeze(language.Settings()) + assert len(r.spec()) > 100 + + def test_path_generator(self): + r = parse_request("GET:@100").freeze(language.Settings()) + assert len(r.spec()) > 100 + + def test_websocket(self): + r = parse_request('ws:/path/') + res = r.resolve(language.Settings()) + assert res.method.string().lower() == "get" + assert res.tok(http.Path).value.val == "/path/" + assert res.tok(http.Method).value.val.lower() == "get" + assert http.get_header("Upgrade", res.headers).value.val == "websocket" + + r = parse_request('ws:put:/path/') + res = r.resolve(language.Settings()) + assert r.method.string().lower() == "put" + assert res.tok(http.Path).value.val == "/path/" + assert res.tok(http.Method).value.val.lower() == "put" + assert http.get_header("Upgrade", res.headers).value.val == "websocket" + + +class TestResponse: + + def dummy_response(self): + return language.parse_pathod("400'msg'").next() + + def test_response(self): + r = language.parse_pathod("400:m'msg'").next() + assert r.status_code.string() == "400" + assert r.reason.string() == "msg" + + r = language.parse_pathod("400:m'msg':b@100b").next() + assert r.reason.string() == "msg" + assert r.body.values({}) + assert str(r) + + r = language.parse_pathod("200").next() + assert r.status_code.string() == "200" + assert not r.reason + assert "OK" in [i[:] for i in r.preamble({})] + + def test_render(self): + s = cStringIO.StringIO() + r = language.parse_pathod("400:m'msg'").next() + assert language.serve(r, s, {}) + + r = language.parse_pathod("400:p0,100:dr").next() + assert "p0" in r.spec() + s = r.preview_safe() + assert "p0" not in s.spec() + + def test_raw(self): + s = cStringIO.StringIO() + r = language.parse_pathod("400:b'foo'").next() + language.serve(r, s, {}) + v = s.getvalue() + assert "Content-Length" in v + + s = cStringIO.StringIO() + r = language.parse_pathod("400:b'foo':r").next() + language.serve(r, s, {}) + v = s.getvalue() + assert "Content-Length" not in v + + def test_length(self): + def testlen(x): + s = cStringIO.StringIO() + x = x.next() + language.serve(x, s, language.Settings()) + assert x.length(language.Settings()) == len(s.getvalue()) + testlen(language.parse_pathod("400:m'msg':r")) + testlen(language.parse_pathod("400:m'msg':h'foo'='bar':r")) + testlen(language.parse_pathod("400:m'msg':h'foo'='bar':b@100b:r")) + + def test_maximum_length(self): + def testlen(x): + x = x.next() + s = cStringIO.StringIO() + m = x.maximum_length({}) + language.serve(x, s, {}) + assert m >= len(s.getvalue()) + + r = language.parse_pathod("400:m'msg':b@100:d0") + testlen(r) + + r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") + testlen(r) + + r = language.parse_pathod("400:m'msg':b@100:d0:i0,'foo'") + testlen(r) + + def test_parse_err(self): + tutils.raises( + language.ParseException, language.parse_pathod, "400:msg,b:" + ) + try: + language.parse_pathod("400'msg':b:") + except language.ParseException as v: + assert v.marked() + assert str(v) + + def test_nonascii(self): + tutils.raises("ascii", language.parse_pathod, "foo:b\xf0") + + def test_parse_header(self): + r = language.parse_pathod('400:h"foo"="bar"').next() + assert http.get_header("foo", r.headers) + + def test_parse_pause_before(self): + r = language.parse_pathod("400:p0,10").next() + assert r.actions[0].spec() == "p0,10" + + def test_parse_pause_after(self): + r = language.parse_pathod("400:pa,10").next() + assert r.actions[0].spec() == "pa,10" + + def test_parse_pause_random(self): + r = language.parse_pathod("400:pr,10").next() + assert r.actions[0].spec() == "pr,10" + + def test_parse_stress(self): + # While larger values are known to work on linux, len() technically + # returns an int and a python 2.7 int on windows has 32bit precision. + # Therefore, we should keep the body length < 2147483647 bytes in our + # tests. + r = language.parse_pathod("400:b@1g").next() + assert r.length({}) + + def test_spec(self): + def rt(s): + s = language.parse_pathod(s).next().spec() + assert language.parse_pathod(s).next().spec() == s + rt("400:b@100g") + rt("400") + rt("400:da") + + def test_websockets(self): + r = language.parse_pathod("ws").next() + tutils.raises("no websocket key", r.resolve, language.Settings()) + res = r.resolve(language.Settings(websocket_key="foo")) + assert res.status_code.string() == "101" + + +def test_ctype_shortcut(): + e = http.ShortcutContentType.expr() + v = e.parseString("c'foo'")[0] + assert v.key.val == "Content-Type" + assert v.value.val == "foo" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + e = http.ShortcutContentType.expr() + v = e.parseString("c@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_location_shortcut(): + e = http.ShortcutLocation.expr() + v = e.parseString("l'foo'")[0] + assert v.key.val == "Location" + assert v.value.val == "foo" + + s = v.spec() + assert s == e.parseString(s)[0].spec() + + e = http.ShortcutLocation.expr() + v = e.parseString("l@100")[0] + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_shortcuts(): + assert language.parse_pathod( + "400:c'foo'").next().headers[0].key.val == "Content-Type" + assert language.parse_pathod( + "400:l'foo'").next().headers[0].key.val == "Location" + + assert "Android" in tutils.render(parse_request("get:/:ua")) + assert "User-Agent" in tutils.render(parse_request("get:/:ua")) + + +def test_user_agent(): + e = http.ShortcutUserAgent.expr() + v = e.parseString("ua")[0] + assert "Android" in v.string() + + e = http.ShortcutUserAgent.expr() + v = e.parseString("u'a'")[0] + assert "Android" not in v.string() + + v = e.parseString("u@100'")[0] + assert len(str(v.freeze({}).value)) > 100 + v2 = v.freeze({}) + v3 = v2.freeze({}) + assert v2.value.val == v3.value.val + + +def test_nested_response(): + e = http.NestedResponse.expr() + v = e.parseString("s'200'")[0] + assert v.value.val == "200" + tutils.raises( + language.ParseException, + e.parseString, + "s'foo'" + ) + + v = e.parseString('s"200:b@1"')[0] + assert "@1" in v.spec() + f = v.freeze({}) + assert "@1" not in f.spec() + + +def test_nested_response_freeze(): + e = http.NestedResponse( + base.TokValueLiteral( + "200:b'foo':i10,'\\x27'".encode( + "string_escape" + ) + ) + ) + assert e.freeze({}) + assert e.values({}) + + +def test_unique_components(): + tutils.raises( + "multiple body clauses", + language.parse_pathod, + "400:b@1:b@1" + ) diff --git a/test/pathod/test_language_http2.py b/test/pathod/test_language_http2.py new file mode 100644 index 00000000..9be49452 --- /dev/null +++ b/test/pathod/test_language_http2.py @@ -0,0 +1,233 @@ +import cStringIO + +import netlib +from netlib import tcp +from netlib.http import user_agents + +from libpathod import language +from libpathod.language import http2, base +import tutils + + +def parse_request(s): + return language.parse_pathoc(s, True).next() + + +def parse_response(s): + return language.parse_pathod(s, True).next() + + +def default_settings(): + return language.Settings( + request_host="foo.com", + protocol=netlib.http.http2.HTTP2Protocol(tcp.TCPClient(('localhost', 1234))) + ) + + +def test_make_error_response(): + d = cStringIO.StringIO() + s = http2.make_error_response("foo", "bar") + language.serve(s, d, default_settings()) + + +class TestRequest: + + def test_cached_values(self): + req = parse_request("get:/") + req_id = id(req) + assert req_id == id(req.resolve(default_settings())) + assert req.values(default_settings()) == req.values(default_settings()) + + def test_nonascii(self): + tutils.raises("ascii", parse_request, "get:\xf0") + + def test_err(self): + tutils.raises(language.ParseException, parse_request, 'GET') + + def test_simple(self): + r = parse_request('GET:"/foo"') + assert r.method.string() == "GET" + assert r.path.string() == "/foo" + r = parse_request('GET:/foo') + assert r.path.string() == "/foo" + + def test_multiple(self): + r = list(language.parse_pathoc("GET:/ PUT:/")) + assert r[0].method.string() == "GET" + assert r[1].method.string() == "PUT" + assert len(r) == 2 + + l = """ + GET + "/foo" + + PUT + + "/foo + + + + bar" + """ + r = list(language.parse_pathoc(l, True)) + assert len(r) == 2 + assert r[0].method.string() == "GET" + assert r[1].method.string() == "PUT" + + l = """ + get:"http://localhost:9999/p/200" + get:"http://localhost:9999/p/200" + """ + r = list(language.parse_pathoc(l, True)) + assert len(r) == 2 + assert r[0].method.string() == "GET" + assert r[1].method.string() == "GET" + + def test_render_simple(self): + s = cStringIO.StringIO() + r = parse_request("GET:'/foo'") + assert language.serve( + r, + s, + default_settings(), + ) + + def test_raw_content_length(self): + r = parse_request('GET:/:r') + assert len(r.headers) == 0 + + r = parse_request('GET:/:r:b"foobar"') + assert len(r.headers) == 0 + + r = parse_request('GET:/') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-length", "0") + + r = parse_request('GET:/:b"foobar"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-length", "6") + + r = parse_request('GET:/:b"foobar":h"content-length"="42"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-length", "42") + + r = parse_request('GET:/:r:b"foobar":h"content-length"="42"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-length", "42") + + def test_content_type(self): + r = parse_request('GET:/:r:c"foobar"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-type", "foobar") + + def test_user_agent(self): + r = parse_request('GET:/:r:ua') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("user-agent", user_agents.get_by_shortcut('a')[2]) + + def test_render_with_headers(self): + s = cStringIO.StringIO() + r = parse_request('GET:/foo:h"foo"="bar"') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_nested_response(self): + l = "get:/p/:s'200'" + r = parse_request(l) + assert len(r.tokens) == 3 + assert isinstance(r.tokens[2], http2.NestedResponse) + assert r.values(default_settings()) + + + def test_render_with_body(self): + s = cStringIO.StringIO() + r = parse_request("GET:'/foo':bfoobar") + assert language.serve( + r, + s, + default_settings(), + ) + + def test_spec(self): + def rt(s): + s = parse_request(s).spec() + assert parse_request(s).spec() == s + rt("get:/foo") + + +class TestResponse: + + def test_cached_values(self): + res = parse_response("200") + res_id = id(res) + assert res_id == id(res.resolve(default_settings())) + assert res.values(default_settings()) == res.values(default_settings()) + + def test_nonascii(self): + tutils.raises("ascii", parse_response, "200:\xf0") + + def test_err(self): + tutils.raises(language.ParseException, parse_response, 'GET:/') + + def test_raw_content_length(self): + r = parse_response('200:r') + assert len(r.headers) == 0 + + r = parse_response('200') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-length", "0") + + def test_content_type(self): + r = parse_response('200:r:c"foobar"') + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("content-type", "foobar") + + def test_simple(self): + r = parse_response('200:r:h"foo"="bar"') + assert r.status_code.string() == "200" + assert len(r.headers) == 1 + assert r.headers[0].values(default_settings()) == ("foo", "bar") + assert r.body is None + + r = parse_response('200:r:h"foo"="bar":bfoobar:h"bla"="fasel"') + assert r.status_code.string() == "200" + assert len(r.headers) == 2 + assert r.headers[0].values(default_settings()) == ("foo", "bar") + assert r.headers[1].values(default_settings()) == ("bla", "fasel") + assert r.body.string() == "foobar" + + def test_render_simple(self): + s = cStringIO.StringIO() + r = parse_response('200') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_render_with_headers(self): + s = cStringIO.StringIO() + r = parse_response('200:h"foo"="bar"') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_render_with_body(self): + s = cStringIO.StringIO() + r = parse_response('200:bfoobar') + assert language.serve( + r, + s, + default_settings(), + ) + + def test_spec(self): + def rt(s): + s = parse_response(s).spec() + assert parse_response(s).spec() == s + rt("200:bfoobar") diff --git a/test/pathod/test_language_websocket.py b/test/pathod/test_language_websocket.py new file mode 100644 index 00000000..d98fd33e --- /dev/null +++ b/test/pathod/test_language_websocket.py @@ -0,0 +1,142 @@ + +from libpathod import language +from libpathod.language import websockets +import netlib.websockets +import tutils + + +def parse_request(s): + return language.parse_pathoc(s).next() + + +class TestWebsocketFrame: + + def _test_messages(self, specs, message_klass): + for i in specs: + wf = parse_request(i) + assert isinstance(wf, message_klass) + assert wf + assert wf.values(language.Settings()) + assert wf.resolve(language.Settings()) + + spec = wf.spec() + wf2 = parse_request(spec) + assert wf2.spec() == spec + + def test_server_values(self): + specs = [ + "wf", + "wf:dr", + "wf:b'foo'", + "wf:mask:r'foo'", + "wf:l1024:b'foo'", + "wf:cbinary", + "wf:c1", + "wf:mask:knone", + "wf:fin", + "wf:fin:rsv1:rsv2:rsv3:mask", + "wf:-fin:-rsv1:-rsv2:-rsv3:-mask", + "wf:k@4", + "wf:x10", + ] + self._test_messages(specs, websockets.WebsocketFrame) + + def test_parse_websocket_frames(self): + wf = language.parse_websocket_frame("wf:x10") + assert len(list(wf)) == 10 + tutils.raises( + language.ParseException, + language.parse_websocket_frame, + "wf:x" + ) + + def test_client_values(self): + specs = [ + "wf:f'wf'", + ] + self._test_messages(specs, websockets.WebsocketClientFrame) + + def test_nested_frame(self): + wf = parse_request("wf:f'wf'") + assert wf.nested_frame + + def test_flags(self): + wf = parse_request("wf:fin:mask:rsv1:rsv2:rsv3") + frm = netlib.websockets.Frame.from_bytes(tutils.render(wf)) + assert frm.header.fin + assert frm.header.mask + assert frm.header.rsv1 + assert frm.header.rsv2 + assert frm.header.rsv3 + + wf = parse_request("wf:-fin:-mask:-rsv1:-rsv2:-rsv3") + frm = netlib.websockets.Frame.from_bytes(tutils.render(wf)) + assert not frm.header.fin + assert not frm.header.mask + assert not frm.header.rsv1 + assert not frm.header.rsv2 + assert not frm.header.rsv3 + + def fr(self, spec, **kwargs): + settings = language.base.Settings(**kwargs) + wf = parse_request(spec) + return netlib.websockets.Frame.from_bytes(tutils.render(wf, settings)) + + def test_construction(self): + assert self.fr("wf:c1").header.opcode == 1 + assert self.fr("wf:c0").header.opcode == 0 + assert self.fr("wf:cbinary").header.opcode ==\ + netlib.websockets.OPCODE.BINARY + assert self.fr("wf:ctext").header.opcode ==\ + netlib.websockets.OPCODE.TEXT + + def test_rawbody(self): + frm = self.fr("wf:mask:r'foo'") + assert len(frm.payload) == 3 + assert frm.payload != "foo" + + assert self.fr("wf:r'foo'").payload == "foo" + + def test_construction(self): + # Simple server frame + frm = self.fr("wf:b'foo'") + assert not frm.header.mask + assert not frm.header.masking_key + + # Simple client frame + frm = self.fr("wf:b'foo'", is_client=True) + assert frm.header.mask + assert frm.header.masking_key + frm = self.fr("wf:b'foo':k'abcd'", is_client=True) + assert frm.header.mask + assert frm.header.masking_key == 'abcd' + + # Server frame, mask explicitly set + frm = self.fr("wf:b'foo':mask") + assert frm.header.mask + assert frm.header.masking_key + frm = self.fr("wf:b'foo':k'abcd'") + assert frm.header.mask + assert frm.header.masking_key == 'abcd' + + # Client frame, mask explicitly unset + frm = self.fr("wf:b'foo':-mask", is_client=True) + assert not frm.header.mask + assert not frm.header.masking_key + + frm = self.fr("wf:b'foo':-mask:k'abcd'", is_client=True) + assert not frm.header.mask + # We're reading back a corrupted frame - the first 3 characters of the + # mask is mis-interpreted as the payload + assert frm.payload == "abc" + + def test_knone(self): + with tutils.raises("expected 4 bytes"): + self.fr("wf:b'foo':mask:knone") + + def test_length(self): + assert self.fr("wf:l3:b'foo'").header.payload_length == 3 + frm = self.fr("wf:l2:b'foo'") + assert frm.header.payload_length == 2 + assert frm.payload == "fo" + tutils.raises("expected 1024 bytes", self.fr, "wf:l1024:b'foo'") diff --git a/test/pathod/test_language_writer.py b/test/pathod/test_language_writer.py new file mode 100644 index 00000000..1a532903 --- /dev/null +++ b/test/pathod/test_language_writer.py @@ -0,0 +1,91 @@ +import cStringIO + +from libpathod import language +from libpathod.language import writer + + +def test_send_chunk(): + v = "foobarfoobar" + for bs in range(1, len(v) + 2): + s = cStringIO.StringIO() + writer.send_chunk(s, v, bs, 0, len(v)) + assert s.getvalue() == v + for start in range(len(v)): + for end in range(len(v)): + s = cStringIO.StringIO() + writer.send_chunk(s, v, bs, start, end) + assert s.getvalue() == v[start:end] + + +def test_write_values_inject(): + tst = "foo" + + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(0, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "aaafoo" + + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "faaaoo" + + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(1, "inject", "aaa")], blocksize=5) + assert s.getvalue() == "faaaoo" + + +def test_write_values_disconnects(): + s = cStringIO.StringIO() + tst = "foo" * 100 + writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5) + assert not s.getvalue() + + +def test_write_values(): + tst = "foobarvoing" + s = cStringIO.StringIO() + writer.write_values(s, [tst], []) + assert s.getvalue() == tst + + for bs in range(1, len(tst) + 2): + for off in range(len(tst)): + s = cStringIO.StringIO() + writer.write_values( + s, [tst], [(off, "disconnect")], blocksize=bs + ) + assert s.getvalue() == tst[:off] + + +def test_write_values_pauses(): + tst = "".join(str(i) for i in range(10)) + for i in range(2, 10): + s = cStringIO.StringIO() + writer.write_values( + s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i + ) + assert s.getvalue() == tst + + for i in range(2, 10): + s = cStringIO.StringIO() + writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == tst + + tst = ["".join(str(i) for i in range(10))] * 5 + for i in range(2, 10): + s = cStringIO.StringIO() + writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i) + assert s.getvalue() == "".join(tst) + + +def test_write_values_after(): + s = cStringIO.StringIO() + r = language.parse_pathod("400:da").next() + language.serve(r, s, {}) + + s = cStringIO.StringIO() + r = language.parse_pathod("400:pa,0").next() + language.serve(r, s, {}) + + s = cStringIO.StringIO() + r = language.parse_pathod("400:ia,'xx'").next() + language.serve(r, s, {}) + assert s.getvalue().endswith('xx') diff --git a/test/pathod/test_log.py b/test/pathod/test_log.py new file mode 100644 index 00000000..8f38c040 --- /dev/null +++ b/test/pathod/test_log.py @@ -0,0 +1,25 @@ +import StringIO +from libpathod import log +from netlib.exceptions import TcpDisconnect +import netlib.tcp + + +class DummyIO(StringIO.StringIO): + + def start_log(self, *args, **kwargs): + pass + + def get_log(self, *args, **kwargs): + return "" + + +def test_disconnect(): + outf = DummyIO() + rw = DummyIO() + l = log.ConnectionLogger(outf, False, rw, rw) + try: + with l.ctx() as lg: + lg("Test") + except TcpDisconnect: + pass + assert "Test" in outf.getvalue() diff --git a/test/pathod/test_pathoc.py b/test/pathod/test_pathoc.py new file mode 100644 index 00000000..62696a64 --- /dev/null +++ b/test/pathod/test_pathoc.py @@ -0,0 +1,308 @@ +import json +import cStringIO +import re +import OpenSSL +from mock import Mock + +from netlib import tcp, http, socks +from netlib.exceptions import HttpException, TcpException, NetlibException +from netlib.http import http1, http2 + +from libpathod import pathoc, test, version, pathod, language +from netlib.tutils import raises +import tutils + + +def test_response(): + r = http.Response("HTTP/1.1", 200, "Message", {}, None, None) + assert repr(r) + + +class _TestDaemon: + ssloptions = pathod.SSLOptions() + + @classmethod + def setup_class(cls): + cls.d = test.Daemon( + ssl=cls.ssl, + ssloptions=cls.ssloptions, + staticdir=tutils.test_data.path("data"), + anchors=[ + (re.compile("/anchor/.*"), "202") + ] + ) + + @classmethod + def teardown_class(cls): + cls.d.shutdown() + + def setUp(self): + self.d.clear_log() + + def test_info(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=self.ssl, + fp=None + ) + c.connect() + resp = c.request("get:/api/info") + assert tuple(json.loads(resp.content)["version"]) == version.IVERSION + + def tval( + self, + requests, + showreq=False, + showresp=False, + explain=False, + showssl=False, + hexdump=False, + timeout=None, + ignorecodes=(), + ignoretimeout=None, + showsummary=True + ): + s = cStringIO.StringIO() + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=self.ssl, + showreq=showreq, + showresp=showresp, + explain=explain, + hexdump=hexdump, + ignorecodes=ignorecodes, + ignoretimeout=ignoretimeout, + showsummary=showsummary, + fp=s + ) + c.connect(showssl=showssl, fp=s) + if timeout: + c.settimeout(timeout) + for i in requests: + r = language.parse_pathoc(i).next() + if explain: + r = r.freeze(language.Settings()) + try: + c.request(r) + except NetlibException: + pass + return s.getvalue() + + +class TestDaemonSSL(_TestDaemon): + ssl = True + ssloptions = pathod.SSLOptions( + request_client_cert=True, + sans=["test1.com", "test2.com"], + alpn_select=b'h2', + ) + + def test_sni(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=True, + sni="foobar.com", + fp=None + ) + c.connect() + c.request("get:/p/200") + r = c.request("get:/api/log") + d = json.loads(r.content) + assert d["log"][0]["request"]["sni"] == "foobar.com" + + def test_showssl(self): + assert "certificate chain" in self.tval(["get:/p/200"], showssl=True) + + def test_clientcert(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ssl=True, + clientcert=tutils.test_data.path("data/clientcert/client.pem"), + fp=None + ) + c.connect() + c.request("get:/p/200") + r = c.request("get:/api/log") + d = json.loads(r.content) + assert d["log"][0]["request"]["clientcert"]["keyinfo"] + + def test_http2_without_ssl(self): + fp = cStringIO.StringIO() + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + use_http2=True, + ssl=False, + fp = fp + ) + tutils.raises(NotImplementedError, c.connect) + + +class TestDaemon(_TestDaemon): + ssl = False + + def test_ssl_error(self): + c = pathoc.Pathoc(("127.0.0.1", self.d.port), ssl=True, fp=None) + tutils.raises("ssl handshake", c.connect) + + def test_showssl(self): + assert not "certificate chain" in self.tval( + ["get:/p/200"], + showssl=True) + + def test_ignorecodes(self): + assert "200" in self.tval(["get:'/p/200:b@1'"]) + assert "200" in self.tval(["get:'/p/200:b@1'"]) + assert "200" in self.tval(["get:'/p/200:b@1'"]) + assert "200" not in self.tval(["get:'/p/200:b@1'"], ignorecodes=[200]) + assert "200" not in self.tval( + ["get:'/p/200:b@1'"], + ignorecodes=[ + 200, + 201]) + assert "202" in self.tval(["get:'/p/202:b@1'"], ignorecodes=[200, 201]) + + def test_timeout(self): + assert "Timeout" in self.tval(["get:'/p/200:p0,100'"], timeout=0.01) + assert "HTTP" in self.tval( + ["get:'/p/200:p5,100'"], + showresp=True, + timeout=1 + ) + assert not "HTTP" in self.tval( + ["get:'/p/200:p3,100'"], + showresp=True, + timeout=1, + ignoretimeout=True + ) + + def test_showresp(self): + reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"] + assert self.tval(reqs).count("200") == 2 + assert self.tval(reqs, showresp=True).count("HTTP/1.1 200 OK") == 2 + assert self.tval( + reqs, showresp=True, hexdump=True + ).count("0000000000") == 2 + + def test_showresp_httperr(self): + v = self.tval(["get:'/p/200:d20'"], showresp=True, showsummary=True) + assert "Invalid headers" in v + assert "HTTP/" in v + + def test_explain(self): + reqs = ["get:/p/200:b@100"] + assert "b@100" not in self.tval(reqs, explain=True) + + def test_showreq(self): + reqs = ["get:/api/info:p0,0", "get:/api/info:p0,0"] + assert self.tval(reqs, showreq=True).count("GET /api") == 2 + assert self.tval( + reqs, showreq=True, hexdump=True + ).count("0000000000") == 2 + + def test_conn_err(self): + assert "Invalid server response" in self.tval(["get:'/p/200:d2'"]) + + def test_websocket_shutdown(self): + c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) + c.connect() + c.request("ws:/") + c.stop() + + def test_wait_finish(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ws_read_limit=1 + ) + c.connect() + c.request("ws:/") + c.request("wf:f'wf:x100'") + [i for i in c.wait(timeout=0, finish=False)] + [i for i in c.wait(timeout=0)] + + def test_connect_fail(self): + to = ("foobar", 80) + c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) + c.rfile, c.wfile = cStringIO.StringIO(), cStringIO.StringIO() + with raises("connect failed"): + c.http_connect(to) + c.rfile = cStringIO.StringIO( + "HTTP/1.1 500 OK\r\n" + ) + with raises("connect failed"): + c.http_connect(to) + c.rfile = cStringIO.StringIO( + "HTTP/1.1 200 OK\r\n" + ) + c.http_connect(to) + + def test_socks_connect(self): + to = ("foobar", 80) + c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None) + c.rfile, c.wfile = tutils.treader(""), cStringIO.StringIO() + tutils.raises(pathoc.PathocError, c.socks_connect, to) + + c.rfile = tutils.treader( + "\x05\xEE" + ) + tutils.raises("SOCKS without authentication", c.socks_connect, ("example.com", 0xDEAD)) + + c.rfile = tutils.treader( + "\x05\x00" + + "\x05\xEE\x00\x03\x0bexample.com\xDE\xAD" + ) + tutils.raises("SOCKS server error", c.socks_connect, ("example.com", 0xDEAD)) + + c.rfile = tutils.treader( + "\x05\x00" + + "\x05\x00\x00\x03\x0bexample.com\xDE\xAD" + ) + c.socks_connect(("example.com", 0xDEAD)) + + +class TestDaemonHTTP2(_TestDaemon): + ssl = True + + if OpenSSL._util.lib.Cryptography_HAS_ALPN: + + def test_http2(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ssl=True, + use_http2=True, + ) + assert isinstance(c.protocol, http2.HTTP2Protocol) + + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + ) + assert c.protocol == http1 + + def test_http2_alpn(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ssl=True, + use_http2=True, + http2_skip_connection_preface=True, + ) + + tmp_convert_to_ssl = c.convert_to_ssl + c.convert_to_ssl = Mock() + c.convert_to_ssl.side_effect = tmp_convert_to_ssl + c.connect() + + _, kwargs = c.convert_to_ssl.call_args + assert set(kwargs['alpn_protos']) == set([b'http/1.1', b'h2']) + + def test_request(self): + c = pathoc.Pathoc( + ("127.0.0.1", self.d.port), + fp=None, + ssl=True, + use_http2=True, + ) + c.connect() + resp = c.request("get:/p/200") + assert resp.status_code == 200 diff --git a/test/pathod/test_pathoc_cmdline.py b/test/pathod/test_pathoc_cmdline.py new file mode 100644 index 00000000..74dfef57 --- /dev/null +++ b/test/pathod/test_pathoc_cmdline.py @@ -0,0 +1,59 @@ +from libpathod import pathoc_cmdline as cmdline +import tutils +import cStringIO +import mock + + +@mock.patch("argparse.ArgumentParser.error") +def test_pathoc(perror): + assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"]) + s = cStringIO.StringIO() + with tutils.raises(SystemExit): + cmdline.args_pathoc(["pathoc", "--show-uas"], s, s) + + a = cmdline.args_pathoc(["pathoc", "foo.com:8888", "get:/"]) + assert a.port == 8888 + + a = cmdline.args_pathoc(["pathoc", "foo.com:xxx", "get:/"]) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathoc(["pathoc", "-I", "10, 20", "foo.com:8888", "get:/"]) + assert a.ignorecodes == [10, 20] + + a = cmdline.args_pathoc(["pathoc", "-I", "xx, 20", "foo.com:8888", "get:/"]) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathoc(["pathoc", "-c", "foo:10", "foo.com:8888", "get:/"]) + assert a.connect_to == ["foo", 10] + + a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2"]) + assert a.use_http2 == True + assert a.ssl == True + + a = cmdline.args_pathoc(["pathoc", "foo.com", "get:/", "--http2-skip-connection-preface"]) + assert a.use_http2 == True + assert a.ssl == True + assert a.http2_skip_connection_preface == True + + a = cmdline.args_pathoc(["pathoc", "-c", "foo", "foo.com:8888", "get:/"]) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathoc( + ["pathoc", "-c", "foo:bar", "foo.com:8888", "get:/"]) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathoc( + [ + "pathoc", + "foo.com:8888", + tutils.test_data.path("data/request") + ] + ) + assert len(list(a.requests)) == 1 + + with tutils.raises(SystemExit): + cmdline.args_pathoc(["pathoc", "foo.com", "invalid"], s, s) diff --git a/test/pathod/test_pathod.py b/test/pathod/test_pathod.py new file mode 100644 index 00000000..98da7d28 --- /dev/null +++ b/test/pathod/test_pathod.py @@ -0,0 +1,289 @@ +import sys +import cStringIO +import OpenSSL + +from libpathod import pathod, version +from netlib import tcp, http +from netlib.exceptions import HttpException, TlsException +import tutils + + +class TestPathod(object): + + def test_logging(self): + s = cStringIO.StringIO() + p = pathod.Pathod(("127.0.0.1", 0), logfp=s) + assert len(p.get_log()) == 0 + id = p.add_log(dict(s="foo")) + assert p.log_by_id(id) + assert len(p.get_log()) == 1 + p.clear_log() + assert len(p.get_log()) == 0 + + for _ in range(p.LOGBUF + 1): + p.add_log(dict(s="foo")) + assert len(p.get_log()) <= p.LOGBUF + + +class TestNoWeb(tutils.DaemonTests): + noweb = True + + def test_noweb(self): + assert self.get("200:da").status_code == 200 + assert self.getpath("/").status_code == 800 + + +class TestTimeout(tutils.DaemonTests): + timeout = 0.01 + + def test_noweb(self): + # FIXME: Add float values to spec language, reduce test timeout to + # increase test performance + # This is a bodge - we have some platform difference that causes + # different exceptions to be raised here. + tutils.raises(Exception, self.pathoc, ["get:/:p1,1"]) + assert self.d.last_log()["type"] == "timeout" + + +class TestNoApi(tutils.DaemonTests): + noapi = True + + def test_noapi(self): + assert self.getpath("/log").status_code == 404 + r = self.getpath("/") + assert r.status_code == 200 + assert not "Log" in r.content + + +class TestNotAfterConnect(tutils.DaemonTests): + ssl = False + ssloptions = dict( + not_after_connect=True + ) + + def test_connect(self): + r, _ = self.pathoc( + [r"get:'http://foo.com/p/202':da"], + connect_to=("localhost", self.d.port) + ) + assert r[0].status_code == 202 + + +class TestCustomCert(tutils.DaemonTests): + ssl = True + ssloptions = dict( + certs=[("*", tutils.test_data.path("data/testkey.pem"))], + ) + + def test_connect(self): + r, _ = self.pathoc([r"get:/p/202"]) + r = r[0] + assert r.status_code == 202 + assert r.sslinfo + assert "test.com" in str(r.sslinfo.certchain[0].get_subject()) + + +class TestSSLCN(tutils.DaemonTests): + ssl = True + ssloptions = dict( + cn="foo.com" + ) + + def test_connect(self): + r, _ = self.pathoc([r"get:/p/202"]) + r = r[0] + assert r.status_code == 202 + assert r.sslinfo + assert r.sslinfo.certchain[0].get_subject().CN == "foo.com" + + +class TestNohang(tutils.DaemonTests): + nohang = True + + def test_nohang(self): + r = self.get("200:p0,0") + assert r.status_code == 800 + l = self.d.last_log() + assert "Pauses have been disabled" in l["response"]["msg"] + + +class TestHexdump(tutils.DaemonTests): + hexdump = True + + def test_hexdump(self): + r = self.get(r"200:b'\xf0'") + + +class TestNocraft(tutils.DaemonTests): + nocraft = True + + def test_nocraft(self): + r = self.get(r"200:b'\xf0'") + assert r.status_code == 800 + assert "Crafting disabled" in r.content + + +class CommonTests(tutils.DaemonTests): + + def test_binarydata(self): + r = self.get(r"200:b'\xf0'") + l = self.d.last_log() + # FIXME: Other binary data elements + + def test_sizelimit(self): + r = self.get("200:b@1g") + assert r.status_code == 800 + l = self.d.last_log() + assert "too large" in l["response"]["msg"] + + def test_preline(self): + r, _ = self.pathoc([r"get:'/p/200':i0,'\r\n'"]) + assert r[0].status_code == 200 + + def test_info(self): + assert tuple(self.d.info()["version"]) == version.IVERSION + + def test_logs(self): + assert self.d.clear_log() + assert not self.d.last_log() + rsp = self.get("202:da") + assert len(self.d.log()) == 1 + assert self.d.clear_log() + assert len(self.d.log()) == 0 + + def test_disconnect(self): + rsp = self.get("202:b@100k:d200") + assert len(rsp.content) < 200 + + def test_parserr(self): + rsp = self.get("400:msg,b:") + assert rsp.status_code == 800 + + def test_static(self): + rsp = self.get("200:b<file") + assert rsp.status_code == 200 + assert rsp.content.strip() == "testfile" + + def test_anchor(self): + rsp = self.getpath("anchor/foo") + assert rsp.status_code == 202 + + def test_invalid_first_line(self): + c = tcp.TCPClient(("localhost", self.d.port)) + c.connect() + if self.ssl: + c.convert_to_ssl() + c.wfile.write("foo\n\n\n") + c.wfile.flush() + l = self.d.last_log() + assert l["type"] == "error" + assert "foo" in l["msg"] + + def test_invalid_content_length(self): + tutils.raises( + HttpException, + self.pathoc, + ["get:/:h'content-length'='foo'"] + ) + l = self.d.last_log() + assert l["type"] == "error" + assert "Unparseable Content Length" in l["msg"] + + def test_invalid_headers(self): + tutils.raises(HttpException, self.pathoc, ["get:/:h'\t'='foo'"]) + l = self.d.last_log() + assert l["type"] == "error" + assert "Invalid headers" in l["msg"] + + def test_access_denied(self): + rsp = self.get("=nonexistent") + assert rsp.status_code == 800 + + def test_source_access_denied(self): + rsp = self.get("200:b</foo") + assert rsp.status_code == 800 + assert "File access denied" in rsp.content + + def test_proxy(self): + r, _ = self.pathoc([r"get:'http://foo.com/p/202':da"]) + assert r[0].status_code == 202 + + def test_websocket(self): + r, _ = self.pathoc(["ws:/p/"], ws_read_limit=0) + assert r[0].status_code == 101 + + r, _ = self.pathoc(["ws:/p/ws"], ws_read_limit=0) + assert r[0].status_code == 101 + + def test_websocket_frame(self): + r, _ = self.pathoc( + ["ws:/p/", "wf:f'wf:b\"test\"':pa,1"], + ws_read_limit=1 + ) + assert r[1].payload == "test" + + def test_websocket_frame_reflect_error(self): + r, _ = self.pathoc( + ["ws:/p/", "wf:-mask:knone:f'wf:b@10':i13,'a'"], + ws_read_limit=1, + timeout=1 + ) + # FIXME: Race Condition? + assert "Parse error" in self.d.text_log() + + def test_websocket_frame_disconnect_error(self): + self.pathoc(["ws:/p/", "wf:b@10:d3"], ws_read_limit=0) + assert self.d.last_log() + + +class TestDaemon(CommonTests): + ssl = False + + def test_connect(self): + r, _ = self.pathoc( + [r"get:'http://foo.com/p/202':da"], + connect_to=("localhost", self.d.port), + ssl=True + ) + assert r[0].status_code == 202 + + def test_connect_err(self): + tutils.raises( + HttpException, + self.pathoc, + [r"get:'http://foo.com/p/202':da"], + connect_to=("localhost", self.d.port) + ) + + +class TestDaemonSSL(CommonTests): + ssl = True + + def test_ssl_conn_failure(self): + c = tcp.TCPClient(("localhost", self.d.port)) + c.rbufsize = 0 + c.wbufsize = 0 + c.connect() + c.wfile.write("\0\0\0\0") + tutils.raises(TlsException, c.convert_to_ssl) + l = self.d.last_log() + assert l["type"] == "error" + assert "SSL" in l["msg"] + + def test_ssl_cipher(self): + r, _ = self.pathoc([r"get:/p/202"]) + assert r[0].status_code == 202 + assert self.d.last_log()["cipher"][1] > 0 + + +class TestHTTP2(tutils.DaemonTests): + ssl = True + noweb = True + noapi = True + nohang = True + + if OpenSSL._util.lib.Cryptography_HAS_ALPN: + + def test_http2(self): + r, _ = self.pathoc(["GET:/"], ssl=True, use_http2=True) + assert r[0].status_code == 800 diff --git a/test/pathod/test_pathod_cmdline.py b/test/pathod/test_pathod_cmdline.py new file mode 100644 index 00000000..829c4b32 --- /dev/null +++ b/test/pathod/test_pathod_cmdline.py @@ -0,0 +1,85 @@ +from libpathod import pathod_cmdline as cmdline +import tutils +import cStringIO +import mock + + +@mock.patch("argparse.ArgumentParser.error") +def test_pathod(perror): + assert cmdline.args_pathod(["pathod"]) + + a = cmdline.args_pathod( + [ + "pathod", + "--cert", + tutils.test_data.path("data/testkey.pem") + ] + ) + assert a.ssl_certs + + a = cmdline.args_pathod( + [ + "pathod", + "--cert", + "nonexistent" + ] + ) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathod( + [ + "pathod", + "-a", + "foo=200" + ] + ) + assert a.anchors + + a = cmdline.args_pathod( + [ + "pathod", + "-a", + "foo=" + tutils.test_data.path("data/response") + ] + ) + assert a.anchors + + a = cmdline.args_pathod( + [ + "pathod", + "-a", + "?=200" + ] + ) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathod( + [ + "pathod", + "-a", + "foo" + ] + ) + assert perror.called + perror.reset_mock() + + a = cmdline.args_pathod( + [ + "pathod", + "--limit-size", + "200k" + ] + ) + assert a.sizelimit + + a = cmdline.args_pathod( + [ + "pathod", + "--limit-size", + "q" + ] + ) + assert perror.called + perror.reset_mock() diff --git a/test/pathod/test_test.py b/test/pathod/test_test.py new file mode 100644 index 00000000..bd92d864 --- /dev/null +++ b/test/pathod/test_test.py @@ -0,0 +1,45 @@ +import logging +import requests +from libpathod import test +import tutils +logging.disable(logging.CRITICAL) + + +class TestDaemonManual: + + def test_simple(self): + with test.Daemon() as d: + rsp = requests.get("http://localhost:%s/p/202:da" % d.port) + assert rsp.ok + assert rsp.status_code == 202 + with tutils.raises(requests.ConnectionError): + requests.get("http://localhost:%s/p/202:da" % d.port) + + def test_startstop_ssl(self): + d = test.Daemon(ssl=True) + rsp = requests.get( + "https://localhost:%s/p/202:da" % + d.port, + verify=False) + assert rsp.ok + assert rsp.status_code == 202 + d.shutdown() + with tutils.raises(requests.ConnectionError): + requests.get("http://localhost:%s/p/202:da" % d.port) + + def test_startstop_ssl_explicit(self): + ssloptions = dict( + certfile=tutils.test_data.path("data/testkey.pem"), + cacert=tutils.test_data.path("data/testkey.pem"), + ssl_after_connect=False + ) + d = test.Daemon(ssl=ssloptions) + rsp = requests.get( + "https://localhost:%s/p/202:da" % + d.port, + verify=False) + assert rsp.ok + assert rsp.status_code == 202 + d.shutdown() + with tutils.raises(requests.ConnectionError): + requests.get("http://localhost:%s/p/202:da" % d.port) diff --git a/test/pathod/test_utils.py b/test/pathod/test_utils.py new file mode 100644 index 00000000..7d24e9e4 --- /dev/null +++ b/test/pathod/test_utils.py @@ -0,0 +1,39 @@ +from libpathod import utils +import tutils + + +def test_membool(): + m = utils.MemBool() + assert not m.v + assert m(1) + assert m.v == 1 + assert m(2) + assert m.v == 2 + + +def test_parse_size(): + assert utils.parse_size("100") == 100 + assert utils.parse_size("100k") == 100 * 1024 + tutils.raises("invalid size spec", utils.parse_size, "foo") + tutils.raises("invalid size spec", utils.parse_size, "100kk") + + +def test_parse_anchor_spec(): + assert utils.parse_anchor_spec("foo=200") == ("foo", "200") + assert utils.parse_anchor_spec("foo") is None + + +def test_data_path(): + tutils.raises(ValueError, utils.data.path, "nonexistent") + + +def test_inner_repr(): + assert utils.inner_repr("\x66") == "\x66" + assert utils.inner_repr(u"foo") == "foo" + + +def test_escape_unprintables(): + s = "".join([chr(i) for i in range(255)]) + e = utils.escape_unprintables(s) + assert e.encode('ascii') + assert not "PATHOD_MARKER" in e diff --git a/test/pathod/tutils.py b/test/pathod/tutils.py new file mode 100644 index 00000000..664cdd52 --- /dev/null +++ b/test/pathod/tutils.py @@ -0,0 +1,128 @@ +import tempfile +import os +import re +import shutil +import cStringIO +from contextlib import contextmanager + +import netlib +from libpathod import utils, test, pathoc, pathod, language +from netlib import tcp +import requests + +def treader(bytes): + """ + Construct a tcp.Read object from bytes. + """ + fp = cStringIO.StringIO(bytes) + return tcp.Reader(fp) + + +class DaemonTests(object): + noweb = False + noapi = False + nohang = False + ssl = False + timeout = None + hexdump = False + ssloptions = None + nocraft = False + + @classmethod + def setup_class(cls): + opts = cls.ssloptions or {} + cls.confdir = tempfile.mkdtemp() + opts["confdir"] = cls.confdir + so = pathod.SSLOptions(**opts) + cls.d = test.Daemon( + staticdir=test_data.path("data"), + anchors=[ + (re.compile("/anchor/.*"), "202:da") + ], + ssl=cls.ssl, + ssloptions=so, + sizelimit=1 * 1024 * 1024, + noweb=cls.noweb, + noapi=cls.noapi, + nohang=cls.nohang, + timeout=cls.timeout, + hexdump=cls.hexdump, + nocraft=cls.nocraft, + logreq=True, + logresp=True, + explain=True + ) + + @classmethod + def teardown_class(cls): + cls.d.shutdown() + shutil.rmtree(cls.confdir) + + def teardown(self): + if not (self.noweb or self.noapi): + self.d.clear_log() + + def getpath(self, path, params=None): + scheme = "https" if self.ssl else "http" + resp = requests.get( + "%s://localhost:%s/%s" % ( + scheme, + self.d.port, + path + ), + verify=False, + params=params + ) + return resp + + def get(self, spec): + resp = requests.get(self.d.p(spec), verify=False) + return resp + + def pathoc( + self, + specs, + timeout=None, + connect_to=None, + ssl=None, + ws_read_limit=None, + use_http2=False, + ): + """ + Returns a (messages, text log) tuple. + """ + if ssl is None: + ssl = self.ssl + logfp = cStringIO.StringIO() + c = pathoc.Pathoc( + ("localhost", self.d.port), + ssl=ssl, + ws_read_limit=ws_read_limit, + timeout=timeout, + fp=logfp, + use_http2=use_http2, + ) + c.connect(connect_to) + ret = [] + for i in specs: + resp = c.request(i) + if resp: + ret.append(resp) + for frm in c.wait(): + ret.append(frm) + c.stop() + return ret, logfp.getvalue() + + +tmpdir = netlib.tutils.tmpdir + +raises = netlib.tutils.raises + +test_data = utils.Data(__name__) + + +def render(r, settings=language.Settings()): + r = r.resolve(settings) + s = cStringIO.StringIO() + assert language.serve(r, s, settings) + return s.getvalue() |