From 049d3d2b45d42245a305bd0bd3b5164a44dc7ba2 Mon Sep 17 00:00:00 2001 From: Aldo Cortesi Date: Thu, 7 Jun 2012 08:50:06 +1200 Subject: lippathod/app.py -> libpathod/pathod.py --- libpathod/app.py | 215 ---------------------------------------------------- libpathod/pathod.py | 215 ++++++++++++++++++++++++++++++++++++++++++++++++++++ pathod | 6 +- test/test_app.py | 63 --------------- test/test_pathod.py | 63 +++++++++++++++ 5 files changed, 281 insertions(+), 281 deletions(-) delete mode 100644 libpathod/app.py create mode 100644 libpathod/pathod.py delete mode 100644 test/test_app.py create mode 100644 test/test_pathod.py diff --git a/libpathod/app.py b/libpathod/app.py deleted file mode 100644 index ca24c62c..00000000 --- a/libpathod/app.py +++ /dev/null @@ -1,215 +0,0 @@ -import urllib, pprint -import tornado.web, tornado.template, tornado.ioloop, tornado.httpserver -import rparse, utils - - -class APILog(tornado.web.RequestHandler): - def get(self): - self.write( - dict( - d = self.application.get_log() - ) - ) - - -class APILogClear(tornado.web.RequestHandler): - def post(self): - self.application.clear_log() - self.write("OK") - - -class _Page(tornado.web.RequestHandler): - def render(self, name, **kwargs): - tornado.web.RequestHandler.render(self, name + ".html", **kwargs) - - -class Index(_Page): - name = "index" - section = "main" - def get(self): - self.render(self.name, section=self.section, spec="") - - -class Preview(_Page): - name = "preview" - section = "main" - SANITY = 1024*1024 - def get(self): - spec = self.get_argument("spec", None) - args = dict( - spec = spec, - section = self.section, - syntaxerror = None, - error = None - ) - try: - r = rparse.parse(self.application.settings, spec) - except rparse.ParseException, v: - args["syntaxerror"] = str(v) - args["marked"] = v.marked() - return self.render(self.name, **args) - if r.length() > self.SANITY: - error = "Refusing to preview a response of %s bytes. This is for your own good."%r.length() - args["error"] = error - else: - d = utils.DummyRequest() - r.serve(d) - args["output"] = d.getvalue() - self.render(self.name, **args) - - -class Help(_Page): - name = "help" - section = "help" - def get(self): - self.render(self.name, section=self.section) - - -class Log(_Page): - name = "log" - section = "log" - def get(self): - self.render(self.name, section=self.section, log=self.application.log) - - -class OneLog(_Page): - name = "onelog" - section = "log" - def get(self, lid): - l = pprint.pformat(self.application.log_by_id(int(lid))) - self.render(self.name, section=self.section, alog=l, lid=lid) - - -class ClearLog(_Page): - def post(self): - self.application.clear_logs() - self.redirect("/log") - - -class Pathod(object): - def __init__(self, spec, application, request, **settings): - self.application, self.request, self.settings = application, request, settings - try: - self.response = rparse.parse(self.settings, spec) - except rparse.ParseException, v: - self.response = rparse.InternalResponse( - 800, - "Error parsing response spec: %s\n"%v.msg + v.marked() - ) - - def _execute(self, transforms, *args, **kwargs): - d = self.response.serve(self.request) - d["request"] = dict( - path = self.request.path, - method = self.request.method, - headers = self.request.headers, - host = self.request.host, - protocol = self.request.protocol, - remote_address = self.request.connection.address, - full_url = self.request.full_url(), - query = self.request.query, - version = self.request.version, - uri = self.request.uri, - ) - self.application.add_log(d) - - -class RequestPathod(Pathod): - anchor = "/p/" - def __init__(self, application, request, **settings): - spec = urllib.unquote(request.uri)[len(self.anchor):] - Pathod.__init__(self, spec, application, request, **settings) - - -class PathodApp(tornado.web.Application): - LOGBUF = 500 - def __init__(self, **settings): - self.appsettings = settings - tornado.web.Application.__init__( - self, - [ - (r"/", Index), - (r"/log", Log), - (r"/log/clear", ClearLog), - (r"/log/([0-9]+)", OneLog), - (r"/help", Help), - (r"/preview", Preview), - (r"/api/log", APILog), - (r"/api/log/clear", APILogClear), - (r"/p/.*", RequestPathod, settings), - ], - static_path = utils.data.path("static"), - template_path = utils.data.path("templates"), - debug=True - ) - self.log = [] - self.logid = 0 - - def add_anchor(self, pattern, spec): - """ - Anchors are added to the beginning of the handlers. - """ - # We assume we have only one host... - l = self.handlers[0][1] - class FixedPathod(Pathod): - def __init__(self, application, request, **settings): - Pathod.__init__(self, spec, application, request, **settings) - FixedPathod.spec = spec - FixedPathod.pattern = pattern - l.insert(0, tornado.web.URLSpec(pattern, FixedPathod, self.appsettings)) - - def get_anchors(self): - """ - Anchors are added to the beginning of the handlers. - """ - l = self.handlers[0][1] - a = [] - for i in l: - if i.handler_class.__name__ == "FixedPathod": - a.append( - ( - i.handler_class.pattern, - i.handler_class.spec - ) - ) - return a - - def remove_anchor(self, pattern, spec): - """ - Anchors are added to the beginning of the handlers. - """ - l = self.handlers[0][1] - for i, h in enumerate(l): - if h.handler_class.__name__ == "FixedPathod": - if (h.handler_class.pattern, h.handler_class.spec) == (pattern, spec): - del l[i] - return - - def add_log(self, d): - d["id"] = self.logid - self.log.insert(0, d) - if len(self.log) > self.LOGBUF: - self.log.pop() - self.logid += 1 - - def log_by_id(self, id): - for i in self.log: - if i["id"] == id: - return i - - def clear_log(self): - self.log = [] - - def get_log(self): - return self.log - - -# begin nocover -def run(application, port, ssl_options): - http_server = tornado.httpserver.HTTPServer( - application, - ssl_options=ssl_options - ) - http_server.listen(port) - tornado.ioloop.IOLoop.instance().start() - diff --git a/libpathod/pathod.py b/libpathod/pathod.py new file mode 100644 index 00000000..ca24c62c --- /dev/null +++ b/libpathod/pathod.py @@ -0,0 +1,215 @@ +import urllib, pprint +import tornado.web, tornado.template, tornado.ioloop, tornado.httpserver +import rparse, utils + + +class APILog(tornado.web.RequestHandler): + def get(self): + self.write( + dict( + d = self.application.get_log() + ) + ) + + +class APILogClear(tornado.web.RequestHandler): + def post(self): + self.application.clear_log() + self.write("OK") + + +class _Page(tornado.web.RequestHandler): + def render(self, name, **kwargs): + tornado.web.RequestHandler.render(self, name + ".html", **kwargs) + + +class Index(_Page): + name = "index" + section = "main" + def get(self): + self.render(self.name, section=self.section, spec="") + + +class Preview(_Page): + name = "preview" + section = "main" + SANITY = 1024*1024 + def get(self): + spec = self.get_argument("spec", None) + args = dict( + spec = spec, + section = self.section, + syntaxerror = None, + error = None + ) + try: + r = rparse.parse(self.application.settings, spec) + except rparse.ParseException, v: + args["syntaxerror"] = str(v) + args["marked"] = v.marked() + return self.render(self.name, **args) + if r.length() > self.SANITY: + error = "Refusing to preview a response of %s bytes. This is for your own good."%r.length() + args["error"] = error + else: + d = utils.DummyRequest() + r.serve(d) + args["output"] = d.getvalue() + self.render(self.name, **args) + + +class Help(_Page): + name = "help" + section = "help" + def get(self): + self.render(self.name, section=self.section) + + +class Log(_Page): + name = "log" + section = "log" + def get(self): + self.render(self.name, section=self.section, log=self.application.log) + + +class OneLog(_Page): + name = "onelog" + section = "log" + def get(self, lid): + l = pprint.pformat(self.application.log_by_id(int(lid))) + self.render(self.name, section=self.section, alog=l, lid=lid) + + +class ClearLog(_Page): + def post(self): + self.application.clear_logs() + self.redirect("/log") + + +class Pathod(object): + def __init__(self, spec, application, request, **settings): + self.application, self.request, self.settings = application, request, settings + try: + self.response = rparse.parse(self.settings, spec) + except rparse.ParseException, v: + self.response = rparse.InternalResponse( + 800, + "Error parsing response spec: %s\n"%v.msg + v.marked() + ) + + def _execute(self, transforms, *args, **kwargs): + d = self.response.serve(self.request) + d["request"] = dict( + path = self.request.path, + method = self.request.method, + headers = self.request.headers, + host = self.request.host, + protocol = self.request.protocol, + remote_address = self.request.connection.address, + full_url = self.request.full_url(), + query = self.request.query, + version = self.request.version, + uri = self.request.uri, + ) + self.application.add_log(d) + + +class RequestPathod(Pathod): + anchor = "/p/" + def __init__(self, application, request, **settings): + spec = urllib.unquote(request.uri)[len(self.anchor):] + Pathod.__init__(self, spec, application, request, **settings) + + +class PathodApp(tornado.web.Application): + LOGBUF = 500 + def __init__(self, **settings): + self.appsettings = settings + tornado.web.Application.__init__( + self, + [ + (r"/", Index), + (r"/log", Log), + (r"/log/clear", ClearLog), + (r"/log/([0-9]+)", OneLog), + (r"/help", Help), + (r"/preview", Preview), + (r"/api/log", APILog), + (r"/api/log/clear", APILogClear), + (r"/p/.*", RequestPathod, settings), + ], + static_path = utils.data.path("static"), + template_path = utils.data.path("templates"), + debug=True + ) + self.log = [] + self.logid = 0 + + def add_anchor(self, pattern, spec): + """ + Anchors are added to the beginning of the handlers. + """ + # We assume we have only one host... + l = self.handlers[0][1] + class FixedPathod(Pathod): + def __init__(self, application, request, **settings): + Pathod.__init__(self, spec, application, request, **settings) + FixedPathod.spec = spec + FixedPathod.pattern = pattern + l.insert(0, tornado.web.URLSpec(pattern, FixedPathod, self.appsettings)) + + def get_anchors(self): + """ + Anchors are added to the beginning of the handlers. + """ + l = self.handlers[0][1] + a = [] + for i in l: + if i.handler_class.__name__ == "FixedPathod": + a.append( + ( + i.handler_class.pattern, + i.handler_class.spec + ) + ) + return a + + def remove_anchor(self, pattern, spec): + """ + Anchors are added to the beginning of the handlers. + """ + l = self.handlers[0][1] + for i, h in enumerate(l): + if h.handler_class.__name__ == "FixedPathod": + if (h.handler_class.pattern, h.handler_class.spec) == (pattern, spec): + del l[i] + return + + def add_log(self, d): + d["id"] = self.logid + self.log.insert(0, d) + if len(self.log) > self.LOGBUF: + self.log.pop() + self.logid += 1 + + def log_by_id(self, id): + for i in self.log: + if i["id"] == id: + return i + + def clear_log(self): + self.log = [] + + def get_log(self): + return self.log + + +# begin nocover +def run(application, port, ssl_options): + http_server = tornado.httpserver.HTTPServer( + application, + ssl_options=ssl_options + ) + http_server.listen(port) + tornado.ioloop.IOLoop.instance().start() + diff --git a/pathod b/pathod index a4e81e91..7fbf3bd7 100755 --- a/pathod +++ b/pathod @@ -1,6 +1,6 @@ #!/usr/bin/env python import argparse, sys -from libpathod import app, utils, version +from libpathod import pathod, utils, version import tornado.ioloop if __name__ == "__main__": @@ -33,7 +33,7 @@ if __name__ == "__main__": settings = dict( staticdir=args.staticdir ) - application = app.PathodApp(**settings) + application = pathod.PathodApp(**settings) for i in args.anchors: try: rex, spec = utils.parse_anchor_spec(i, settings) @@ -50,6 +50,6 @@ if __name__ == "__main__": ssl = None print "%s listening on port %s"%(version.NAMEVERSION, args.port) try: - app.run(application, args.port, ssl) + pathod.run(application, args.port, ssl) except KeyboardInterrupt: pass diff --git a/test/test_app.py b/test/test_app.py deleted file mode 100644 index db030f9a..00000000 --- a/test/test_app.py +++ /dev/null @@ -1,63 +0,0 @@ -import libpry -from libpathod import app -from tornado import httpserver - -class uApplication(libpry.AutoTree): - def test_anchors(self): - a = app.PathodApp(staticdir=None) - a.add_anchor("/foo", "200") - assert a.get_anchors() == [("/foo", "200")] - a.add_anchor("/bar", "400") - assert a.get_anchors() == [("/bar", "400"), ("/foo", "200")] - a.remove_anchor("/bar", "400") - assert a.get_anchors() == [("/foo", "200")] - a.remove_anchor("/oink", "400") - assert a.get_anchors() == [("/foo", "200")] - - def test_logs(self): - a = app.PathodApp(staticdir=None) - a.LOGBUF = 3 - a.add_log({}) - assert a.log[0]["id"] == 0 - a.add_log({}) - a.add_log({}) - assert a.log[0]["id"] == 2 - a.add_log({}) - assert len(a.log) == 3 - assert a.log[0]["id"] == 3 - assert a.log[-1]["id"] == 1 - - assert a.log_by_id(1)["id"] == 1 - assert not a.log_by_id(0) - - - -class uPages(libpry.AutoTree): - def dummy_page(self, path): - # A hideous, hideous kludge, but Tornado seems to have no more sensible - # way to do this. - a = app.PathodApp(staticdir=None) - for h in a.handlers[0][1]: - if h.regex.match(path): - klass = h.handler_class - r = httpserver.HTTPRequest("GET", path) - del r.connection - k = klass(a, r) - k._transforms = [] - return k - - def test_index(self): - page = self.dummy_page("/") - page.get() - assert "".join(page._write_buffer) - - def test_help(self): - page = self.dummy_page("/help") - page.get() - assert "".join(page._write_buffer) - - - -tests = [ - uApplication(), -] diff --git a/test/test_pathod.py b/test/test_pathod.py new file mode 100644 index 00000000..bb78e094 --- /dev/null +++ b/test/test_pathod.py @@ -0,0 +1,63 @@ +import libpry +from libpathod import pathod +from tornado import httpserver + +class uApplication(libpry.AutoTree): + def test_anchors(self): + a = pathod.PathodApp(staticdir=None) + a.add_anchor("/foo", "200") + assert a.get_anchors() == [("/foo", "200")] + a.add_anchor("/bar", "400") + assert a.get_anchors() == [("/bar", "400"), ("/foo", "200")] + a.remove_anchor("/bar", "400") + assert a.get_anchors() == [("/foo", "200")] + a.remove_anchor("/oink", "400") + assert a.get_anchors() == [("/foo", "200")] + + def test_logs(self): + a = pathod.PathodApp(staticdir=None) + a.LOGBUF = 3 + a.add_log({}) + assert a.log[0]["id"] == 0 + a.add_log({}) + a.add_log({}) + assert a.log[0]["id"] == 2 + a.add_log({}) + assert len(a.log) == 3 + assert a.log[0]["id"] == 3 + assert a.log[-1]["id"] == 1 + + assert a.log_by_id(1)["id"] == 1 + assert not a.log_by_id(0) + + + +class uPages(libpry.AutoTree): + def dummy_page(self, path): + # A hideous, hideous kludge, but Tornado seems to have no more sensible + # way to do this. + a = pathod.PathodApp(staticdir=None) + for h in a.handlers[0][1]: + if h.regex.match(path): + klass = h.handler_class + r = httpserver.HTTPRequest("GET", path) + del r.connection + k = klass(a, r) + k._transforms = [] + return k + + def test_index(self): + page = self.dummy_page("/") + page.get() + assert "".join(page._write_buffer) + + def test_help(self): + page = self.dummy_page("/help") + page.get() + assert "".join(page._write_buffer) + + + +tests = [ + uApplication(), +] -- cgit v1.2.3