diff options
Diffstat (limited to 'tools/xenmgr/lib/server')
24 files changed, 3048 insertions, 0 deletions
diff --git a/tools/xenmgr/lib/server/SrvBase.py b/tools/xenmgr/lib/server/SrvBase.py new file mode 100644 index 0000000000..722b60f49d --- /dev/null +++ b/tools/xenmgr/lib/server/SrvBase.py @@ -0,0 +1,137 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +import cgi + +import os +import sys +import types +import StringIO + +from twisted.internet import defer +from twisted.internet import reactor +from twisted.web import error +from twisted.web import resource +from twisted.web import server + +from xenmgr import sxp +from xenmgr import PrettyPrint + +def uri_pathlist(p): + """Split a path into a list. + p path + return list of path elements + """ + l = [] + for x in p.split('/'): + if x == '': continue + l.append(x) + return l + +class SrvBase(resource.Resource): + """Base class for services. + """ + + def parse_form(self, req, method): + """Parse the data for a request, GET using the URL, POST using encoded data. + Posts should use enctype='multipart/form-data' in the <form> tag, + rather than 'application/x-www-form-urlencoded'. Only 'multipart/form-data' + handles file upload. + + req request + returns a cgi.FieldStorage instance + """ + env = {} + env['REQUEST_METHOD'] = method + if self.query: + env['QUERY_STRING'] = self.query + val = cgi.FieldStorage(fp=req.rfile, headers=req.headers, environ=env) + return val + + def use_sxp(self, req): + """Determine whether to send an SXP response to a request. + Uses SXP if there is no User-Agent, no Accept, or application/sxp is in Accept. + + req request + returns 1 for SXP, 0 otherwise + """ + ok = 0 + user_agent = req.getHeader('User-Agent') + accept = req.getHeader('Accept') + if (not user_agent) or (not accept) or (accept.find(sxp.mime_type) >= 0): + ok = 1 + return ok + + def get_op_method(self, op): + """Get the method for an operation. + For operation 'foo' looks for 'op_foo'. + + op operation name + returns method or None + """ + op_method_name = 'op_' + op + return getattr(self, op_method_name, None) + + def perform(self, req): + """General operation handler for posted operations. + For operation 'foo' looks for a method op_foo and calls + it with op_foo(op, req). Replies with code 500 if op_foo + is not found. + + The method must return a list when req.use_sxp is true + and an HTML string otherwise (or list). + Methods may also return a Deferred (for incomplete processing). + + req request + """ + op = req.args.get('op') + if op is None or len(op) != 1: + req.setResponseCode(404, "Invalid") + return '' + op = op[0] + op_method = self.get_op_method(op) + if op_method is None: + req.setResponseCode(501, "Not implemented") + req.setHeader("Content-Type", "text/plain") + req.write("Not implemented: " + op) + return '' + else: + val = op_method(op, req) + if isinstance(val, defer.Deferred): + val.addCallback(self._cb_perform, req, 1) + return server.NOT_DONE_YET + else: + self._cb_perform(val, req, 0) + return '' + + def _cb_perform(self, val, req, dfr): + """Callback to complete the request. + May be called from a Deferred. + """ + if isinstance(val, error.ErrorPage): + req.write(val.render(req)) + elif self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(val, req) + else: + req.write('<html><head></head><body>') + self.print_path(req) + if isinstance(val, types.ListType): + req.write('<code><pre>') + PrettyPrint.prettyprint(val, out=req) + req.write('</pre></code>') + else: + req.write(str(val)) + req.write('</body></html>') + if dfr: + req.finish() + + def print_path(self, req): + """Print the path with hyperlinks. + """ + pathlist = [x for x in req.prepath if x != '' ] + s = "/" + req.write('<h1><a href="/">/</a>') + for x in pathlist: + s += x + "/" + req.write(' <a href="%s">%s</a>/' % (s, x)) + req.write("</h1>") diff --git a/tools/xenmgr/lib/server/SrvConsole.py b/tools/xenmgr/lib/server/SrvConsole.py new file mode 100644 index 0000000000..ea25bb6113 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvConsole.py @@ -0,0 +1,42 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from xenmgr import sxp +from xenmgr import XendConsole +from SrvDir import SrvDir + +class SrvConsole(SrvDir): + """An individual console. + """ + + def __init__(self, info): + SrvDir.__init__(self) + self.info = info + self.xc = XendConsole.instance() + + def op_disconnect(self, op, req): + val = self.xc.console_disconnect(self.info.id) + return val + + def render_POST(self, req): + return self.perform(req) + + def render_GET(self, req): + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(self.info.sxpr(), out=req) + else: + req.write('<html><head></head><body>') + self.print_path(req) + #self.ls() + req.write('<p>%s</p>' % self.info) + req.write('<p><a href="%s">Connect to domain %d</a></p>' + % (self.info.uri(), self.info.dom2)) + self.form(req) + req.write('</body></html>') + return '' + + def form(self, req): + req.write('<form method="post" action="%s">' % req.prePathURL()) + if self.info.connection(): + req.write('<input type="submit" name="op" value="disconnect">') + req.write('</form>') diff --git a/tools/xenmgr/lib/server/SrvConsoleDir.py b/tools/xenmgr/lib/server/SrvConsoleDir.py new file mode 100644 index 0000000000..89b092c18d --- /dev/null +++ b/tools/xenmgr/lib/server/SrvConsoleDir.py @@ -0,0 +1,58 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from SrvDir import SrvDir +from SrvConsole import SrvConsole +from xenmgr import XendConsole +from xenmgr import sxp + +class SrvConsoleDir(SrvDir): + """Console directory. + """ + + def __init__(self): + SrvDir.__init__(self) + self.xconsole = XendConsole.instance() + + def console(self, x): + val = None + try: + info = self.xconsole.console_get(x) + val = SrvConsole(info) + except KeyError: + pass + return val + + def get(self, x): + v = SrvDir.get(self, x) + if v is not None: + return v + v = self.console(x) + return v + + def render_GET(self, req): + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + self.ls_console(req, 1) + else: + req.write("<html><head></head><body>") + self.print_path(req) + self.ls(req) + self.ls_console(req) + #self.form(req.wfile) + req.write("</body></html>") + return '' + + def ls_console(self, req, use_sxp=0): + url = req.prePathURL() + if not url.endswith('/'): + url += '/' + if use_sxp: + consoles = self.xconsole.console_ls() + sxp.show(consoles, out=req) + else: + consoles = self.xconsole.consoles() + consoles.sort(lambda x, y: cmp(x.id, y.id)) + req.write('<ul>') + for c in consoles: + req.write('<li><a href="%s%s"> %s</a></li>' % (url, c.id, c)) + req.write('</ul>') diff --git a/tools/xenmgr/lib/server/SrvConsoleServer.py b/tools/xenmgr/lib/server/SrvConsoleServer.py new file mode 100644 index 0000000000..88f3964811 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvConsoleServer.py @@ -0,0 +1,631 @@ +########################################################### +## Xen controller daemon +## Copyright (c) 2004, K A Fraser (University of Cambridge) +## Copyright (C) 2004, Mike Wray <mike.wray@hp.com> +########################################################### + +import os +import os.path +import signal +import sys +import socket +import pwd +import re +import StringIO + +from twisted.internet import pollreactor +pollreactor.install() + +from twisted.internet import reactor +from twisted.internet import protocol +from twisted.internet import abstract +from twisted.internet import defer + +import xend.utils + +from xenmgr import sxp +from xenmgr import PrettyPrint +from xenmgr import EventServer +eserver = EventServer.instance() + +from xenmgr.server import SrvServer + +import channel +import blkif +import netif +import console +from params import * + +DEBUG = 1 + +class MgmtProtocol(protocol.DatagramProtocol): + """Handler for the management socket (unix-domain). + """ + + def __init__(self, daemon): + #protocol.DatagramProtocol.__init__(self) + self.daemon = daemon + + def write(self, data, addr): + return self.transport.write(data, addr) + + def datagramReceived(self, data, addr): + if DEBUG: print 'datagramReceived> addr=', addr, 'data=', data + io = StringIO.StringIO(data) + try: + vals = sxp.parse(io) + res = self.dispatch(vals[0]) + self.send_result(addr, res) + except SystemExit: + raise + except: + if DEBUG: + raise + else: + self.send_error(addr) + + def send_reply(self, addr, sxpr): + io = StringIO.StringIO() + sxp.show(sxpr, out=io) + io.seek(0) + self.write(io.getvalue(), addr) + + def send_result(self, addr, res): + + def fn(res, self=self, addr=addr): + self.send_reply(addr, ['ok', res]) + + if isinstance(res, defer.Deferred): + res.addCallback(fn) + else: + fn(res) + + def send_error(self, addr): + (extype, exval) = sys.exc_info()[:2] + self.send_reply(addr, ['err', + ['type', str(extype) ], + ['value', str(exval) ] ] ) + + def opname(self, name): + """Get the name of the method for an operation. + """ + return 'op_' + name.replace('.', '_') + + def operror(self, name, v): + """Default operation handler - signals an error. + """ + raise NotImplementedError('Invalid operation: ' +name) + + def dispatch(self, req): + """Dispatch a request to its handler. + """ + op_name = sxp.name(req) + op_method_name = self.opname(op_name) + op_method = getattr(self, op_method_name, self.operror) + return op_method(op_name, req) + + def op_console_create(self, name, req): + """Create a new control interface - console for a domain. + """ + print name, req + dom = sxp.child_value(req, 'domain') + if not dom: raise ValueError('Missing domain') + dom = int(dom) + console_port = sxp.child_value(req, 'console_port') + if console_port: + console_port = int(console_port) + resp = self.daemon.console_create(dom, console_port) + print name, resp + return resp + + def op_consoles(self, name, req): + """Get a list of the consoles. + """ + return self.daemon.consoles() + + def op_console_disconnect(self, name, req): + id = sxp.child_value(req, 'id') + if not id: + raise ValueError('Missing console id') + id = int(id) + console = self.daemon.get_console(id) + if not console: + raise ValueError('Invalid console id') + if console.conn: + console.conn.loseConnection() + return ['ok'] + + def op_blkifs(self, name, req): + pass + + def op_blkif_devs(self, name, req): + pass + + def op_blkif_create(self, name, req): + pass + + def op_blkif_dev_create(self, name, req): + pass + + def op_netifs(self, name, req): + pass + + def op_netif_devs(self, name, req): + pass + + def op_netif_create(self, name, req): + pass + + def op_netif_dev_create(self, name, req): + pass + +class NotifierProtocol(protocol.Protocol): + """Asynchronous handler for i/o on the notifier (event channel). + """ + + def __init__(self, channelFactory): + self.channelFactory = channelFactory + + def notificationReceived(self, idx, type): + #print 'NotifierProtocol>notificationReceived>', idx, type + channel = self.channelFactory.getChannel(idx) + if not channel: + return + #print 'NotifierProtocol>notificationReceived> channel', channel + channel.notificationReceived(type) + + def connectionLost(self, reason=None): + pass + + def doStart(self): + pass + + def doStop(self): + pass + + def startProtocol(self): + pass + + def stopProtocol(self): + pass + +class NotifierPort(abstract.FileDescriptor): + """Transport class for the event channel. + """ + + def __init__(self, daemon, notifier, proto, reactor=None): + assert isinstance(proto, NotifierProtocol) + abstract.FileDescriptor.__init__(self, reactor) + self.daemon = daemon + self.notifier = notifier + self.protocol = proto + + def startListening(self): + self._bindNotifier() + self._connectToProtocol() + + def stopListening(self): + if self.connected: + result = self.d = defer.Deferred() + else: + result = None + self.loseConnection() + return result + + def fileno(self): + return self.notifier.fileno() + + def _bindNotifier(self): + self.connected = 1 + + def _connectToProtocol(self): + self.protocol.makeConnection(self) + self.startReading() + + def loseConnection(self): + if self.connected: + self.stopReading() + self.disconnecting = 1 + reactor.callLater(0, self.connectionLost) + + def connectionLost(self, reason=None): + abstract.FileDescriptor.connectionLost(self, reason) + if hasattr(self, 'protocol'): + self.protocol.doStop() + self.connected = 0 + #self.notifier.close() # Not implemented. + os.close(self.fileno()) + del self.notifier + if hasattr(self, 'd'): + self.d.callback(None) + del self.d + + def doRead(self): + #print 'NotifierPort>doRead>', self + count = 0 + while 1: + #print 'NotifierPort>doRead>', count + notification = self.notifier.read() + if not notification: + break + (idx, type) = notification + self.protocol.notificationReceived(idx, type) + self.notifier.unmask(idx) + count += 1 + #print 'NotifierPort>doRead<' + +class EventProtocol(protocol.Protocol): + """Asynchronous handler for a connected event socket. + """ + + def __init__(self, daemon): + #protocol.Protocol.__init__(self) + self.daemon = daemon + # Event queue. + self.queue = [] + # Subscribed events. + self.events = [] + self.parser = sxp.Parser() + self.pretty = 0 + + # For debugging subscribe to everything and make output pretty. + self.subscribe(['*']) + self.pretty = 1 + + def dataReceived(self, data): + try: + self.parser.input(data) + if self.parser.ready(): + val = self.parser.get_val() + res = self.dispatch(val) + self.send_result(res) + if self.parser.at_eof(): + self.loseConnection() + except SystemExit: + raise + except: + if DEBUG: + raise + else: + self.send_error() + + def connectionLost(self, reason=None): + self.unsubscribe() + + def send_reply(self, sxpr): + io = StringIO.StringIO() + if self.pretty: + PrettyPrint.prettyprint(sxpr, out=io) + else: + sxp.show(sxpr, out=io) + print >> io + io.seek(0) + return self.transport.write(io.getvalue()) + + def send_result(self, res): + return self.send_reply(['ok', res]) + + def send_error(self): + (extype, exval) = sys.exc_info()[:2] + return self.send_reply(['err', + ['type', str(extype)], + ['value', str(exval)]]) + + def send_event(self, val): + return self.send_reply(['event', val[0], val[1]]) + + def unsubscribe(self): + for event in self.events: + eserver.unsubscribe(event, self.queue_event) + + def subscribe(self, events): + self.unsubscribe() + for event in events: + eserver.subscribe(event, self.queue_event) + self.events = events + + def queue_event(self, name, v): + # Despite the name we dont' queue the event here. + # We send it because the transport will queue it. + self.send_event([name, v]) + + def opname(self, name): + return 'op_' + name.replace('.', '_') + + def operror(self, name, req): + raise NotImplementedError('Invalid operation: ' +name) + + def dispatch(self, req): + op_name = sxp.name(req) + op_method_name = self.opname(op_name) + op_method = getattr(self, op_method_name, self.operror) + return op_method(op_name, req) + + def op_help(self, name, req): + def nameop(x): + if x.startswith('op_'): + return x[3:].replace('_', '.') + else: + return x + + l = [ nameop(k) for k in dir(self) if k.startswith('op_') ] + return l + + def op_quit(self, name, req): + self.loseConnection() + + def op_exit(self, name, req): + sys.exit(0) + + def op_pretty(self, name, req): + self.pretty = 1 + return ['ok'] + + def op_console_disconnect(self, name, req): + id = sxp.child_value(req, 'id') + if not id: + raise ValueError('Missing console id') + self.daemon.console_disconnect(id) + return ['ok'] + + def op_info(self, name, req): + val = self.daemon.consoles() + return val + + def op_sys_subscribe(self, name, v): + # (sys.subscribe event*) + # Subscribe to the events: + self.subscribe(v[1:]) + return ['ok'] + + def op_sys_inject(self, name, v): + # (sys.inject event) + event = v[1] + eserver.inject(sxp.name(event), event) + return ['ok'] + + +class EventFactory(protocol.Factory): + """Asynchronous handler for the event server socket. + """ + protocol = EventProtocol + service = None + + def __init__(self, daemon): + #protocol.Factory.__init__(self) + self.daemon = daemon + + def buildProtocol(self, addr): + proto = self.protocol(self.daemon) + proto.factory = self + return proto + +class Daemon: + """The xend daemon. + """ + def __init__(self): + self.shutdown = 0 + + def daemon_pids(self): + pids = [] + pidex = '(?P<pid>\d+)' + pythonex = '(?P<python>\S*python\S*)' + cmdex = '(?P<cmd>.*)' + procre = re.compile('^\s*' + pidex + '\s*' + pythonex + '\s*' + cmdex + '$') + xendre = re.compile('^/usr/sbin/xend\s*(start|restart)\s*.*$') + procs = os.popen('ps -e -o pid,args 2>/dev/null') + for proc in procs: + pm = procre.match(proc) + if not pm: continue + xm = xendre.match(pm.group('cmd')) + if not xm: continue + #print 'pid=', pm.group('pid'), 'cmd=', pm.group('cmd') + pids.append(int(pm.group('pid'))) + return pids + + def new_cleanup(self, kill=0): + err = 0 + pids = self.daemon_pids() + if kill: + for pid in pids: + print "Killing daemon pid=%d" % pid + os.kill(pid, signal.SIGHUP) + elif pids: + err = 1 + print "Daemon already running: ", pids + return err + + def cleanup(self, kill=False): + # No cleanup to do if PID_FILE is empty. + if not os.path.isfile(PID_FILE) or not os.path.getsize(PID_FILE): + return 0 + # Read the pid of the previous invocation and search active process list. + pid = open(PID_FILE, 'r').read() + lines = os.popen('ps ' + pid + ' 2>/dev/null').readlines() + for line in lines: + if re.search('^ *' + pid + '.+xend', line): + if not kill: + print "Daemon is already running (pid %d)" % int(pid) + return 1 + # Old daemon is still active: terminate it. + os.kill(int(pid), 1) + # Delete the stale PID_FILE. + os.remove(PID_FILE) + return 0 + + def install_child_reaper(self): + #signal.signal(signal.SIGCHLD, self.onSIGCHLD) + # Ensure that zombie children are automatically reaped. + xend.utils.autoreap() + + def onSIGCHLD(self, signum, frame): + code = 1 + while code > 0: + code = os.waitpid(-1, os.WNOHANG) + + def start(self): + if self.cleanup(kill=False): + return 1 + + # Detach from TTY. + if not DEBUG: + os.setsid() + + if self.set_user(): + return 1 + + self.install_child_reaper() + + # Fork -- parent writes PID_FILE and exits. + pid = os.fork() + if pid: + # Parent + pidfile = open(PID_FILE, 'w') + pidfile.write(str(pid)) + pidfile.close() + return 0 + # Child + logfile = self.open_logfile() + self.redirect_output(logfile) + self.run() + return 0 + + def open_logfile(self): + if not os.path.exists(CONTROL_DIR): + os.makedirs(CONTROL_DIR) + + # Open log file. Truncate it if non-empty, and request line buffering. + if os.path.isfile(LOG_FILE): + os.rename(LOG_FILE, LOG_FILE+'.old') + logfile = open(LOG_FILE, 'w+', 1) + return logfile + + def set_user(self): + # Set the UID. + try: + os.setuid(pwd.getpwnam(USER)[2]) + return 0 + except KeyError, error: + print "Error: no such user '%s'" % USER + return 1 + + def redirect_output(self, logfile): + if DEBUG: return + # Close down standard file handles + try: + os.close(0) # stdin + os.close(1) # stdout + os.close(2) # stderr + except: + pass + # Redirect output to log file. + sys.stdout = sys.stderr = logfile + + def stop(self): + return self.cleanup(kill=True) + + def run(self): + self.createFactories() + self.listenMgmt() + self.listenEvent() + self.listenNotifier() + SrvServer.create() + reactor.run() + + def createFactories(self): + self.channelF = channel.channelFactory() + self.blkifCF = blkif.BlkifControllerFactory() + self.netifCF = netif.NetifControllerFactory() + self.consoleCF = console.ConsoleControllerFactory() + + def listenMgmt(self): + protocol = MgmtProtocol(self) + s = os.path.join(CONTROL_DIR, MGMT_SOCK) + if os.path.exists(s): + os.unlink(s) + return reactor.listenUNIXDatagram(s, protocol) + + def listenEvent(self): + protocol = EventFactory(self) + return reactor.listenTCP(EVENT_PORT, protocol) + + def listenNotifier(self): + protocol = NotifierProtocol(self.channelF) + p = NotifierPort(self, self.channelF.notifier, protocol, reactor) + p.startListening() + return p + + def exit(self): + reactor.diconnectAll() + sys.exit(0) + + def blkif_create(self, dom): + """Create a block device interface controller. + + Returns Deferred + """ + d = self.blkifCF.createInstance(dom) + return d + + def blkif_dev_create(self, dom, vdev, mode, segment): + """Create a block device. + + Returns Deferred + """ + ctrl = self.blkifCF.getInstanceByDom(dom) + if not ctrl: + raise ValueError('No blkif controller: %d' % dom) + print 'blkif_dev_create>', dom, vdev, mode, segment + d = ctrl.attach_device(vdev, mode, segment) + return d + + def netif_create(self, dom): + """Create a network interface controller. + + """ + return self.netifCF.createInstance(dom) + + def netif_dev_create(self, dom, vif, vmac): + """Create a network device. + + todo + """ + ctrl = self.netifCF.getInstanceByDom(dom) + if not ctrl: + raise ValueError('No netif controller: %d' % dom) + d = ctrl.attach_device(vif, vmac) + return d + + def console_create(self, dom, console_port=None): + """Create a console for a domain. + """ + console = self.consoleCF.getInstanceByDom(dom) + if console is None: + console = self.consoleCF.createInstance(dom, console_port) + return console.sxpr() + + def consoles(self): + return [ c.sxpr() for c in self.consoleCF.getInstances() ] + + def get_console(self, id): + return self.consoleCF.getInstance(id) + + def get_domain_console(self, dom): + return self.consoleCF.getInstanceByDom(dom) + + def console_disconnect(self, id): + """Disconnect any connected console client. + """ + console = self.get_console(id) + if not console: + raise ValueError('Invalid console id') + if console.conn: + console.conn.loseConnection() + +def instance(): + global inst + try: + inst + except: + inst = Daemon() + return inst diff --git a/tools/xenmgr/lib/server/SrvDeviceDir.py b/tools/xenmgr/lib/server/SrvDeviceDir.py new file mode 100644 index 0000000000..52f428540d --- /dev/null +++ b/tools/xenmgr/lib/server/SrvDeviceDir.py @@ -0,0 +1,9 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from SrvDir import SrvDir + +class SrvDeviceDir(SrvDir): + """Device directory. + """ + + pass diff --git a/tools/xenmgr/lib/server/SrvDir.py b/tools/xenmgr/lib/server/SrvDir.py new file mode 100644 index 0000000000..f4310e279c --- /dev/null +++ b/tools/xenmgr/lib/server/SrvDir.py @@ -0,0 +1,91 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from twisted.web import error +from xenmgr import sxp +from SrvBase import SrvBase + +class SrvConstructor: + """Delayed constructor for sub-servers. + Does not import the sub-server class or create the object until needed. + """ + + def __init__(self, klass): + """Create a constructor. It is assumed that the class + should be imported as 'import klass from klass'. + + klass name of its class + """ + self.klass = klass + self.obj = None + + def getobj(self): + """Get the sub-server object, importing its class and instantiating it if + necessary. + """ + if not self.obj: + exec 'from %s import %s' % (self.klass, self.klass) + klassobj = eval(self.klass) + self.obj = klassobj() + return self.obj + +class SrvDir(SrvBase): + """Base class for directory servlets. + """ + isLeaf = False + + def __init__(self): + SrvBase.__init__(self) + self.table = {} + self.order = [] + + def getChild(self, x, req): + if x == '': return self + val = self.get(x) + if val is None: + return error.NoResource('Not found') + else: + return val + + def get(self, x): + val = self.table.get(x) + if val is not None: + val = val.getobj() + return val + + def add(self, x, xclass = None): + if xclass is None: + xclass = 'SrvDir' + self.table[x] = SrvConstructor(xclass) + self.order.append(x) + + def render_GET(self, req): + if self.use_sxp(req): + req.setHeader("Content-type", sxp.mime_type) + self.ls(req, 1) + else: + req.write('<html><head></head><body>') + self.print_path(req) + self.ls(req) + self.form(req) + req.write('</body></html>') + return '' + + def ls(self, req, use_sxp=0): + url = req.prePathURL() + if not url.endswith('/'): + url += '/' + if use_sxp: + req.write('(ls ') + for k in self.order: + req.write(' ' + k) + req.write(')') + else: + req.write('<ul>') + for k in self.order: + v = self.get(k) + req.write('<li><a href="%s%s">%s</a></li>' + % (url, k, k)) + req.write('</ul>') + + def form(self, req): + pass diff --git a/tools/xenmgr/lib/server/SrvDomain.py b/tools/xenmgr/lib/server/SrvDomain.py new file mode 100644 index 0000000000..0ef5676941 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvDomain.py @@ -0,0 +1,202 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from xenmgr import sxp +from xenmgr import XendDomain +from xenmgr import XendConsole +from xenmgr import PrettyPrint +from xenmgr.Args import FormFn + +from SrvDir import SrvDir + +class SrvDomain(SrvDir): + """Service managing a single domain. + """ + + def __init__(self, dom): + SrvDir.__init__(self) + self.dom = dom + self.xd = XendDomain.instance() + self.xconsole = XendConsole.instance() + + def op_start(self, op, req): + val = self.xd.domain_start(self.dom.id) + return val + + def op_stop(self, op, req): + val = self.xd.domain_stop(self.dom.id) + return val + + def op_shutdown(self, op, req): + val = self.xd.domain_shutdown(self.dom.id) + req.setResponseCode(202) + req.setHeader("Location", "%s/.." % req.prePathURL()) + return val + + def op_halt(self, op, req): + val = self.xd.domain_halt(self.dom.id) + req.setHeader("Location", "%s/.." % req.prePathURL()) + return val + + def op_save(self, op, req): + fn = FormFn(self.xd.domain_save, + [['dom', 'int'], + ['dst', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_restore(self, op, req): + fn = FormFn(self.xd.domain_restore, + [['dom', 'int'], + ['src', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_migrate(self, op, req): + fn = FormFn(self.xd.domain_migrate, + [['dom', 'int'], + ['destination', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + val = 0 # Some migrate id. + req.setResponseCode(202) + #req.send_header("Location", "%s/.." % self.path) # Some migrate url. + return val + + def op_pincpu(self, op, req): + fn = FormFn(self.xd.domain_migrate, + [['dom', 'int'], + ['cpu', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_cpu_bvt_set(self, op, req): + fn = FormFn(self.xd.domain_cpu_bvt_set, + [['dom', 'int'], + ['mcuadv', 'int'], + ['warp', 'int'], + ['warpl', 'int'], + ['warpu', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_cpu_atropos_set(self, op, req): + fn = FormFn(self.xd.domain_cpu_atropos_set, + [['dom', 'int'], + ['period', 'int'], + ['slice', 'int'], + ['latency', 'int'], + ['xtratime', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vifs(self, op, req): + return self.xd.domain_vif_ls(self.dom.id) + + def op_vif(self, op, req): + fn = FormFn(self.xd.domain_vif_get, + [['dom', 'int'], + ['vif', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vif_stats(self, op, req): + #todo + fn = FormFn(self.xd.domain_vif_stats, + [['dom', 'int'], + ['vif', 'int']]) + #val = fn(req.args, {'dom': self.dom.id}) + val = 999 + #return val + return val + + def op_vif_ip_add(self, op, req): + fn = FormFn(self.xd.domain_vif_ip_add, + [['dom', 'int'], + ['vif', 'int'], + ['ip', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vif_scheduler_set(self, op, req): + fn = FormFn(self.xd.domain_vif_scheduler_set, + [['dom', 'int'], + ['vif', 'int'], + ['bytes', 'int'], + ['usecs', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vif_scheduler_get(self, op, req): + fn = FormFn(self.xd.domain_vif_scheduler_set, + [['dom', 'int'], + ['vif', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vbds(self, op, req): + return self.xd.domain_vbd_ls(self.dom.id) + + def op_vbd(self, op, req): + fn = FormFn(self.xd.domain_vbd_get, + [['dom', 'int'], + ['vbd', 'int']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vbd_add(self, op, req): + fn = FormFn(self.xd.domain_vbd_add, + [['dom', 'int'], + ['uname', 'str'], + ['dev', 'str'], + ['mode', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def op_vbd_remove(self, op, req): + fn = FormFn(self.xd.domain_vbd_remove, + [['dom', 'int'], + ['dev', 'str']]) + val = fn(req.args, {'dom': self.dom.id}) + return val + + def render_POST(self, req): + return self.perform(req) + + def render_GET(self, req): + op = req.args.get('op') + if op and op[0] in ['vifs', 'vif', 'vif_stats', 'vbds', 'vbd']: + return self.perform(req) + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(self.dom.sxpr(), out=req) + else: + req.write('<html><head></head><body>') + self.print_path(req) + #self.ls() + req.write('<p>%s</p>' % self.dom) + if self.dom.console: + cinfo = self.dom.console + cid = cinfo.id + #todo: Local xref: need to know server prefix. + req.write('<p><a href="/xend/console/%s">Console %s</a></p>' + % (cid, cid)) + req.write('<p><a href="%s">Connect to console</a></p>' + % cinfo.uri()) + if self.dom.config: + req.write("<code><pre>") + PrettyPrint.prettyprint(self.dom.config, out=req) + req.write("</pre></code>") + req.write('<a href="%s?op=vif_stats&vif=0">vif 0 stats</a>' + % req.prePathURL()) + self.form(req) + req.write('</body></html>') + return '' + + def form(self, req): + req.write('<form method="post" action="%s">' % req.prePathURL()) + req.write('<input type="submit" name="op" value="start">') + req.write('<input type="submit" name="op" value="stop">') + req.write('<input type="submit" name="op" value="shutdown">') + req.write('<input type="submit" name="op" value="halt">') + req.write('<br><input type="submit" name="op" value="migrate">') + req.write('To: <input type="text" name="destination">') + req.write('</form>') diff --git a/tools/xenmgr/lib/server/SrvDomainDir.py b/tools/xenmgr/lib/server/SrvDomainDir.py new file mode 100644 index 0000000000..7bb2996d9b --- /dev/null +++ b/tools/xenmgr/lib/server/SrvDomainDir.py @@ -0,0 +1,130 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from StringIO import StringIO + +from twisted.protocols import http +from twisted.web import error + +from xenmgr import sxp +from xenmgr import XendDomain + +from SrvDir import SrvDir +from SrvDomain import SrvDomain + +class SrvDomainDir(SrvDir): + """Service that manages the domain directory. + """ + + def __init__(self): + SrvDir.__init__(self) + self.xd = XendDomain.instance() + + def domain(self, x): + val = None + try: + dom = self.xd.domain_get(x) + val = SrvDomain(dom) + except KeyError: + pass + return val + + def get(self, x): + v = SrvDir.get(self, x) + if v is not None: + return v + v = self.domain(x) + return v + + def op_create(self, op, req): + ok = 0 + try: + configstring = req.args.get('config')[0] + print 'config:', configstring + pin = sxp.Parser() + pin.input(configstring) + pin.input_eof() + config = pin.get_val() + ok = 1 + except Exception, ex: + print ex + if not ok: + req.setResponseCode(http.BAD_REQUEST, "Invalid configuration") + return "Invalid configuration" + return error.ErrorPage(http.BAD_REQUEST, + "Invalid", + "Invalid configuration") + try: + deferred = self.xd.domain_create(config) + deferred.addCallback(self._cb_op_create, configstring, req) + return deferred + except Exception, ex: + raise + #return ['err', str(ex) ] + #req.setResponseCode(http.BAD_REQUEST, "Error creating domain") + #return str(ex) + #return error.ErrorPage(http.BAD_REQUEST, + # "Error creating domain", + # str(ex)) + + + def _cb_op_create(self, dominfo, configstring, req): + """Callback to handle deferred domain creation. + """ + dom = dominfo.id + domurl = "%s/%s" % (req.prePathURL(), dom) + req.setResponseCode(201, "created") + req.setHeader("Location", domurl) + if self.use_sxp(req): + return dominfo.sxpr() + else: + out = StringIO() + print >> out, ('<p> Created <a href="%s">Domain %s</a></p>' + % (domurl, dom)) + print >> out, '<p><pre>' + print >> out, configstring + print >> out, '</pre></p>' + val = out.getvalue() + out.close() + return val + + def render_POST(self, req): + return self.perform(req) + + def render_GET(self, req): + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + self.ls_domain(req, 1) + else: + req.write("<html><head></head><body>") + self.print_path(req) + self.ls(req) + self.ls_domain(req) + self.form(req) + req.write("</body></html>") + return '' + + def ls_domain(self, req, use_sxp=0): + url = req.prePathURL() + if not url.endswith('/'): + url += '/' + if use_sxp: + domains = self.xd.domain_ls() + sxp.show(domains, out=req) + else: + domains = self.xd.domains() + domains.sort(lambda x, y: cmp(x.id, y.id)) + req.write('<ul>') + for d in domains: + req.write('<li><a href="%s%s"> Domain %s</a>' + % (url, d.id, d.id)) + req.write('name=%s' % d.name) + req.write('memory=%d'% d.memory) + req.write('</li>') + req.write('</ul>') + + def form(self, req): + req.write('<form method="post" action="%s" enctype="multipart/form-data">' + % req.prePathURL()) + req.write('<button type="submit" name="op" value="create">Create Domain</button>') + req.write('Config <input type="file" name="config"><br>') + req.write('</form>') diff --git a/tools/xenmgr/lib/server/SrvEventDir.py b/tools/xenmgr/lib/server/SrvEventDir.py new file mode 100644 index 0000000000..eda56972da --- /dev/null +++ b/tools/xenmgr/lib/server/SrvEventDir.py @@ -0,0 +1,41 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from xenmgr import sxp +from xenmgr import EventServer +from SrvDir import SrvDir + +class SrvEventDir(SrvDir): + """Event directory. + """ + + def __init__(self): + SrvDir.__init__(self) + self.eserver = EventServer.instance() + + def op_inject(self, op, req): + eventstring = req.args.get('event') + pin = sxp.Parser() + pin.input(eventstring) + pin.input_eof() + sxpr = pin.get_val() + self.eserver.inject(sxp.name(sxpr), sxpr) + if req.use_sxp: + sxp.name(sxpr) + else: + return '<code>' + eventstring + '</code>' + + def render_POST(self, req): + return self.perform(req) + + def form(self, req): + action = req.prePathURL() + req.write('<form method="post" action="%s" enctype="multipart/form-data">' + % action) + req.write('<button type="submit" name="op" value="inject">Inject</button>') + req.write('Event <input type="text" name="event" size="40"><br>') + req.write('</form>') + req.write('<form method="post" action="%s" enctype="multipart/form-data">' + % action) + req.write('<button type="submit" name="op" value="inject">Inject</button>') + req.write('Event file<input type="file" name="event"><br>') + req.write('</form>') diff --git a/tools/xenmgr/lib/server/SrvNode.py b/tools/xenmgr/lib/server/SrvNode.py new file mode 100644 index 0000000000..3c6168e337 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvNode.py @@ -0,0 +1,59 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +import os +from SrvDir import SrvDir +from xenmgr import sxp +from xenmgr import XendNode + +class SrvNode(SrvDir): + """Information about the node. + """ + + def __init__(self): + SrvDir.__init__(self) + self.xn = XendNode.instance() + + def op_shutdown(self, op, req): + val = self.xn.shutdown() + return val + + def op_reboot(self, op, req): + val = self.xn.reboot() + return val + + def op_cpu_rrobin_slice_set(self, op, req): + fn = FormFn(self.xn.cpu_rrobin_slice_set, + [['slice', 'int']]) + val = fn(req.args, {}) + return val + + def op_cpu_bvt_slice_set(self, op, req): + fn = FormFn(self.xn.cpu_bvt_slice_set, + [['slice', 'int']]) + val = fn(req.args, {}) + return val + + def render_POST(self, req): + return self.perform(req) + + def render_GET(self, req): + if self.use_sxp(req): + req.setHeader("Content-Type", sxp.mime_type) + sxp.show(['node'] + self.info(), out=req) + else: + req.write('<html><head></head><body>') + self.print_path(req) + req.write('<ul>') + for d in self.info(): + req.write('<li> %10s: %s' % (d[0], d[1])) + req.write('</ul>') + req.write('</body></html>') + return '' + + def info(self): + (sys, host, rel, ver, mch) = os.uname() + return [['system', sys], + ['host', host], + ['release', rel], + ['version', ver], + ['machine', mch]] diff --git a/tools/xenmgr/lib/server/SrvRoot.py b/tools/xenmgr/lib/server/SrvRoot.py new file mode 100644 index 0000000000..b002d2cf76 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvRoot.py @@ -0,0 +1,31 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from xenmgr import XendRoot +xroot = XendRoot.instance() +from SrvDir import SrvDir + +class SrvRoot(SrvDir): + """The root of the xend server. + """ + + """Server sub-components. Each entry is (name, class), where + 'name' is the entry name and 'class' is the name of its class. + """ + #todo Get this list from the XendRoot config. + subdirs = [ + ('node', 'SrvNode' ), + ('domain', 'SrvDomainDir' ), + ('console', 'SrvConsoleDir' ), + ('event', 'SrvEventDir' ), + ('vdisk', 'SrvVdiskDir' ), + ('device', 'SrvDeviceDir' ), + ('vnet', 'SrvVnetDir' ), + ] + + def __init__(self): + SrvDir.__init__(self) + for (name, klass) in self.subdirs: + self.add(name, klass) + for (name, klass) in self.subdirs: + self.get(name) + xroot.start() diff --git a/tools/xenmgr/lib/server/SrvServer.py b/tools/xenmgr/lib/server/SrvServer.py new file mode 100644 index 0000000000..a42219b620 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvServer.py @@ -0,0 +1,53 @@ +#!/usr/bin/python2 +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +"""Example xend HTTP and console server. + + Can be accessed from a browser or from a program. + Do 'python SrvServer.py' to run the server. + Then point a web browser at http://localhost:8000/xend and follow the links. + Most are stubs, except /domain which has a list of domains and a 'create domain' + button. + + You can also access the server from a program. + Do 'python XendClient.py' to run a few test operations. + + The data served differs depending on the client (as defined by User-Agent + and Accept in the HTTP headers). If the client is a browser, data + is returned in HTML, with interactive forms. If the client is a program, + data is returned in SXP format, with no forms. + + The server serves to the world by default. To restrict it to the local host + change 'interface' in main(). + + Mike Wray <mike.wray@hp.com> +""" +# todo Support security settings etc. in the config file. +# todo Support command-line args. + +from twisted.web import server +from twisted.web import resource +from twisted.internet import reactor + +from xenmgr import XendRoot +xroot = XendRoot.instance() + +from SrvRoot import SrvRoot + +def create(port=None, interface=None): + if port is None: port = 8000 + if interface is None: interface = '' + root = resource.Resource() + xend = SrvRoot() + root.putChild('xend', xend) + site = server.Site(root) + reactor.listenTCP(port, site, interface=interface) + + +def main(port=None, interface=None): + create(port, interface) + reactor.run() + + +if __name__ == '__main__': + main() diff --git a/tools/xenmgr/lib/server/SrvVdisk.py b/tools/xenmgr/lib/server/SrvVdisk.py new file mode 100644 index 0000000000..4200b9b0e5 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvVdisk.py @@ -0,0 +1,12 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from xenmgr import XendVdisk +from SrvVdiskDir import SrvVdiskDir + +class SrvVdisk(SrvDir): + """A virtual disk. + """ + + def __init__(self): + SrvDir.__init__(self) + self.xvdisk = XendVdisk.instance() diff --git a/tools/xenmgr/lib/server/SrvVdiskDir.py b/tools/xenmgr/lib/server/SrvVdiskDir.py new file mode 100644 index 0000000000..a6a9e55782 --- /dev/null +++ b/tools/xenmgr/lib/server/SrvVdiskDir.py @@ -0,0 +1,28 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from xenmgr import XendVdisk +from SrvDir import SrvDir + +class SrvVdiskDir(SrvDir): + """Virtual disk directory. + """ + + def __init__(self): + SrvDir.__init__(self) + self.xvdisk = XendVdisk.instance() + + def vdisk(self, x): + val = None + try: + dom = self.xvdisk.vdisk_get(x) + val = SrvVdisk(dom) + except KeyError: + pass + return val + + def get(self, x): + v = SrvDir.get(self, x) + if v is not None: + return v + v = self.vdisk(x) + return v diff --git a/tools/xenmgr/lib/server/SrvVnetDir.py b/tools/xenmgr/lib/server/SrvVnetDir.py new file mode 100644 index 0000000000..a8a814192d --- /dev/null +++ b/tools/xenmgr/lib/server/SrvVnetDir.py @@ -0,0 +1,9 @@ +# Copyright (C) 2004 Mike Wray <mike.wray@hp.com> + +from SrvDir import SrvDir + +class SrvVnetDir(SrvDir): + """Vnet directory. + """ + + pass diff --git a/tools/xenmgr/lib/server/__init__.py b/tools/xenmgr/lib/server/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/tools/xenmgr/lib/server/__init__.py @@ -0,0 +1 @@ + diff --git a/tools/xenmgr/lib/server/blkif.py b/tools/xenmgr/lib/server/blkif.py new file mode 100755 index 0000000000..0ef8ff0ff3 --- /dev/null +++ b/tools/xenmgr/lib/server/blkif.py @@ -0,0 +1,232 @@ +import channel +import controller +from messages import * + +class BlkifControllerFactory(controller.ControllerFactory): + """Factory for creating block device interface controllers. + Also handles the 'back-end' channel to dom0. + """ + + # todo: add support for setting dom controlling blkifs (don't assume 0). + # todo: add support for 'recovery'. + + def __init__(self): + controller.ControllerFactory.__init__(self) + + self.majorTypes = [ CMSG_BLKIF_BE ] + + self.subTypes = { + CMSG_BLKIF_BE_CREATE : self.recv_be_create, + CMSG_BLKIF_BE_CONNECT : self.recv_be_connect, + CMSG_BLKIF_BE_VBD_CREATE : self.recv_be_vbd_create, + CMSG_BLKIF_BE_VBD_GROW : self.recv_be_vbd_grow, + CMSG_BLKIF_BE_DRIVER_STATUS_CHANGED: self.recv_be_driver_status_changed, + } + self.attached = 1 + self.registerChannel() + + def createInstance(self, dom): + d = self.addDeferred() + blkif = self.getInstanceByDom(dom) + if blkif: + self.callDeferred(blkif) + else: + blkif = BlkifController(self, dom) + self.addInstance(blkif) + blkif.send_be_create() + return d + + def setControlDomain(self, dom): + if self.channel: + self.deregisterChannel() + self.attached = 0 + self.dom = dom + self.registerChannel() + # + #if xend.blkif.be_port: + # xend.blkif.recovery = True + #xend.blkif.be_port = xend.main.port_from_dom(dom) + + def recv_be_create(self, msg, req): + #print 'recv_be_create>' + val = unpackMsg('blkif_be_create_t', msg) + blkif = self.getInstanceByDom(val['domid']) + self.callDeferred(blkif) + + def recv_be_connect(self, msg, req): + #print 'recv_be_create>' + val = unpackMsg('blkif_be_connect_t', msg) + blkif = self.getInstanceByDom(val['domid']) + if blkif: + blkif.send_fe_interface_status_changed() + else: + pass + + def recv_be_vbd_create(self, msg, req): + #print 'recv_be_vbd_create>' + val = unpackMsg('blkif_be_vbd_create_t', msg) + blkif = self.getInstanceByDom(val['domid']) + if blkif: + blkif.send_be_vbd_grow(val['vdevice']) + else: + pass + + def recv_be_vbd_grow(self, msg, req): + #print 'recv_be_vbd_grow>' + val = unpackMsg('blkif_be_vbd_grow_t', msg) + # Check status? + if self.attached: + self.callDeferred(0) + else: + self.reattach_device(val['domid'], val['vdevice']) + + def reattach_device(self, dom, vdev): + blkif = self.getInstanceByDom(dom) + if blkif: + blkif.reattach_device(vdev) + attached = 1 + for blkif in self.getInstances(): + if not blkif.attached: + attached = 0 + break + self.attached = attached + if self.attached: + self.reattached() + + def reattached(self): + for blkif in self.getInstances(): + blkif.reattached() + + def recv_be_driver_status_changed(self, msg, req): + val = unpackMsg('blkif_be_driver_status_changed_t'. msg) + status = val['status'] + if status == BLKIF_DRIVER_STATUS_UP and not self.attached: + for blkif in self.getInstances(): + blkif.detach() + +class BlkDev: + """Info record for a block device. + """ + + def __init__(self, vdev, mode, segment): + self.vdev = vdev + self.mode = mode + self.device = segment['device'] + self.start_sector = segment['start_sector'] + self.nr_sectors = segment['nr_sectors'] + self.attached = 1 + + def readonly(self): + return 'w' not in self.mode + +class BlkifController(controller.Controller): + """Block device interface controller. Handles all block devices + for a domain. + """ + + def __init__(self, factory, dom): + #print 'BlkifController> dom=', dom + controller.Controller.__init__(self, factory, dom) + self.devices = {} + + self.majorTypes = [ CMSG_BLKIF_FE ] + + self.subTypes = { + CMSG_BLKIF_FE_DRIVER_STATUS_CHANGED: + self.recv_fe_driver_status_changed, + CMSG_BLKIF_FE_INTERFACE_CONNECT : + self.recv_fe_interface_connect, + } + self.attached = 1 + self.registerChannel() + #print 'BlkifController<', 'dom=', self.dom, 'idx=', self.idx + + def attach_device(self, vdev, mode, segment): + """Attach a device to the specified interface. + """ + #print 'BlkifController>attach_device>', self.dom, vdev, mode, segment + if vdev in self.devices: return -1 + dev = BlkDev(vdev, mode, segment) + self.devices[vdev] = dev + self.send_be_vbd_create(vdev) + return self.factory.addDeferred() + + def detach(self): + self.attached = 0 + for dev in self.devices.values(): + dev.attached = 0 + self.send_be_vbd_create(vdev) + + def reattach_device(self, vdev): + dev = self.devices[vdev] + dev.attached = 1 + attached = 1 + for dev in self.devices.values(): + if not dev.attached: + attached = 0 + break + self.attached = attached + return self.attached + + def reattached(self): + msg = packMsg('blkif_fe_interface_status_changed_t', + { 'handle' : 0, + 'status' : BLKIF_INTERFACE_STATUS_DISCONNECTED}) + self.writeRequest(msg) + + def recv_fe_driver_status_changed(self, msg, req): + msg = packMsg('blkif_fe_interface_status_changed_t', + { 'handle' : 0, + 'status' : BLKIF_INTERFACE_STATUS_DISCONNECTED, + 'evtchn' : 0 }) + self.writeRequest(msg) + + def recv_fe_interface_connect(self, msg, req): + val = unpackMsg('blkif_fe_interface_connect_t', msg) + self.evtchn = channel.eventChannel(0, self.dom) + msg = packMsg('blkif_be_connect_t', + { 'domid' : self.dom, + 'blkif_handle' : val['handle'], + 'evtchn' : self.evtchn['port1'], + 'shmem_frame' : val['shmem_frame'] }) + self.factory.writeRequest(msg) + pass + + #def recv_fe_interface_status_changed(self, msg, req): + # (hnd, status, chan) = unpackMsg('blkif_fe_interface_status_changed_t', msg) + # print 'recv_fe_interface_status_changed>', hnd, status, chan + # pass + + def send_fe_interface_status_changed(self): + msg = packMsg('blkif_fe_interface_status_changed_t', + { 'handle' : 0, + 'status' : BLKIF_INTERFACE_STATUS_CONNECTED, + 'evtchn' : self.evtchn['port2'] }) + self.writeRequest(msg) + + def send_be_create(self): + msg = packMsg('blkif_be_create_t', + { 'domid' : self.dom, + 'blkif_handle' : 0 }) + self.factory.writeRequest(msg) + + def send_be_vbd_create(self, vdev): + dev = self.devices[vdev] + msg = packMsg('blkif_be_vbd_create_t', + { 'domid' : self.dom, + 'blkif_handle' : 0, + 'vdevice' : dev.vdev, + 'readonly' : dev.readonly() }) + self.factory.writeRequest(msg) + + def send_be_vbd_grow(self, vdev): + dev = self.devices[vdev] + msg = packMsg('blkif_be_vbd_grow_t', + { 'domid' : self.dom, + 'blkif_handle' : 0, + 'vdevice' : dev.vdev, + 'extent.device' : dev.device, + 'extent.sector_start' : dev.start_sector, + 'extent.sector_length' : dev.nr_sectors }) + self.factory.writeRequest(msg) + diff --git a/tools/xenmgr/lib/server/channel.py b/tools/xenmgr/lib/server/channel.py new file mode 100755 index 0000000000..7678e1807f --- /dev/null +++ b/tools/xenmgr/lib/server/channel.py @@ -0,0 +1,259 @@ +import Xc; xc = Xc.new() +import xend.utils +from messages import msgTypeName + +def eventChannel(dom1, dom2): + return xc.evtchn_bind_interdomain(dom1=dom1, dom2=dom2) + +class ChannelFactory: + """Factory for creating channels. + Maintains a table of channels. + """ + + channels = {} + + def __init__(self): + self.notifier = xend.utils.notifier() + + def addChannel(self, channel): + idx = channel.idx + self.channels[idx] = channel + self.notifier.bind(idx) + # Try to wake it up + #self.notifier.unmask(idx) + #channel.notify() + + def getChannel(self, idx): + return self.channels.get(idx) + + def delChannel(self, idx): + if idx in self.channels: + del self.channels[idx] + self.notifier.unbind(idx) + + def domChannel(self, dom): + for chan in self.channels.values(): + if chan.dom == dom: + return chan + chan = Channel(self, dom) + self.addChannel(chan) + return chan + + def channelClosed(self, channel): + self.delChannel(channel.idx) + + def createPort(self, dom): + return xend.utils.port(dom) + +def channelFactory(): + global inst + try: + inst + except: + inst = ChannelFactory() + return inst + +class Channel: + """A control channel to a domain. Messages for the domain device controllers + are multiplexed over the channel (console, block devs, net devs). + """ + + def __init__(self, factory, dom): + self.factory = factory + self.dom = dom + self.port = self.factory.createPort(dom) + self.idx = self.port.local_port + self.devs = [] + self.devs_by_type = {} + self.closed = 0 + self.queue = [] + + def getIndex(self): + return self.idx + + def getLocalPort(self): + return self.port.local_port + + def getRemotePort(self): + return self.port.remote_port + + def close(self): + for d in self.devs: + d.lostChannel() + self.factory.channelClosed(self) + del self.devs + del self.devs_by_type + + def registerDevice(self, types, dev): + """Register a device controller. + + @param types message types the controller handles + @param dev device controller + """ + self.devs.append(dev) + for ty in types: + self.devs_by_type[ty] = dev + + def unregisterDevice(self, dev): + """Remove the registration for a device controller. + + @param dev device controller + """ + self.devs.remove(dev) + types = [ ty for (ty, d) in self.devs_by_type.items() + if d == dev ] + for ty in types: + del devs_by_type[ty] + + def getDevice(self, type): + """Get the device controller handling a message type. + + @param type message type + @returns controller or None + """ + return self.devs_by_type.get(type) + + def getMessageType(self, msg): + hdr = msg.get_header() + return (hdr['type'], hdr.get('subtype')) + + def __repr__(self): + return ('<Channel dom=%d ports=%d:%d>' + % (self.dom, + self.port.local_port, + self.port.remote_port)) + + def notificationReceived(self, type): + #print 'notificationReceived> type=', type, self + if self.closed: return + if type == self.factory.notifier.EXCEPTION: + print 'notificationReceived> EXCEPTION' + info = xc.evtchn_status(self.idx) + if info['status'] == 'unbound': + print 'notificationReceived> EXCEPTION closing...' + self.close() + return + work = 0 + work += self.handleRequests() + work += self.handleResponses() + work += self.handleWrites() + if work: + self.notify() + #print 'notificationReceived<', work + + def notify(self): + #print 'notify>', self + self.port.notify() + + def handleRequests(self): + #print 'handleRequests>' + work = 0 + while 1: + #print 'handleRequests>', work + msg = self.readRequest() + #print 'handleRequests> msg=', msg + if not msg: break + self.requestReceived(msg) + work += 1 + #print 'handleRequests<', work + return work + + def requestReceived(self, msg): + (ty, subty) = self.getMessageType(msg) + #print 'requestReceived>', ty, subty, self + #todo: Must respond before writing any more messages. + #todo: Should automate this (respond on write) + self.port.write_response(msg) + dev = self.getDevice(ty) + if dev: + dev.requestReceived(msg, ty, subty) + else: + print ("requestReceived> No device: Message type %s %d:%d" + % (msgTypeName(ty, subty), ty, subty)), self + + def handleResponses(self): + #print 'handleResponses>', self + work = 0 + while 1: + #print 'handleResponses>', work + msg = self.readResponse() + #print 'handleResponses> msg=', msg + if not msg: break + self.responseReceived(msg) + work += 1 + #print 'handleResponses<', work + return work + + def responseReceived(self, msg): + (ty, subty) = self.getMessageType(msg) + #print 'responseReceived>', ty, subty + dev = self.getDevice(ty) + if dev: + dev.responseReceived(msg, ty, subty) + else: + print ("responseReceived> No device: Message type %d:%d" + % (msgTypeName(ty, subty), ty, subty)), self + + def handleWrites(self): + #print 'handleWrites>', self + work = 0 + # Pull data from producers. + #print 'handleWrites> pull...' + for dev in self.devs: + work += dev.produceRequests() + # Flush the queue. + #print 'handleWrites> flush...' + while self.queue and self.port.space_to_write_request(): + msg = self.queue.pop(0) + self.port.write_request(msg) + work += 1 + #print 'handleWrites<', work + return work + + def writeRequest(self, msg, notify=1): + #print 'writeRequest>', self + if self.closed: + val = -1 + elif self.writeReady(): + self.port.write_request(msg) + if notify: self.notify() + val = 1 + else: + self.queue.append(msg) + val = 0 + #print 'writeRequest<', val + return val + + def writeResponse(self, msg): + #print 'writeResponse>', self + if self.closed: return -1 + self.port.write_response(msg) + return 1 + + def writeReady(self): + if self.closed or self.queue: return 0 + return self.port.space_to_write_request() + + def readRequest(self): + #print 'readRequest>', self + if self.closed: + #print 'readRequest> closed' + return None + if self.port.request_to_read(): + val = self.port.read_request() + else: + val = None + #print 'readRequest< ', val + return val + + def readResponse(self): + #print 'readResponse>', self + if self.closed: + #print 'readResponse> closed' + return None + if self.port.response_to_read(): + val = self.port.read_response() + else: + val = None + #print 'readResponse<', val + return val diff --git a/tools/xenmgr/lib/server/console.py b/tools/xenmgr/lib/server/console.py new file mode 100755 index 0000000000..6db905dc0b --- /dev/null +++ b/tools/xenmgr/lib/server/console.py @@ -0,0 +1,220 @@ + +from twisted.internet import reactor +from twisted.internet import protocol +from twisted.protocols import telnet + +import xend.utils + +from xenmgr import EventServer +eserver = EventServer.instance() + +import controller +from messages import * +from params import * + +"""Telnet binary option.""" +TRANSMIT_BINARY = '0' +WILL = chr(251) +IAC = chr(255) + +class ConsoleProtocol(protocol.Protocol): + """Asynchronous handler for a console TCP socket. + """ + + def __init__(self, controller, idx): + self.controller = controller + self.idx = idx + self.addr = None + self.binary = 0 + + def connectionMade(self): + peer = self.transport.getPeer() + self.addr = (peer.host, peer.port) + if self.controller.connect(self.addr, self): + self.transport.write("Cannot connect to console %d on domain %d\n" + % (self.idx, self.controller.dom)) + self.loseConnection() + return + else: + self.transport.write("Connected to console %d on domain %d\n" + % (self.idx, self.controller.dom)) + self.setTelnetTransmitBinary() + eserver.inject('xend.console.connect', + [self.idx, self.addr[0], self.addr[1]]) + + def setTelnetTransmitBinary(self): + """Send the sequence to set the telnet TRANSMIT-BINARY option. + """ + self.write(IAC + WILL + TRANSMIT_BINARY) + + def dataReceived(self, data): + if self.controller.handleInput(self, data): + self.loseConnection() + + def write(self, data): + #if not self.connected: return -1 + self.transport.write(data) + return len(data) + + def connectionLost(self, reason=None): + eserver.inject('xend.console.disconnect', + [self.idx, self.addr[0], self.addr[1]]) + self.controller.disconnect() + + def loseConnection(self): + self.transport.loseConnection() + +class ConsoleFactory(protocol.ServerFactory): + """Asynchronous handler for a console server socket. + """ + protocol = ConsoleProtocol + + def __init__(self, controller, idx): + #protocol.ServerFactory.__init__(self) + self.controller = controller + self.idx = idx + + def buildProtocol(self, addr): + proto = self.protocol(self.controller, self.idx) + proto.factory = self + return proto + +class ConsoleControllerFactory(controller.ControllerFactory): + """Factory for creating console controllers. + """ + + def createInstance(self, dom, console_port=None): + if console_port is None: + console_port = CONSOLE_PORT_BASE + dom + console = ConsoleController(self, dom, console_port) + self.addInstance(console) + eserver.inject('xend.console.create', + [console.idx, console.dom, console.console_port]) + return console + + def consoleClosed(self, console): + eserver.inject('xend.console.close', console.idx) + self.delInstance(console) + +class ConsoleController(controller.Controller): + """Console controller for a domain. + Does not poll for i/o itself, but relies on the notifier to post console + output and the connected TCP sockets to post console input. + """ + + def __init__(self, factory, dom, console_port): + #print 'ConsoleController> dom=', dom + controller.Controller.__init__(self, factory, dom) + self.majorTypes = [ CMSG_CONSOLE ] + self.status = "new" + self.addr = None + self.conn = None + self.rbuf = xend.utils.buffer() + self.wbuf = xend.utils.buffer() + self.console_port = console_port + + self.registerChannel() + self.listener = None + self.listen() + #print 'ConsoleController<', 'dom=', self.dom, 'idx=', self.idx + + def sxpr(self): + val =['console', + ['id', self.idx ], + ['domain', self.dom ], + ['local_port', self.channel.getLocalPort() ], + ['remote_port', self.channel.getRemotePort() ], + ['console_port', self.console_port ] ] + if self.addr: + val.append(['connected', self.addr[0], self.addr[1]]) + return val + + def ready(self): + return not (self.closed() or self.rbuf.empty()) + + def closed(self): + return self.status == 'closed' + + def connected(self): + return self.status == 'connected' + + def close(self): + self.status = "closed" + self.listener.stopListening() + self.deregisterChannel() + self.lostChannel() + + def listen(self): + """Listen for TCP connections to the console port.. + """ + if self.closed(): return + self.status = "listening" + if self.listener: + #self.listener.startListening() + pass + else: + f = ConsoleFactory(self, self.idx) + self.listener = reactor.listenTCP(self.console_port, f) + + def connect(self, addr, conn): + if self.closed(): return -1 + if self.connected(): return -1 + self.addr = addr + self.conn = conn + self.status = "connected" + self.handleOutput() + return 0 + + def disconnect(self): + self.addr = None + self.conn = None + self.listen() + + def requestReceived(self, msg, type, subtype): + #print '***Console', self.dom, msg.get_payload() + self.rbuf.write(msg.get_payload()) + self.handleOutput() + + def responseReceived(self, msg, type, subtype): + pass + + def produceRequests(self): + # Send as much pending console data as there is room for. + work = 0 + while not self.wbuf.empty() and self.channel.writeReady(): + msg = xend.utils.message(CMSG_CONSOLE, 0, 0) + msg.append_payload(self.wbuf.read(msg.MAX_PAYLOAD)) + work += self.channel.writeRequest(msg, notify=0) + return work + + def handleInput(self, conn, data): + """Handle some external input aimed at the console. + Called from a TCP connection (conn). + """ + if self.closed(): return -1 + if conn != self.conn: return 0 + self.wbuf.write(data) + if self.produceRequests(): + self.channel.notify() + return 0 + + def handleOutput(self): + """Handle buffered output from the console. + Sends it to the connected console (if any). + """ + if self.closed(): + #print 'Console>handleOutput> closed' + return -1 + if not self.conn: + #print 'Console>handleOutput> not connected' + return 0 + while not self.rbuf.empty(): + try: + #print 'Console>handleOutput> writing...' + bytes = self.conn.write(self.rbuf.peek()) + if bytes > 0: + self.rbuf.discard(bytes) + except socket.error, error: + pass + #print 'Console>handleOutput<' + return 0 diff --git a/tools/xenmgr/lib/server/controller.py b/tools/xenmgr/lib/server/controller.py new file mode 100755 index 0000000000..793ac85968 --- /dev/null +++ b/tools/xenmgr/lib/server/controller.py @@ -0,0 +1,133 @@ +from twisted.internet import defer + +import channel +from messages import msgTypeName + +class CtrlMsgRcvr: + """Abstract class for things that deal with a control interface to a domain. + """ + + + def __init__(self): + self.channelFactory = channel.channelFactory() + self.majorTypes = [ ] + self.subTypes = {} + self.dom = None + self.channel = None + self.idx = None + + def requestReceived(self, msg, type, subtype): + method = self.subTypes.get(subtype) + if method: + method(msg, 1) + else: + print ('requestReceived> No handler: Message type %s %d:%d' + % (msgTypeName(type, subtype), type, subtype)), self + + def responseReceived(self, msg, type, subtype): + method = self.subTypes.get(subtype) + if method: + method(msg, 0) + else: + print ('responseReceived> No handler: Message type %s %d:%d' + % (msgTypeName(type, subtype), type, subtype)), self + + def lostChannel(self): + pass + + def registerChannel(self): + self.channel = self.channelFactory.domChannel(self.dom) + #print 'registerChannel> channel=', self.channel, self + self.idx = self.channel.getIndex() + #print 'registerChannel> idx=', self.idx + if self.majorTypes: + self.channel.registerDevice(self.majorTypes, self) + + def deregisterChannel(self): + if self.channel: + self.channel.deregisterDevice(self) + del self.channel + + def produceRequests(self): + return 0 + + def writeRequest(self, msg): + if self.channel: + self.channel.writeRequest(msg) + else: + print 'CtrlMsgRcvr>writeRequest>', 'no channel!', self + + def writeResponse(self, msg): + if self.channel: + self.channel.writeResponse(msg) + else: + print 'CtrlMsgRcvr>writeResponse>', 'no channel!', self + +class ControllerFactory(CtrlMsgRcvr): + """Abstract class for factories creating controllers. + Maintains a table of instances. + """ + + def __init__(self): + CtrlMsgRcvr.__init__(self) + self.instances = {} + self.dlist = [] + self.dom = 0 + + def addInstance(self, instance): + self.instances[instance.idx] = instance + + def getInstance(self, idx): + return self.instances.get(idx) + + def getInstances(self): + return self.instances.values() + + def getInstanceByDom(self, dom): + for inst in self.instances.values(): + if inst.dom == dom: + return inst + return None + + def delInstance(self, instance): + if instance in self.instances: + del self.instances[instance.idx] + + def createInstance(self, dom): + raise NotImplementedError() + + def instanceClosed(self, instance): + self.delInstance(instance) + + def addDeferred(self): + d = defer.Deferred() + self.dlist.append(d) + return d + + def callDeferred(self, *args): + if self.dlist: + d = self.dlist.pop(0) + d.callback(*args) + + def errDeferred(self, *args): + if self.dlist: + d = self.dlist.pop(0) + d.errback(*args) + +class Controller(CtrlMsgRcvr): + """Abstract class for a device controller attached to a domain. + """ + + def __init__(self, factory, dom): + CtrlMsgRcvr.__init__(self) + self.factory = factory + self.dom = dom + self.channel = None + self.idx = None + + def close(self): + self.deregisterChannel() + self.lostChannel(self) + + def lostChannel(self): + self.factory.instanceClosed(self) diff --git a/tools/xenmgr/lib/server/cstruct.py b/tools/xenmgr/lib/server/cstruct.py new file mode 100755 index 0000000000..880931b41f --- /dev/null +++ b/tools/xenmgr/lib/server/cstruct.py @@ -0,0 +1,269 @@ +import struct + +class Struct: + + maxDepth = 10 + + base = ['x', 'B', 'H', 'I', 'L', 'Q', 'c', 'h', 'i', 'l', 'q', ] + + sizes = {'B': 1, + 'H': 2, + 'I': 4, + 'L': 4, + 'Q': 8, + 'c': 1, + 'h': 2, + 'i': 4, + 'l': 4, + 'q': 8, + 'x': 1, + } + + formats = { + 'int8' : 'B', + 'int16' : 'H', + 'int32' : 'I', + 'int64' : 'Q', + 'u8' : 'B', + 'u16' : 'H', + 'u32' : 'I', + 'u64' : 'Q' + } + + def typedef(self, name, val): + self.formats[name] = val + + def struct(self, name, *f): + self.typedef(name, StructInfo(self, f)) + + def getType(self, name): + return self.formats[name] + + def format(self, ty): + d = 0 + f = ty + while d < self.maxDepth: + d += 1 + f = self.formats[f] + if isinstance(f, StructInfo): + return f.format() + if f in self.base: + return f + return -1 + + def alignedformat(self, ty): + fmt = self.format(ty) + #print 'alignedformat> %s |%s|' %(ty, fmt) + afmt = self.align(fmt) + #print 'alignedformat< %s |%s| |%s|' % (ty, fmt, afmt) + return afmt + + def align(self, fmt): + n1 = 0 + afmt = '' + for a in fmt: + n2 = self.getSize(a) + m = n1 % n2 + if m: + d = (n2 - m) + afmt += 'x' * d + n1 += d + afmt += a + n1 += n2 + return afmt + + def fmtsize(self, fmt): + s = 0 + for f in fmt: + s += self.getSize(f) + return s + + def getSize(self, f): + return self.sizes[f] + + def pack(self, ty, data): + return self.getType(ty).pack(data) + + def unpack(self, ty, data): + return self.getType(ty).unpack(data) + + def show(self): + l = self.formats.keys() + l.sort() + for v in l: + print "%-35s %-10s %s" % (v, self.format(v), self.alignedformat(v)) + + +class StructInfo: + + def __init__(self, s, f): + self.fmt = None + self.structs = s + self.fields = f + + def alignedformat(self): + if self.afmt: return self.afmt + self.afmt = self.structs.align(self.format()) + return self.afmt + + def format(self): + if self.fmt: return self.fmt + fmt = "" + for (ty, name) in self.fields: + fmt += self.formatString(ty) + self.fmt = fmt + return fmt + + def formatString(self, ty): + if ty in self.fields: + ty = self.fields[ty] + return self.structs.format(ty) + + def pack(self, *args): + return struct.pack(self.alignedformat(), *args) + + def unpack(self, data): + return struct.unpack(self.alignedformat(), data) + +types = Struct() + +types.typedef('short' , 'h') +types.typedef('int' , 'i') +types.typedef('long' , 'l') +types.typedef('unsigned short', 'H') +types.typedef('unsigned int' , 'I') +types.typedef('unsigned long' , 'L') +types.typedef('domid_t' , 'u64') +types.typedef('blkif_vdev_t' , 'u16') +types.typedef('blkif_pdev_t' , 'u16') +types.typedef('blkif_sector_t', 'u64') + +types.struct('u8[6]', + ('u8', 'a1'), + ('u8', 'a2'), + ('u8', 'a3'), + ('u8', 'a4'), + ('u8', 'a5'), + ('u8', 'a6')) + +types.struct('blkif_fe_interface_status_changed_t', + ('unsigned int', 'handle'), + ('unsigned int', 'status'), + ('unsigned int', 'evtchn')) + +types.struct('blkif_fe_driver_status_changed_t', + ('unsigned int', 'status'), + ('unsigned int', 'nr_interfaces')) + +types.struct('blkif_fe_interface_connect_t', + ('unsigned int' , 'handle'), + ('unsigned long', 'shmem_frame')) + +types.struct('blkif_fe_interface_disconnect_t', + ('unsigned int', 'handle')) + +types.struct('blkif_extent_t', + ('blkif_pdev_t' , 'device'), + ('blkif_sector_t', 'sector_start'), + ('blkif_sector_t', 'sector_length')) + +types.struct('blkif_be_create_t', + ('domid_t' , 'domid'), + ('unsigned int', 'blkif_handle'), + ('unsigned int', 'status')) + +types.struct('blkif_be_destroy_t', + ('domid_t' , 'domid'), + ('unsigned int', 'blkif_handle'), + ('unsigned int', 'status')) + +types.struct('blkif_be_connect_t', + ('domid_t' , 'domid'), + ('unsigned int' , 'blkif_handle'), + ('unsigned int' , 'evtchn'), + ('unsigned long', 'shmem_frame'), + ('unsigned int' , 'status')) + +types.struct('blkif_be_disconnect_t', + ('domid_t' , 'domid'), + ('unsigned int', 'blkif_handle'), + ('unsigned int', 'status')) + +types.struct('blkif_be_vbd_create_t', + ('domid_t' , 'domid'), #Q + ('unsigned int', 'blkif_handle'), #I + ('blkif_vdev_t', 'vdevice'), #H + ('int' , 'readonly'), #i + ('unsigned int', 'status')) #I + +types.struct('blkif_be_vbd_destroy_t', + ('domid_t' , 'domid'), + ('unsigned int', 'blkif_handle'), + ('blkif_vdev_t', 'vdevice'), + ('unsigned int', 'status')) + +types.struct('blkif_be_vbd_grow_t', + ('domid_t' , 'domid'), #Q + ('unsigned int' , 'blkif_handle'), #I + ('blkif_vdev_t' , 'vdevice'), #H + ('blkif_extent_t', 'extent'), #HQQ + ('unsigned int' , 'status')) #I + +types.struct('blkif_be_vbd_shrink_t', + ('domid_t' , 'domid'), + ('unsigned int', 'blkif_handle'), + ('blkif_vdev_t', 'vdevice'), + ('unsigned int', 'status')) + +types.struct('blkif_be_driver_status_changed_t', + ('unsigned int', 'status'), + ('unsigned int', 'nr_interfaces')) + +types.struct('netif_fe_interface_status_changed_t', + ('unsigned int', 'handle'), + ('unsigned int', 'status'), + ('unsigned int', 'evtchn'), + ('u8[6]', 'mac')) + +types.struct('netif_fe_driver_status_changed_t', + ('unsigned int', 'status'), + ('unsigned int', 'nr_interfaces')) + +types.struct('netif_fe_interface_connect_t', + ('unsigned int', 'handle'), + ('unsigned long', 'tx_shmem_frame'), + ('unsigned long', 'rx_shmem_frame')) + +types.struct('netif_fe_interface_disconnect_t', + ('unsigned int', 'handle')) + +types.struct('netif_be_create_t', + ('domid_t' , 'domid'), + ('unsigned int', 'netif_handle'), + ('u8[6]' , 'mac'), + ('unsigned int', 'status')) + +types.struct('netif_be_destroy_t', + ('domid_t' , 'domid'), + ('unsigned int', 'netif_handle'), + ('unsigned int', 'status')) + +types.struct('netif_be_connect_t', + ('domid_t' , 'domid'), + ('unsigned int' , 'netif_handle'), + ('unsigned int' , 'evtchn'), + ('unsigned long', 'tx_shmem_frame'), + ('unsigned long', 'rx_shmem_frame'), + ('unsigned int' , 'status')) + +types.struct('netif_be_disconnect_t', + ('domid_t' , 'domid'), + ('unsigned int', 'netif_handle'), + ('unsigned int', 'status')) + +types.struct('netif_be_driver_status_changed_t', + ('unsigned int', 'status'), + ('unsigned int', 'nr_interfaces')) + +if 1 or __name__ == "__main__": + types.show() diff --git a/tools/xenmgr/lib/server/messages.py b/tools/xenmgr/lib/server/messages.py new file mode 100644 index 0000000000..b45a5004de --- /dev/null +++ b/tools/xenmgr/lib/server/messages.py @@ -0,0 +1,186 @@ +import struct + +import xend.utils + +""" All message formats. +Added to incrementally for the various message types. +See below. +""" +msg_formats = {} + +#============================================================================ +# Console message types. +#============================================================================ + +CMSG_CONSOLE = 0 + +console_formats = { 'console_data': (CMSG_CONSOLE, 0, "?") } + +msg_formats.update(console_formats) + +#============================================================================ +# Block interface message types. +#============================================================================ + +CMSG_BLKIF_BE = 1 +CMSG_BLKIF_FE = 2 + +CMSG_BLKIF_FE_INTERFACE_STATUS_CHANGED = 0 +CMSG_BLKIF_FE_DRIVER_STATUS_CHANGED = 32 +CMSG_BLKIF_FE_INTERFACE_CONNECT = 33 +CMSG_BLKIF_FE_INTERFACE_DISCONNECT = 34 + +CMSG_BLKIF_BE_CREATE = 0 +CMSG_BLKIF_BE_DESTROY = 1 +CMSG_BLKIF_BE_CONNECT = 2 +CMSG_BLKIF_BE_DISCONNECT = 3 +CMSG_BLKIF_BE_VBD_CREATE = 4 +CMSG_BLKIF_BE_VBD_DESTROY = 5 +CMSG_BLKIF_BE_VBD_GROW = 6 +CMSG_BLKIF_BE_VBD_SHRINK = 7 +CMSG_BLKIF_BE_DRIVER_STATUS_CHANGED = 32 + +BLKIF_DRIVER_STATUS_DOWN = 0 +BLKIF_DRIVER_STATUS_UP = 1 + +BLKIF_INTERFACE_STATUS_DESTROYED = 0 #/* Interface doesn't exist. */ +BLKIF_INTERFACE_STATUS_DISCONNECTED = 1 #/* Exists but is disconnected. */ +BLKIF_INTERFACE_STATUS_CONNECTED = 2 #/* Exists and is connected. */ + +BLKIF_BE_STATUS_OKAY = 0 +BLKIF_BE_STATUS_ERROR = 1 +BLKIF_BE_STATUS_INTERFACE_EXISTS = 2 +BLKIF_BE_STATUS_INTERFACE_NOT_FOUND = 3 +BLKIF_BE_STATUS_INTERFACE_CONNECTED = 4 +BLKIF_BE_STATUS_VBD_EXISTS = 5 +BLKIF_BE_STATUS_VBD_NOT_FOUND = 6 +BLKIF_BE_STATUS_OUT_OF_MEMORY = 7 +BLKIF_BE_STATUS_EXTENT_NOT_FOUND = 8 +BLKIF_BE_STATUS_MAPPING_ERROR = 9 + +blkif_formats = { + 'blkif_be_connect_t': + (CMSG_BLKIF_BE, CMSG_BLKIF_BE_CONNECT, "QIILI"), + + 'blkif_be_create_t': + (CMSG_BLKIF_BE, CMSG_BLKIF_BE_CREATE, "QII"), + + 'blkif_be_destroy_t': + (CMSG_BLKIF_BE, CMSG_BLKIF_BE_DESTROY, "QII"), + + 'blkif_be_vbd_create_t': + (CMSG_BLKIF_BE, CMSG_BLKIF_BE_VBD_CREATE, "QIHII"), + + 'blkif_be_vbd_grow_t': + (CMSG_BLKIF_BE, CMSG_BLKIF_BE_VBD_GROW , "QIHHHQQI"), + + 'blkif_fe_interface_status_changed_t': + (CMSG_BLKIF_FE, CMSG_BLKIF_FE_INTERFACE_STATUS_CHANGED, "III"), + + 'blkif_fe_driver_status_changed_t': + (CMSG_BLKIF_FE, CMSG_BLKIF_FE_DRIVER_STATUS_CHANGED, "?"), + + 'blkif_fe_interface_connect_t': + (CMSG_BLKIF_FE, CMSG_BLKIF_FE_INTERFACE_CONNECT, "IL"), +} + +msg_formats.update(blkif_formats) + +#============================================================================ +# Network interface message types. +#============================================================================ + +CMSG_NETIF_BE = 3 +CMSG_NETIF_FE = 4 + +CMSG_NETIF_FE_INTERFACE_STATUS_CHANGED = 0 +CMSG_NETIF_FE_DRIVER_STATUS_CHANGED = 32 +CMSG_NETIF_FE_INTERFACE_CONNECT = 33 +CMSG_NETIF_FE_INTERFACE_DISCONNECT = 34 + +CMSG_NETIF_BE_CREATE = 0 +CMSG_NETIF_BE_DESTROY = 1 +CMSG_NETIF_BE_CONNECT = 2 +CMSG_NETIF_BE_DISCONNECT = 3 +CMSG_NETIF_BE_DRIVER_STATUS_CHANGED = 32 + +NETIF_INTERFACE_STATUS_DESTROYED = 0 #/* Interface doesn't exist. */ +NETIF_INTERFACE_STATUS_DISCONNECTED = 1 #/* Exists but is disconnected. */ +NETIF_INTERFACE_STATUS_CONNECTED = 2 #/* Exists and is connected. */ + +NETIF_DRIVER_STATUS_DOWN = 0 +NETIF_DRIVER_STATUS_UP = 1 + +netif_formats = { + 'netif_be_connect_t': + (CMSG_NETIF_BE, CMSG_NETIF_BE_CONNECT, "QIILLI"), + + 'netif_be_create_t': + (CMSG_NETIF_BE, CMSG_NETIF_BE_CREATE, "QIBBBBBBBBI"), + + 'netif_be_destroy_t': + (CMSG_NETIF_BE, CMSG_NETIF_BE_DESTROY, "QII"), + + 'netif_be_disconnect_t': + (CMSG_NETIF_BE, CMSG_NETIF_BE_DISCONNECT, "QII"), + + 'netif_be_driver_status_changed_t': + (CMSG_NETIF_BE, CMSG_NETIF_BE_DRIVER_STATUS_CHANGED, "QII"), + + 'netif_fe_driver_status_changed_t': + (CMSG_NETIF_FE, CMSG_NETIF_FE_DRIVER_STATUS_CHANGED, "II"), + + 'netif_fe_interface_connect_t': + (CMSG_NETIF_FE, CMSG_NETIF_FE_INTERFACE_CONNECT, "ILL"), + + 'netif_fe_interface_status_changed_t': + (CMSG_NETIF_FE, CMSG_NETIF_FE_INTERFACE_STATUS_CHANGED, "IIIBBBBBBBB"), + } + +msg_formats.update(netif_formats) + +#============================================================================ + +class Msg: + pass + +def packMsg(ty, params): + print '>packMsg', ty, params + (major, minor, packing) = msg_formats[ty] + args = {} + for (k, v) in params.items(): + if k == 'mac': + for i in range(0, 6): + args['mac[%d]' % i] = v[i] + else: + args[k] = v + for (k, v) in args.items(): + print 'packMsg>', k, v, type(v) + msgid = 0 + msg = xend.utils.message(major, minor, msgid, args) + return msg + +def unpackMsg(ty, msg): + args = msg.get_payload() + mac = [0, 0, 0, 0, 0, 0] + macs = [] + for (k, v) in args.items(): + if k.startswith('mac['): + macs += k + i = int(k[4:5]) + mac[i] = v + else: + pass + if macs: + args['mac'] = mac + for k in macs: + del args[k] + print '<unpackMsg', ty, args + return args + +def msgTypeName(ty, subty): + for (name, info) in msg_formats.items(): + if info[0] == ty and info[1] == subty: + return name + return None + diff --git a/tools/xenmgr/lib/server/netif.py b/tools/xenmgr/lib/server/netif.py new file mode 100755 index 0000000000..a6b1c99b19 --- /dev/null +++ b/tools/xenmgr/lib/server/netif.py @@ -0,0 +1,205 @@ +import random + +import channel +import controller +from messages import * + +class NetifControllerFactory(controller.ControllerFactory): + """Factory for creating network interface controllers. + Also handles the 'back-end' channel to dom0. + """ + # todo: add support for setting dom controlling blkifs (don't assume 0). + # todo: add support for 'recovery'. + + def __init__(self): + controller.ControllerFactory.__init__(self) + + self.majorTypes = [ CMSG_NETIF_BE ] + + self.subTypes = { + CMSG_NETIF_BE_CREATE : self.recv_be_create, + CMSG_NETIF_BE_CONNECT: self.recv_be_connect, + CMSG_NETIF_BE_DRIVER_STATUS_CHANGED: self.recv_be_driver_status_changed, + } + self.attached = 1 + self.registerChannel() + + def createInstance(self, dom): + #print 'netif>createInstance> dom=', dom + netif = self.getInstanceByDom(dom) + if netif is None: + netif = NetifController(self, dom) + self.addInstance(netif) + return netif + + def setControlDomain(self, dom): + self.deregisterChannel() + self.attached = 0 + self.dom = dom + self.registerChannel() + # + #if xend.netif.be_port.remote_dom != 0: + # xend.netif.recovery = True + # xend.netif.be_port = xend.main.port_from_dom(dom) + # + pass + + def recv_be_create(self, msg, req): + self.callDeferred(0) + + def recv_be_connect(self, msg, req): + val = unpackMsg('netif_be_connect_t', msg) + dom = val['domid'] + vif = val['netif_handle'] + netif = self.getInstanceByDom(dom) + if netif: + netif.send_interface_connected(vif) + else: + print "recv_be_connect> unknown vif=", vif + pass + + def recv_be_driver_status_changed(self, msg, req): + val = unpackMsg('netif_be_driver_status_changed_t', msg) + status = val['status'] + if status == NETIF_DRIVER_STATUS_UP and not self.attached: + for netif in self.getInstances(): + netif.reattach_devices() + self.attached = 1 + +## pl = msg.get_payload() +## status = pl['status'] +## if status == NETIF_DRIVER_STATUS_UP: +## if xend.netif.recovery: +## print "New netif backend now UP, notifying guests:" +## for netif_key in interface.list.keys(): +## netif = interface.list[netif_key] +## netif.create() +## print " Notifying %d" % netif.dom +## msg = xend.utils.message( +## CMSG_NETIF_FE, +## CMSG_NETIF_FE_INTERFACE_STATUS_CHANGED, 0, +## { 'handle' : 0, 'status' : 1 }) +## netif.ctrlif_tx_req(xend.main.port_from_dom(netif.dom),msg) +## print "Done notifying guests" +## recovery = False + +class NetDev: + """Info record for a network device. + """ + + def __init__(self, vif, mac): + self.vif = vif + self.mac = mac + self.evtchn = None + +class NetifController(controller.Controller): + """Network interface controller. Handles all network devices for a domain. + """ + + def __init__(self, factory, dom): + #print 'NetifController> dom=', dom + controller.Controller.__init__(self, factory, dom) + self.devices = {} + + self.majorTypes = [ CMSG_NETIF_FE ] + + self.subTypes = { + CMSG_NETIF_FE_DRIVER_STATUS_CHANGED: + self.recv_fe_driver_status_changed, + CMSG_NETIF_FE_INTERFACE_CONNECT : + self.recv_fe_interface_connect, + } + self.registerChannel() + #print 'NetifController<', 'dom=', self.dom, 'idx=', self.idx + + + def randomMAC(self): + # VIFs get a random MAC address with a "special" vendor id. + # + # NB. The vendor is currently an "obsolete" one that used to belong + # to DEC (AA-00-00). Using it is probably a bit rude :-) + # + # NB2. The first bit of the first random octet is set to zero for + # all dynamic MAC addresses. This may allow us to manually specify + # MAC addresses for some VIFs with no fear of clashes. + mac = [ 0xaa, 0x00, 0x00, + random.randint(0x00, 0x7f), + random.randint(0x00, 0xff), + random.randint(0x00, 0xff) ] + return mac + + def attach_device(self, vif, vmac): + if vmac is None: + mac = self.randomMAC() + else: + mac = [ int(x, 16) for x in vmac.split(':') ] + if len(mac) != 6: raise ValueError("invalid mac") + #print "attach_device>", "vif=", vif, "mac=", mac + self.devices[vif] = NetDev(vif, mac) + d = self.factory.addDeferred() + self.send_be_create(vif) + return d + + def reattach_devices(self): + d = self.factory.addDeferred() + self.send_be_create(vif) + self.attach_fe_devices(0) + + def attach_fe_devices(self): + for dev in self.devices.values(): + msg = packMsg('netif_fe_interface_status_changed_t', + { 'handle' : dev.vif, + 'status' : NETIF_INTERFACE_STATUS_DISCONNECTED, + 'evtchn' : 0, + 'mac' : dev.mac }) + self.writeRequest(msg) + + def recv_fe_driver_status_changed(self, msg, req): + if not req: return + msg = packMsg('netif_fe_driver_status_changed_t', + { 'status' : NETIF_DRIVER_STATUS_UP, + 'nr_interfaces' : len(self.devices) }) + self.writeRequest(msg) + self.attach_fe_devices() + + def recv_fe_interface_connect(self, msg, req): + val = unpackMsg('netif_fe_interface_connect_t', msg) + dev = self.devices[val['handle']] + dev.evtchn = channel.eventChannel(0, self.dom) + msg = packMsg('netif_be_connect_t', + { 'domid' : self.dom, + 'netif_handle' : dev.vif, + 'evtchn' : dev.evtchn['port1'], + 'tx_shmem_frame' : val['tx_shmem_frame'], + 'rx_shmem_frame' : val['rx_shmem_frame'] }) + self.factory.writeRequest(msg) + + #def recv_fe_interface_status_changed(self): + # print 'recv_fe_interface_status_changed>' + # pass + + def send_interface_connected(self, vif): + dev = self.devices[vif] + msg = packMsg('netif_fe_interface_status_changed_t', + { 'handle' : dev.vif, + 'status' : NETIF_INTERFACE_STATUS_CONNECTED, + 'evtchn' : dev.evtchn['port2'], + 'mac' : dev.mac }) + self.writeRequest(msg) + + def send_be_create(self, vif): + dev = self.devices[vif] + msg = packMsg('netif_be_create_t', + { 'domid' : self.dom, + 'netif_handle' : dev.vif, + 'mac' : dev.mac }) + self.factory.writeRequest(msg) + + def send_be_destroy(self, vif): + print 'send_be_destroy>', 'dom=', self.dom, 'vif=', vif + dev = self.devices[vif] + del self.devices[vif] + msg = packMsg('netif_be_destroy_t', + { 'domid' : self.dom, + 'netif_handle' : vif }) + self.factory.writeRequest(msg) diff --git a/tools/xenmgr/lib/server/params.py b/tools/xenmgr/lib/server/params.py new file mode 100644 index 0000000000..d8f064cf0c --- /dev/null +++ b/tools/xenmgr/lib/server/params.py @@ -0,0 +1,10 @@ +# The following parameters could be placed in a configuration file. +PID_FILE = '/var/run/xend.pid' +LOG_FILE = '/var/log/xend.log' +USER = 'root' +CONTROL_DIR = '/var/run/xend' +MGMT_SOCK = 'xenmgrsock' # relative to CONTROL_DIR +EVENT_PORT = 8001 + +CONSOLE_PORT_BASE = 9600 + |