diff options
Diffstat (limited to 'tools/python/xen/xend/server/SrvDaemon.py')
-rw-r--r-- | tools/python/xen/xend/server/SrvDaemon.py | 751 |
1 files changed, 751 insertions, 0 deletions
diff --git a/tools/python/xen/xend/server/SrvDaemon.py b/tools/python/xen/xend/server/SrvDaemon.py new file mode 100644 index 0000000000..c8284dc485 --- /dev/null +++ b/tools/python/xen/xend/server/SrvDaemon.py @@ -0,0 +1,751 @@ +########################################################### +## Xen controller daemon +## Copyright (c) 2004, K A Fraser (University of Cambridge) +## Copyright (C) 2004, Mike Wray <mike.wray@hp.com> +########################################################### + +import os +import os.path +import signal +import sys +import threading +import linecache +import socket +import pwd +import re +import StringIO + +from twisted.internet import pollreactor +pollreactor.install() + +from twisted.internet import reactor +from twisted.internet import protocol +from twisted.internet import abstract +from twisted.internet import defer + +from xen.ext import xu + +from xen.xend import sxp +from xen.xend import PrettyPrint +from xen.xend import EventServer +eserver = EventServer.instance() + +from xen.xend.server import SrvServer + +import channel +import blkif +import netif +import console +import domain +from params import * + +DEBUG = 1 + +class MgmtProtocol(protocol.DatagramProtocol): + """Handler for the management socket (unix-domain). + """ + + def __init__(self, daemon): + #protocol.DatagramProtocol.__init__(self) + self.daemon = daemon + + def write(self, data, addr): + return self.transport.write(data, addr) + + def datagramReceived(self, data, addr): + if DEBUG: print 'datagramReceived> addr=', addr, 'data=', data + io = StringIO.StringIO(data) + try: + vals = sxp.parse(io) + res = self.dispatch(vals[0]) + self.send_result(addr, res) + except SystemExit: + raise + except: + if DEBUG: + raise + else: + self.send_error(addr) + + def send_reply(self, addr, sxpr): + io = StringIO.StringIO() + sxp.show(sxpr, out=io) + io.seek(0) + self.write(io.getvalue(), addr) + + def send_result(self, addr, res): + + def fn(res, self=self, addr=addr): + self.send_reply(addr, ['ok', res]) + + if isinstance(res, defer.Deferred): + res.addCallback(fn) + else: + fn(res) + + def send_error(self, addr): + (extype, exval) = sys.exc_info()[:2] + self.send_reply(addr, ['err', + ['type', str(extype) ], + ['value', str(exval) ] ] ) + + def opname(self, name): + """Get the name of the method for an operation. + """ + return 'op_' + name.replace('.', '_') + + def operror(self, name, v): + """Default operation handler - signals an error. + """ + raise NotImplementedError('Invalid operation: ' +name) + + def dispatch(self, req): + """Dispatch a request to its handler. + """ + op_name = sxp.name(req) + op_method_name = self.opname(op_name) + op_method = getattr(self, op_method_name, self.operror) + return op_method(op_name, req) + + def op_console_create(self, name, req): + """Create a new control interface - console for a domain. + """ + print name, req + dom = sxp.child_value(req, 'domain') + if not dom: raise ValueError('Missing domain') + dom = int(dom) + console_port = sxp.child_value(req, 'console_port') + if console_port: + console_port = int(console_port) + resp = self.daemon.console_create(dom, console_port) + print name, resp + return resp + + def op_consoles(self, name, req): + """Get a list of the consoles. + """ + return self.daemon.consoles() + + def op_console_disconnect(self, name, req): + id = sxp.child_value(req, 'id') + if not id: + raise ValueError('Missing console id') + id = int(id) + console = self.daemon.get_console(id) + if not console: + raise ValueError('Invalid console id') + if console.conn: + console.conn.loseConnection() + return ['ok'] + + def op_blkifs(self, name, req): + pass + + def op_blkif_devs(self, name, req): + pass + + def op_blkif_create(self, name, req): + pass + + def op_blkif_dev_create(self, name, req): + pass + + def op_netifs(self, name, req): + pass + + def op_netif_devs(self, name, req): + pass + + def op_netif_create(self, name, req): + pass + + def op_netif_dev_create(self, name, req): + pass + +class NotifierProtocol(protocol.Protocol): + """Asynchronous handler for i/o on the notifier (event channel). + """ + + def __init__(self, channelFactory): + self.channelFactory = channelFactory + + def notificationReceived(self, idx, type): + #print 'NotifierProtocol>notificationReceived>', idx, type + channel = self.channelFactory.getChannel(idx) + if not channel: + return + #print 'NotifierProtocol>notificationReceived> channel', channel + channel.notificationReceived(type) + + def connectionLost(self, reason=None): + pass + + def doStart(self): + pass + + def doStop(self): + pass + + def startProtocol(self): + pass + + def stopProtocol(self): + pass + +class NotifierPort(abstract.FileDescriptor): + """Transport class for the event channel. + """ + + def __init__(self, daemon, notifier, proto, reactor=None): + assert isinstance(proto, NotifierProtocol) + abstract.FileDescriptor.__init__(self, reactor) + self.daemon = daemon + self.notifier = notifier + self.protocol = proto + + def startListening(self): + self._bindNotifier() + self._connectToProtocol() + + def stopListening(self): + if self.connected: + result = self.d = defer.Deferred() + else: + result = None + self.loseConnection() + return result + + def fileno(self): + return self.notifier.fileno() + + def _bindNotifier(self): + self.connected = 1 + + def _connectToProtocol(self): + self.protocol.makeConnection(self) + self.startReading() + + def loseConnection(self): + if self.connected: + self.stopReading() + self.disconnecting = 1 + reactor.callLater(0, self.connectionLost) + + def connectionLost(self, reason=None): + abstract.FileDescriptor.connectionLost(self, reason) + if hasattr(self, 'protocol'): + self.protocol.doStop() + self.connected = 0 + #self.notifier.close() # Not implemented. + os.close(self.fileno()) + del self.notifier + if hasattr(self, 'd'): + self.d.callback(None) + del self.d + + def doRead(self): + #print 'NotifierPort>doRead>', self + count = 0 + while 1: + #print 'NotifierPort>doRead>', count + notification = self.notifier.read() + if not notification: + break + (idx, type) = notification + self.protocol.notificationReceived(idx, type) + self.notifier.unmask(idx) + count += 1 + #print 'NotifierPort>doRead<' + +class EventProtocol(protocol.Protocol): + """Asynchronous handler for a connected event socket. + """ + + def __init__(self, daemon): + #protocol.Protocol.__init__(self) + self.daemon = daemon + # Event queue. + self.queue = [] + # Subscribed events. + self.events = [] + self.parser = sxp.Parser() + self.pretty = 0 + + # For debugging subscribe to everything and make output pretty. + self.subscribe(['*']) + self.pretty = 1 + + def dataReceived(self, data): + try: + self.parser.input(data) + if self.parser.ready(): + val = self.parser.get_val() + res = self.dispatch(val) + self.send_result(res) + if self.parser.at_eof(): + self.loseConnection() + except SystemExit: + raise + except: + if DEBUG: + raise + else: + self.send_error() + + def loseConnection(self): + if self.transport: + self.transport.loseConnection() + if self.connected: + reactor.callLater(0, self.connectionLost) + + def connectionLost(self, reason=None): + self.unsubscribe() + + def send_reply(self, sxpr): + io = StringIO.StringIO() + if self.pretty: + PrettyPrint.prettyprint(sxpr, out=io) + else: + sxp.show(sxpr, out=io) + print >> io + io.seek(0) + return self.transport.write(io.getvalue()) + + def send_result(self, res): + return self.send_reply(['ok', res]) + + def send_error(self): + (extype, exval) = sys.exc_info()[:2] + return self.send_reply(['err', + ['type', str(extype)], + ['value', str(exval)]]) + + def send_event(self, val): + return self.send_reply(['event', val[0], val[1]]) + + def unsubscribe(self): + for event in self.events: + eserver.unsubscribe(event, self.queue_event) + + def subscribe(self, events): + self.unsubscribe() + for event in events: + eserver.subscribe(event, self.queue_event) + self.events = events + + def queue_event(self, name, v): + # Despite the name we dont' queue the event here. + # We send it because the transport will queue it. + self.send_event([name, v]) + + def opname(self, name): + return 'op_' + name.replace('.', '_') + + def operror(self, name, req): + raise NotImplementedError('Invalid operation: ' +name) + + def dispatch(self, req): + op_name = sxp.name(req) + op_method_name = self.opname(op_name) + op_method = getattr(self, op_method_name, self.operror) + return op_method(op_name, req) + + def op_help(self, name, req): + def nameop(x): + if x.startswith('op_'): + return x[3:].replace('_', '.') + else: + return x + + l = [ nameop(k) for k in dir(self) if k.startswith('op_') ] + return l + + def op_quit(self, name, req): + self.loseConnection() + + def op_exit(self, name, req): + sys.exit(0) + + def op_pretty(self, name, req): + self.pretty = 1 + return ['ok'] + + def op_console_disconnect(self, name, req): + id = sxp.child_value(req, 'id') + if not id: + raise ValueError('Missing console id') + self.daemon.console_disconnect(id) + return ['ok'] + + def op_info(self, name, req): + val = ['info'] + val += self.daemon.consoles() + val += self.daemon.blkifs() + val += self.daemon.netifs() + return val + + def op_sys_subscribe(self, name, v): + # (sys.subscribe event*) + # Subscribe to the events: + self.subscribe(v[1:]) + return ['ok'] + + def op_sys_inject(self, name, v): + # (sys.inject event) + event = v[1] + eserver.inject(sxp.name(event), event) + return ['ok'] + + +class EventFactory(protocol.Factory): + """Asynchronous handler for the event server socket. + """ + protocol = EventProtocol + service = None + + def __init__(self, daemon): + #protocol.Factory.__init__(self) + self.daemon = daemon + + def buildProtocol(self, addr): + proto = self.protocol(self.daemon) + proto.factory = self + return proto + +class VirqClient: + def __init__(self, daemon): + self.daemon = daemon + + def virqReceived(self, virq): + print 'VirqClient.virqReceived>', virq + eserver.inject('xend.virq', virq) + + def lostChannel(self, channel): + print 'VirqClient.lostChannel>', channel + +class Daemon: + """The xend daemon. + """ + def __init__(self): + self.shutdown = 0 + + def daemon_pids(self): + pids = [] + pidex = '(?P<pid>\d+)' + pythonex = '(?P<python>\S*python\S*)' + cmdex = '(?P<cmd>.*)' + procre = re.compile('^\s*' + pidex + '\s*' + pythonex + '\s*' + cmdex + '$') + xendre = re.compile('^/usr/sbin/xend\s*(start|restart)\s*.*$') + procs = os.popen('ps -e -o pid,args 2>/dev/null') + for proc in procs: + pm = procre.match(proc) + if not pm: continue + xm = xendre.match(pm.group('cmd')) + if not xm: continue + #print 'pid=', pm.group('pid'), 'cmd=', pm.group('cmd') + pids.append(int(pm.group('pid'))) + return pids + + def new_cleanup(self, kill=0): + err = 0 + pids = self.daemon_pids() + if kill: + for pid in pids: + print "Killing daemon pid=%d" % pid + os.kill(pid, signal.SIGHUP) + elif pids: + err = 1 + print "Daemon already running: ", pids + return err + + def cleanup(self, kill=False): + # No cleanup to do if PID_FILE is empty. + if not os.path.isfile(PID_FILE) or not os.path.getsize(PID_FILE): + return 0 + # Read the pid of the previous invocation and search active process list. + pid = open(PID_FILE, 'r').read() + lines = os.popen('ps ' + pid + ' 2>/dev/null').readlines() + for line in lines: + if re.search('^ *' + pid + '.+xend', line): + if not kill: + print "Daemon is already running (pid %d)" % int(pid) + return 1 + # Old daemon is still active: terminate it. + os.kill(int(pid), 1) + # Delete the stale PID_FILE. + os.remove(PID_FILE) + return 0 + + def install_child_reaper(self): + #signal.signal(signal.SIGCHLD, self.onSIGCHLD) + # Ensure that zombie children are automatically reaped. + xu.autoreap() + + def onSIGCHLD(self, signum, frame): + code = 1 + while code > 0: + code = os.waitpid(-1, os.WNOHANG) + + def start(self,trace=0): + if self.cleanup(kill=False): + return 1 + + # Detach from TTY. + if not DEBUG: + os.setsid() + + if self.set_user(): + return 1 + + self.install_child_reaper() + + # Fork -- parent writes PID_FILE and exits. + pid = os.fork() + if pid: + # Parent + pidfile = open(PID_FILE, 'w') + pidfile.write(str(pid)) + pidfile.close() + return 0 + # Child + logfile = self.open_logfile() + self.redirect_output(logfile) + if trace: + self.tracefile = open('/var/log/xend.trace', 'w+', 1) + self.traceindent = 0 + sys.settrace(self.trace) + try: + threading.settrace(self.trace) # Only in Python >= 2.3 + except: + pass + self.run() + return 0 + + def print_trace(self,str): + for i in range(self.traceindent): + self.tracefile.write(" ") + self.tracefile.write(str) + + def trace(self, frame, event, arg): + if event == 'call': + code = frame.f_code + filename = code.co_filename + m = re.search('.*xenmgr/(.*)', code.co_filename) + if not m: + return None + modulename = m.group(1) + if re.search('sxp.py', modulename): + return None + self.traceindent += 1 + self.print_trace("++++ %s:%s\n" + % (modulename, code.co_name)) + elif event == 'line': + filename = frame.f_code.co_filename + lineno = frame.f_lineno + self.print_trace("%4d %s" % + (lineno, linecache.getline(filename, lineno))) + elif event == 'return': + code = frame.f_code + filename = code.co_filename + m = re.search('.*xenmgr/(.*)', code.co_filename) + if not m: + return None + modulename = m.group(1) + self.print_trace("---- %s:%s\n" + % (modulename, code.co_name)) + self.traceindent -= 1 + elif event == 'exception': + pass + return self.trace + + def open_logfile(self): + if not os.path.exists(CONTROL_DIR): + os.makedirs(CONTROL_DIR) + + # Open log file. Truncate it if non-empty, and request line buffering. + if os.path.isfile(LOG_FILE): + os.rename(LOG_FILE, LOG_FILE+'.old') + logfile = open(LOG_FILE, 'w+', 1) + return logfile + + def set_user(self): + # Set the UID. + try: + os.setuid(pwd.getpwnam(USER)[2]) + return 0 + except KeyError, error: + print "Error: no such user '%s'" % USER + return 1 + + def redirect_output(self, logfile): + if DEBUG: return + # Close down standard file handles + try: + os.close(0) # stdin + os.close(1) # stdout + os.close(2) # stderr + except: + pass + # Redirect output to log file. + sys.stdout = sys.stderr = logfile + + def stop(self): + return self.cleanup(kill=True) + + def run(self): + self.createFactories() + self.listenMgmt() + self.listenEvent() + self.listenNotifier() + self.listenVirq() + SrvServer.create(bridge=1) + reactor.run() + + def createFactories(self): + self.channelF = channel.channelFactory() + self.domainCF = domain.DomainControllerFactory() + self.blkifCF = blkif.BlkifControllerFactory() + self.netifCF = netif.NetifControllerFactory() + self.consoleCF = console.ConsoleControllerFactory() + + def listenMgmt(self): + protocol = MgmtProtocol(self) + s = os.path.join(CONTROL_DIR, MGMT_SOCK) + if os.path.exists(s): + os.unlink(s) + return reactor.listenUNIXDatagram(s, protocol) + + def listenEvent(self): + protocol = EventFactory(self) + return reactor.listenTCP(EVENT_PORT, protocol) + + def listenNotifier(self): + protocol = NotifierProtocol(self.channelF) + p = NotifierPort(self, self.channelF.notifier, protocol, reactor) + p.startListening() + return p + + def listenVirq(self): + virqChan = self.channelF.virqChannel(channel.VIRQ_DOM_EXC) + virqChan.registerClient(VirqClient(self)) + + def exit(self): + reactor.diconnectAll() + sys.exit(0) + + def blkif_set_control_domain(self, dom, recreate=0): + """Set the block device backend control domain. + """ + return self.blkifCF.setControlDomain(dom, recreate=recreate) + + def blkif_get_control_domain(self, dom): + """Get the block device backend control domain. + """ + return self.blkifCF.getControlDomain() + + def blkif_create(self, dom, recreate=0): + """Create a block device interface controller. + + Returns Deferred + """ + d = self.blkifCF.createInstance(dom, recreate=recreate) + return d + + def blkifs(self): + return [ x.sxpr() for x in self.blkifCF.getInstances() ] + + def blkif_get(self, dom): + return self.blkifCF.getInstanceByDom(dom) + + def blkif_dev(self, dom, vdev): + return self.blkifCF.getDomainDevice(dom, vdev) + + def blkif_dev_create(self, dom, vdev, mode, segment, recreate=0): + """Create a block device. + + Returns Deferred + """ + ctrl = self.blkifCF.getInstanceByDom(dom) + if not ctrl: + raise ValueError('No blkif controller: %d' % dom) + print 'blkif_dev_create>', dom, vdev, mode, segment + d = ctrl.attachDevice(vdev, mode, segment, recreate=recreate) + return d + + def netif_set_control_domain(self, dom, recreate=0): + """Set the network interface backend control domain. + """ + return self.netifCF.setControlDomain(dom, recreate=recreate) + + def netif_get_control_domain(self, dom): + """Get the network interface backend control domain. + """ + return self.netifCF.getControlDomain() + + def netif_create(self, dom, recreate=0): + """Create a network interface controller. + + """ + return self.netifCF.createInstance(dom, recreate=recreate) + + def netifs(self): + return [ x.sxpr() for x in self.netifCF.getInstances() ] + + def netif_get(self, dom): + return self.netifCF.getInstanceByDom(dom) + + def netif_dev_create(self, dom, vif, vmac, recreate=0): + """Create a network device. + + todo + """ + ctrl = self.netifCF.getInstanceByDom(dom) + if not ctrl: + raise ValueError('No netif controller: %d' % dom) + d = ctrl.attachDevice(vif, vmac, recreate=recreate) + return d + + def netif_dev(self, dom, vif): + return self.netifCF.getDomainDevice(dom, vif) + + def console_create(self, dom, console_port=None): + """Create a console for a domain. + """ + console = self.consoleCF.getInstanceByDom(dom) + if console is None: + console = self.consoleCF.createInstance(dom, console_port) + return console.sxpr() + + def consoles(self): + return [ c.sxpr() for c in self.consoleCF.getInstances() ] + + def get_console(self, id): + return self.consoleCF.getInstance(id) + + def get_domain_console(self, dom): + return self.consoleCF.getInstanceByDom(dom) + + def console_disconnect(self, id): + """Disconnect any connected console client. + """ + console = self.get_console(id) + if not console: + raise ValueError('Invalid console id') + console.disconnect() + + def domain_shutdown(self, dom, reason): + """Shutdown a domain. + """ + ctrl = self.domainCF.getInstanceByDom(dom) + if not ctrl: + raise ValueError('No domain controller: %d' % dom) + ctrl.shutdown(reason) + return 0 + +def instance(): + global inst + try: + inst + except: + inst = Daemon() + return inst |