1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
import sxp
# StandardError only exists on Python 2; fall back to Exception elsewhere so
# the module can still be imported.  On Python 2 the hierarchy is unchanged.
try:
    _ArgErrorBase = StandardError
except NameError:
    _ArgErrorBase = Exception


class ArgError(_ArgErrorBase):
    """Raised when an argument is missing, unknown, or cannot be coerced."""
class Args:
    """Argument encoding support for HTTP.

    An instance is configured with two specs, each an iterable of
    ``(name, type)`` pairs:

    * ``paramspec`` -- required positional parameters, in call order.
    * ``keyspec``   -- optional keyword parameters.

    ``type`` is one of the type names understood by ``coerce``:
    ``'int'``, ``'str'`` or ``'sxpr'``.
    """

    def __init__(self, paramspec, keyspec):
        """Record the positional and keyword specs.

        paramspec -- iterable of (name, type) pairs for required parameters
        keyspec   -- iterable of (name, type) pairs for optional keywords
        """
        self.arg_ord = []    # positional names, in declaration order
        self.arg_dict = {}   # positional name -> type name
        self.key_ord = []    # keyword names, in declaration order
        self.key_dict = {}   # keyword name -> type name
        for (name, argtype) in paramspec:
            self.arg_ord.append(name)
            self.arg_dict[name] = argtype
        for (name, argtype) in keyspec:
            self.key_ord.append(name)
            self.key_dict[name] = argtype

    def get_args(self, d, xargs=None):
        """Split and coerce the arguments in ``d``.

        d     -- mapping of argument name to raw value
        xargs -- optional mapping of extra arguments; applied first, so a
                 value in ``d`` overrides the same name in ``xargs``

        Returns ``(params, keys)``: the positional values as a list in
        spec order, and the supplied keyword values as a dict.

        Raises ArgError if a positional parameter is missing, a name is
        not in either spec, or a value cannot be coerced.
        """
        args = {}
        keys = {}
        if xargs:
            self.split_args(xargs, args, keys)
        self.split_args(d, args, keys)
        params = []
        for name in self.arg_ord:
            if name not in args:
                raise ArgError('Missing parameter: %s' % name)
            params.append(args[name])
        return (params, keys)

    def split_args(self, d, args, keys):
        """Coerce each item of ``d`` and store it into ``args`` or ``keys``.

        Positional names go to ``args``, keyword names to ``keys``; both
        dicts are updated in place.

        Raises ArgError for a name that appears in neither spec.
        """
        for (k, v) in d.items():
            if k in self.arg_dict:
                args[k] = self.coerce(self.arg_dict[k], v)
            elif k in self.key_dict:
                keys[k] = self.coerce(self.key_dict[k], v)
            else:
                raise ArgError('Invalid parameter: %s' % k)

    def get_form_args(self, f, xargs=None):
        """Extract arguments from a form and pass them to ``get_args``.

        f     -- mapping of field name to a sequence of values
        xargs -- optional extra arguments, forwarded to ``get_args``

        Fields not named in either spec are ignored, as are fields with
        no values.  A field with more than one value raises ArgError.

        Returns the ``(params, keys)`` pair produced by ``get_args``.
        """
        d = {}
        for (k, v) in f.items():
            if (k not in self.arg_dict) and (k not in self.key_dict):
                # Unknown form fields are silently skipped here, unlike
                # in split_args where they are an error.
                continue
            n = len(v)
            if n == 0:
                continue
            if n > 1:
                raise ArgError('Too many values for %s' % k)
            d[k] = v[0]
        return self.get_args(d, xargs=xargs)

    def coerce(self, type, v):
        """Convert ``v`` to the type named by ``type``.

        type -- one of 'int', 'str', 'sxpr'
        v    -- the raw value

        Returns the converted value.

        Raises ArgError if the conversion fails or ``type`` is not a
        known type name (previously an unknown name silently produced
        None).
        """
        try:
            if type == 'int':
                return int(v)
            if type == 'str':
                return str(v)
            if type == 'sxpr':
                return self.sxpr(v)
        except ArgError:
            raise
        except Exception as ex:
            # Report any conversion failure uniformly as an ArgError.
            raise ArgError(str(ex))
        raise ArgError('Invalid type: %s' % type)

    def sxpr(self, v):
        """Coerce ``v`` to an s-expression.

        * a list is returned unchanged;
        * an object with a ``readline`` attribute is parsed as a file;
        * a ``str`` is wrapped in a StringIO and parsed;
        * anything else is converted with ``str``.

        Raises ArgError (from ``sxpr_file``) if parsing fails.
        """
        if isinstance(v, list):
            return v
        if hasattr(v, 'readline'):
            # Covers real file objects as well as file-like objects.
            return self.sxpr_file(v)
        if isinstance(v, str):
            # Imported here because the module does not import StringIO
            # at top level; the fallback keeps this working on Python 3.
            try:
                from StringIO import StringIO
            except ImportError:
                from io import StringIO
            return self.sxpr_file(StringIO(v))
        return str(v)

    def sxpr_file(self, fin):
        """Parse exactly one s-expression from the file-like ``fin``.

        Returns the single parsed expression.

        Raises ArgError if ``sxp.parse`` fails, or if it does not yield
        exactly one expression.
        """
        try:
            vals = sxp.parse(fin)
        except Exception:
            raise ArgError('Coercion to sxpr failed')
        # Any count other than one (including zero) is rejected.
        if len(vals) != 1:
            raise ArgError('Too many sxprs')
        return vals[0]

    def call_with_args(self, fn, args, xargs=None):
        """Call ``fn`` with arguments taken from the mapping ``args``.

        The arguments are validated and coerced by ``get_args``.
        Returns the result of ``fn``.
        """
        (params, keys) = self.get_args(args, xargs=xargs)
        return fn(*params, **keys)

    def call_with_form_args(self, fn, fargs, xargs=None):
        """Call ``fn`` with arguments taken from the form ``fargs``.

        The arguments are extracted by ``get_form_args``.
        Returns the result of ``fn``.
        """
        (params, keys) = self.get_form_args(fargs, xargs=xargs)
        return fn(*params, **keys)
class ArgFn(Args):
    """Represent a remote HTTP operation as a function.
    Used on the client.
    """

    def __init__(self, fn, paramspec, keyspec=()):
        """Bind ``fn`` to its argument specs.

        fn        -- callable to invoke
        paramspec -- iterable of (name, type) pairs for required parameters
        keyspec   -- iterable of (name, type) pairs for optional keywords;
                     defaults to an empty (immutable) sequence
        """
        Args.__init__(self, paramspec, keyspec)
        self.fn = fn

    def __call__(self, fargs, xargs=None):
        """Invoke ``fn`` with arguments from the mapping ``fargs``.

        fargs -- mapping of argument name to raw value
        xargs -- optional mapping of extra arguments

        Returns whatever ``call_with_args`` returns.
        """
        return self.call_with_args(self.fn, fargs, xargs=xargs)
class FormFn(Args):
    """Represent an operation as a function over a form.
    Used in the HTTP server.
    """

    def __init__(self, fn, paramspec, keyspec=()):
        """Bind ``fn`` to its argument specs.

        fn        -- callable to invoke
        paramspec -- iterable of (name, type) pairs for required parameters
        keyspec   -- iterable of (name, type) pairs for optional keywords;
                     defaults to an empty (immutable) sequence
        """
        Args.__init__(self, paramspec, keyspec)
        self.fn = fn

    def __call__(self, fargs, xargs=None):
        """Invoke ``fn`` with arguments from the form ``fargs``.

        fargs -- mapping of field name to a sequence of values
        xargs -- optional mapping of extra arguments

        Returns whatever ``call_with_form_args`` returns.
        """
        return self.call_with_form_args(self.fn, fargs, xargs=xargs)
|