aboutsummaryrefslogtreecommitdiffstats
path: root/tests/rpc/frontend.py
blob: 8cbec568238a7937720b603cad3d92f373530f50 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def modules():
	return ["python_inv"]

def derive(module, parameters):
	assert module == r"python_inv"
	if parameters.keys() != {r"\width"}:
		raise ValueError("Invalid parameters")
	return "ilang", r"""
module \impl
	wire width {width:d} input 1 \i
	wire width {width:d} output 2 \o
	cell $neg $0
		parameter \A_SIGNED 1'0
		parameter \A_WIDTH 32'{width:b}
		parameter \Y_WIDTH 32'{width:b}
		connect \A \i
		connect \Y \o
	end
end
module \python_inv
	wire width {width:d} input 1 \i
	wire width {width:d} output 2 \o
	cell \impl $0
		connect \i \i
		connect \o \o
	end
end
""".format(width=parameters[r"\width"])

# ----------------------------------------------------------------------------

import json
import argparse
import sys, socket, os, subprocess
try:
	import msvcrt, win32pipe, win32file
except ImportError:
	msvcrt = win32pipe = win32file = None

def map_parameter(parameter):
	if parameter["type"] == "unsigned":
		return int(parameter["value"], 2)
	if parameter["type"] == "signed":
		width = len(parameter["value"])
		value = int(parameter["value"], 2)
		if value & (1 << (width - 1)):
			value = -((1 << width) - value)
		return value
	if parameter["type"] == "string":
		return parameter["value"]
	if parameter["type"] == "real":
		return float(parameter["value"])

def call(input_json):
	input = json.loads(input_json)
	if input["method"] == "modules":
		return json.dumps({"modules": modules()})
	if input["method"] == "derive":
		try:
			frontend, source = derive(input["module"],
				{name: map_parameter(value) for name, value in input["parameters"].items()})
			return json.dumps({"frontend": frontend, "source": source})
		except ValueError as e:
			return json.dumps({"error": str(e)})

def main():
	parser = argparse.ArgumentParser()
	modes = parser.add_subparsers(dest="mode")
	mode_stdio = modes.add_parser("stdio")
	if os.name == "posix":
		mode_path = modes.add_parser("unix-socket")
	if os.name == "nt":
		mode_path = modes.add_parser("named-pipe")
	mode_path.add_argument("path")
	args = parser.parse_args()

	if args.mode == "stdio":
		while True:
			input = sys.stdin.readline()
			if not input: break
			sys.stdout.write(call(input) + "\n")
			sys.stdout.flush()

	if args.mode == "unix-socket":
		sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
		sock.settimeout(30)
		sock.bind(args.path)
		try:
			sock.listen(1)
			ys_proc = subprocess.Popen(["../../yosys", "-ql", "unix.log", "-p", "connect_rpc -path {}; read_verilog design.v; hierarchy -top top; flatten; select -assert-count 1 t:$neg".format(args.path)])
			conn, addr = sock.accept()
			file = conn.makefile("rw")
			while True:
				input = file.readline()
				if not input: break
				file.write(call(input) + "\n")
				file.flush()
			ys_proc.wait(timeout=10)
			if ys_proc.returncode:
				raise subprocess.CalledProcessError(ys_proc.returncode, ys_proc.args)
		finally:
			ys_proc.kill()
			sock.close()
			os.unlink(args.path)

	if args.mode == "named-pipe":
		pipe = win32pipe.CreateNamedPipe(args.path, win32pipe.PIPE_ACCESS_DUPLEX,
		    win32pipe.PIPE_TYPE_BYTE|win32pipe.PIPE_READMODE_BYTE|win32pipe.PIPE_WAIT,
		    1, 4096, 4096, 0, None)
		win32pipe.ConnectNamedPipe(pipe, None)
		try:
			while True:
				input = b""
				while not input.endswith(b"\n"):
					result, data = win32file.ReadFile(pipe, 4096)
					assert result == 0
					input += data
					assert not b"\n" in input or input.endswith(b"\n")
				output = (call(input.decode("utf-8")) + "\n").encode("utf-8")
				length = len(output)
				while length > 0:
					result, done = win32file.WriteFile(pipe, output)
					assert result == 0
					length -= done
		except win32file.error as e:
			if e.args[0] == 109: # ERROR_BROKEN_PIPE
				pass
			else:
				raise

if __name__ == "__main__":
	main()