def modules(): return ["python_inv"] def derive(module, parameters): assert module == r"python_inv" if parameters.keys() != {r"\width"}: raise ValueError("Invalid parameters") return "ilang", r""" module \impl wire width {width:d} input 1 \i wire width {width:d} output 2 \o cell $neg $0 parameter \A_SIGNED 1'0 parameter \A_WIDTH 32'{width:b} parameter \Y_WIDTH 32'{width:b} connect \A \i connect \Y \o end end module \python_inv wire width {width:d} input 1 \i wire width {width:d} output 2 \o cell \impl $0 connect \i \i connect \o \o end end """.format(width=parameters[r"\width"]) # ---------------------------------------------------------------------------- import json import argparse import sys, socket, os try: import msvcrt, win32pipe, win32file except ImportError: msvcrt = win32pipe = win32file = None def map_parameter(parameter): if parameter["type"] == "unsigned": return int(parameter["value"], 2) if parameter["type"] == "signed": width = len(parameter["value"]) value = int(parameter["value"], 2) if value & (1 << (width - 1)): value = -((1 << width) - value) return value if parameter["type"] == "string": return parameter["value"] if parameter["type"] == "real": return float(parameter["value"]) def call(input_json): input = json.loads(input_json) if input["method"] == "modules": return json.dumps({"modules": modules()}) if input["method"] == "derive": try: frontend, source = derive(input["module"], {name: map_parameter(value) for name, value in input["parameters"].items()}) return json.dumps({"frontend": frontend, "source": source}) except ValueError as e: return json.dumps({"error": str(e)}) def main(): parser = argparse.ArgumentParser() modes = parser.add_subparsers(dest="mode") mode_stdio = modes.add_parser("stdio") if os.name == "posix": mode_path = modes.add_parser("unix-socket") if os.name == "nt": mode_path = modes.add_parser("named-pipe") mode_path.add_argument("path") args = parser.parse_args() if args.mode == "stdio": while True: input = sys.stdin.readline() if not input: break sys.stdout.write(call(input) + "\n") sys.stdout.flush() if args.mode == "unix-socket": sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(args.path) try: sock.listen(1) conn, addr = sock.accept() file = conn.makefile("rw") while True: input = file.readline() if not input: break file.write(call(input) + "\n") file.flush() finally: sock.close() os.unlink(args.path) if args.mode == "named-pipe": pipe = win32pipe.CreateNamedPipe(args.path, win32pipe.PIPE_ACCESS_DUPLEX, win32pipe.PIPE_TYPE_BYTE|win32pipe.PIPE_READMODE_BYTE|win32pipe.PIPE_WAIT, 1, 4096, 4096, 0, None) win32pipe.ConnectNamedPipe(pipe, None) try: while True: input = b"" while not input.endswith(b"\n"): result, data = win32file.ReadFile(pipe, 4096) assert result == 0 input += data assert not b"\n" in input or input.endswith(b"\n") output = (call(input.decode("utf-8")) + "\n").encode("utf-8") length = len(output) while length > 0: result, done = win32file.WriteFile(pipe, output) assert result == 0 length -= done except win32file.error as e: if e.args[0] == 109: # ERROR_BROKEN_PIPE pass else: raise if __name__ == "__main__": main() f='#n6'>6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
from __future__ import absolute_import, print_function, division
import six
import time
from email.utils import parsedate_tz, formatdate, mktime_tz
from netlib import human
from netlib import multidict
from netlib.http import cookies
from netlib.http import headers as nheaders
from netlib.http import message
from netlib.http import status_codes
from typing import AnyStr # noqa
from typing import Dict # noqa
from typing import Iterable # noqa
from typing import Tuple # noqa
from typing import Union # noqa
class ResponseData(message.MessageData):
def __init__(
self,
http_version,
status_code,
reason=None,
headers=(),
content=None,
timestamp_start=None,
timestamp_end=None
):
if isinstance(http_version, six.text_type):
http_version = http_version.encode("ascii", "strict")
if isinstance(reason, six.text_type):
reason = reason.encode("ascii", "strict")
if not isinstance(headers, nheaders.Headers):
headers = nheaders.Headers(headers)
if isinstance(content, six.text_type):
raise ValueError("Content must be bytes, not {}".format(type(content).__name__))
self.http_version = http_version
self.status_code = status_code
self.reason = reason
self.headers = headers
self.content = content
self.timestamp_start = timestamp_start
self.timestamp_end = timestamp_end
class Response(message.Message):
"""
An HTTP response.
"""
def __init__(self, *args, **kwargs):
super(Response, self).__init__()
self.data = ResponseData(*args, **kwargs)
def __repr__(self):
if self.raw_content:
details = "{}, {}".format(
self.headers.get("content-type", "unknown content type"),
human.pretty_size(len(self.raw_content))
)
else:
details = "no content"
return "Response({status_code} {reason}, {details})".format(
status_code=self.status_code,
reason=self.reason,
details=details
)
@classmethod
def make(
cls,
status_code=200, # type: int
content=b"", # type: AnyStr
headers=() # type: Union[Dict[AnyStr, AnyStr], Iterable[Tuple[bytes, bytes]]]
):
"""
Simplified API for creating response objects.
"""
resp = cls(
b"HTTP/1.1",
status_code,
status_codes.RESPONSES.get(status_code, "").encode(),
(),
None
)
# Assign this manually to update the content-length header.
if isinstance(content, bytes):
resp.content = content
elif isinstance(content, str):
resp.text = content
else:
raise TypeError("Expected content to be str or bytes, but is {}.".format(
type(content).__name__
))
# Headers can be list or dict, we differentiate here.
if isinstance(headers, dict):
resp.headers = nheaders.Headers(**headers)
elif isinstance(headers, Iterable):
resp.headers = nheaders.Headers(headers)
else:
raise TypeError("Expected headers to be an iterable or dict, but is {}.".format(
type(headers).__name__
))
return resp
@property
def status_code(self):
"""
HTTP Status Code, e.g. ``200``.
"""
return self.data.status_code
@status_code.setter
def status_code(self, status_code):
self.data.status_code = status_code
@property
def reason(self):
"""
HTTP Reason Phrase, e.g. "Not Found".
This is always :py:obj:`None` for HTTP2 requests, because HTTP2 responses do not contain a reason phrase.
"""
return message._native(self.data.reason)
@reason.setter
def reason(self, reason):
self.data.reason = message._always_bytes(reason)
@property
def cookies(self):
# type: () -> multidict.MultiDictView
"""
The response cookies. A possibly empty
:py:class:`~netlib.multidict.MultiDictView`, where the keys are cookie
name strings, and values are (value, attr) tuples. Value is a string,
and attr is an MultiDictView containing cookie attributes. Within
attrs, unary attributes (e.g. HTTPOnly) are indicated by a Null value.
Caveats:
Updating the attr
"""
return multidict.MultiDictView(
self._get_cookies,
self._set_cookies
)
def _get_cookies(self):
h = self.headers.get_all("set-cookie")
return tuple(cookies.parse_set_cookie_headers(h))
def _set_cookies(self, value):
cookie_headers = []
for k, v in value:
header = cookies.format_set_cookie_header(k, v[0], v[1])
cookie_headers.append(header)
self.headers.set_all("set-cookie", cookie_headers)
@cookies.setter
def cookies(self, value):
self._set_cookies(value)
def refresh(self, now=None):
"""
This fairly complex and heuristic function refreshes a server
response for replay.
- It adjusts date, expires and last-modified headers.
- It adjusts cookie expiration.
"""
if not now:
now = time.time()
delta = now - self.timestamp_start
refresh_headers = [
"date",
"expires",
"last-modified",
]
for i in refresh_headers:
if i in self.headers:
d = parsedate_tz(self.headers[i])
if d:
new = mktime_tz(d) + delta
self.headers[i] = formatdate(new)
c = []
for set_cookie_header in self.headers.get_all("set-cookie"):
try:
refreshed = cookies.refresh_set_cookie_header(set_cookie_header, delta)
except ValueError:
refreshed = set_cookie_header
c.append(refreshed)
if c:
self.headers.set_all("set-cookie", c)