From 25f52c4c7401708e7f1b71b705b65dc84bc693a7 Mon Sep 17 00:00:00 2001 From: Keir Fraser Date: Fri, 13 Nov 2009 15:34:03 +0000 Subject: Remus: add control script to activate remus on a VM Signed-off-by: Brendan Cully --- tools/remus/Makefile | 20 +++ tools/remus/README | 4 + tools/remus/remus | 362 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 386 insertions(+) create mode 100644 tools/remus/Makefile create mode 100644 tools/remus/README create mode 100644 tools/remus/remus (limited to 'tools/remus') diff --git a/tools/remus/Makefile b/tools/remus/Makefile new file mode 100644 index 0000000000..17518a95f7 --- /dev/null +++ b/tools/remus/Makefile @@ -0,0 +1,20 @@ +XEN_ROOT=../.. +include $(XEN_ROOT)/tools/Rules.mk + +SCRIPTS = remus + +.PHONY: all +all: build + +.PHONY: build +build: + echo "Nothing to do" + +.PHONY: install +install: + $(INSTALL_DIR) $(DESTDIR)$(BINDIR) + $(INSTALL_PYTHON_PROG) $(SCRIPTS) $(DESTDIR)$(BINDIR) + +.PHONY: clean +clean: + echo "Nothing to do" diff --git a/tools/remus/README b/tools/remus/README new file mode 100644 index 0000000000..9e8140b17f --- /dev/null +++ b/tools/remus/README @@ -0,0 +1,4 @@ +Remus provides fault tolerance for virtual machines by sending continuous +checkpoints to a backup, which will activate if the target VM fails. + +See the website at http://nss.cs.ubc.ca/remus/ for details. diff --git a/tools/remus/remus b/tools/remus/remus new file mode 100644 index 0000000000..53d9d7b5af --- /dev/null +++ b/tools/remus/remus @@ -0,0 +1,362 @@ +#!/usr/bin/env python +# +# This is a save process which also buffers outgoing I/O between +# rounds, so that external viewers never see anything that hasn't +# been committed at the backup +# +# TODO: fencing. + +import optparse, os, re, select, signal, sys, time +from xen.remus import save, vm +from xen.xend import XendOptions +from xen.remus import netlink, qdisc, util + +class CfgException(Exception): pass + +class Cfg(object): + def __init__(self): + # must be set + self.domid = 0 + + self.host = 'localhost' + self.port = XendOptions.instance().get_xend_relocation_port() + self.interval = 200 + self.netbuffer = True + self.nobackup = False + self.timer = False + + parser = optparse.OptionParser() + parser.usage = '%prog [options] domain [destination]' + parser.add_option('-i', '--interval', dest='interval', type='int', + metavar='MS', + help='checkpoint every MS milliseconds') + parser.add_option('-p', '--port', dest='port', type='int', + help='send stream to port PORT', metavar='PORT') + parser.add_option('', '--no-net', dest='nonet', action='store_true', + help='run without net buffering (benchmark option)') + parser.add_option('', '--timer', dest='timer', action='store_true', + help='force pause at checkpoint interval (experimental)') + parser.add_option('', '--no-backup', dest='nobackup', + action='store_true', + help='prevent backup from starting up (benchmark ' + 'option)') + self.parser = parser + + def usage(self): + self.parser.print_help() + + def getargs(self): + opts, args = self.parser.parse_args() + + if opts.interval: + self.interval = opts.interval + if opts.port: + self.port = opts.port + if opts.nonet: + self.netbuffer = False + if opts.timer: + self.timer = True + + if not args: + raise CfgException('Missing domain') + self.domid = args[0] + if (len(args) > 1): + self.host = args[1] + +class ReplicatedDiskException(Exception): pass + +class BufferedDevice(object): + 'Base class for buffered devices' + + def postsuspend(self): + 'called after guest has suspended' + pass + + def preresume(self): + 'called before guest resumes' + pass + + def commit(self): + 'called when backup has acknowledged checkpoint reception' + pass + +class ReplicatedDisk(BufferedDevice): + """ + Send a checkpoint message to a replicated disk while the domain + is paused between epochs. + """ + FIFODIR = '/var/run/tap' + + def __init__(self, disk): + # look up disk, make sure it is tap:buffer, and set up socket + # to request commits. + self.ctlfd = None + + if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'): + raise ReplicatedDiskException('Disk is not replicated: %s' % + str(disk)) + fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_') + absfifo = os.path.join(self.FIFODIR, fifo) + absmsgfifo = absfifo + '.msg' + + self.installed = False + self.ctlfd = open(absfifo, 'w+b') + self.msgfd = open(absmsgfifo, 'r+b') + + def __del__(self): + self.uninstall() + + def setup(self): + #self.ctlfd.write('buffer') + #self.ctlfd.flush() + self.installed = True + + def uninstall(self): + if self.ctlfd: + self.ctlfd.close() + self.ctlfd = None + + def postsuspend(self): + if not self.installed: + self.setup() + + os.write(self.ctlfd.fileno(), 'flush') + + def commit(self): + msg = os.read(self.msgfd.fileno(), 4) + if msg != 'done': + print 'Unknown message: %s' % msg + +class NetbufferException(Exception): pass + +class Netbuffer(BufferedDevice): + """ + Buffer a protected domain's network output between rounds so that + nothing is issued that a failover might not know about. + """ + # shared rtnetlink handle + rth = None + + def __init__(self, domid): + self.installed = False + + if not self.rth: + self.rth = netlink.rtnl() + + self.devname = self._startimq(domid) + dev = self.rth.getlink(self.devname) + if not dev: + raise NetbufferException('could not find device %s' % self.devname) + self.dev = dev['index'] + self.handle = qdisc.TC_H_ROOT + self.q = qdisc.QueueQdisc() + + def __del__(self): + self.uninstall() + + def postsuspend(self): + if not self.installed: + self._setup() + + self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT) + + def commit(self): + '''Called when checkpoint has been acknowledged by + the backup''' + self._sendqmsg(qdisc.TC_QUEUE_RELEASE) + + def _sendqmsg(self, action): + self.q.action = action + req = qdisc.changerequest(self.dev, self.handle, self.q) + self.rth.talk(req.pack()) + + def _setup(self): + q = self.rth.getqdisc(self.dev) + if q: + if q['kind'] == 'queue': + self.installed = True + return + if q['kind'] != 'pfifo_fast': + raise NetbufferException('there is already a queueing ' + 'discipline on %s' % self.devname) + + print 'installing buffer on %s' % self.devname + req = qdisc.addrequest(self.dev, self.handle, self.q) + self.rth.talk(req.pack()) + self.installed = True + + def uninstall(self): + if self.installed: + req = qdisc.delrequest(self.dev, self.handle) + self.rth.talk(req.pack()) + self.installed = False + + def _startimq(self, domid): + # stopgap hack to set up IMQ for an interface. Wrong in many ways. + imqebt = '/usr/lib/xen/bin/imqebt' + imqdev = 'imq0' + vid = 'vif%d.0' % domid + for mod in ['sch_queue', 'imq', 'ebt_imq']: + util.runcmd(['modprobe', mod]) + util.runcmd("ip link set %s up" % (imqdev)) + util.runcmd("ebtables -F FORWARD") + util.runcmd("ebtables -A FORWARD -i %s -j imq --todev %s" % (vid, imqdev)) + + return imqdev + +class SignalException(Exception): pass + +def run(cfg): + closure = lambda: None + closure.cmd = None + + def sigexception(signo, frame): + raise SignalException(signo) + + def die(): + # I am not sure what the best way to die is. xm destroy is another option, + # or we could attempt to trigger some instant reboot. + print "dying..." + print util.runcmd(['sudo', 'ifdown', 'eth2']) + # dangling imq0 handle on vif locks up the system + for buf in bufs: + buf.uninstall() + print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid]) + print util.runcmd(['sudo', 'ifup', 'eth2']) + + def getcommand(): + """Get a command to execute while running. + Commands include: + s: die prior to postsuspend hook + s2: die after postsuspend hook + r: die prior to preresume hook + r2: die after preresume hook + c: die prior to commit hook + c2: die after commit hook + """ + r, w, x = select.select([sys.stdin], [], [], 0) + if sys.stdin not in r: + return + + cmd = sys.stdin.readline().strip() + if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'): + print "unknown command: %s" % cmd + closure.cmd = cmd + + signal.signal(signal.SIGTERM, sigexception) + + dom = vm.VM(cfg.domid) + + # set up I/O buffers + bufs = [] + + # disks must commit before network can be released + for disk in dom.disks: + try: + bufs.append(ReplicatedDisk(disk)) + except ReplicatedDiskException, e: + print e + continue + + if cfg.netbuffer: + for vif in dom.vifs: + bufs.append(Netbuffer(dom.domid)) + + fd = save.MigrationSocket((cfg.host, cfg.port)) + + def postsuspend(): + 'Begin external checkpointing after domain has paused' + if not cfg.timer: + # when not using a timer thread, sleep until now + interval + closure.starttime = time.time() + + if closure.cmd == 's': + die() + + for buf in bufs: + buf.postsuspend() + + if closure.cmd == 's2': + die() + + def preresume(): + 'Complete external checkpointing before domain resumes' + if closure.cmd == 'r': + die() + + for buf in bufs: + buf.preresume() + + if closure.cmd == 'r2': + die() + + def commit(): + 'commit network buffer' + if closure.cmd == 'c': + die() + + print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time()) + + for buf in bufs: + buf.commit() + + if closure.cmd == 'c2': + die() + + # Since the domain is running at this point, it's a good time to + # check for control channel commands + getcommand() + + if not cfg.timer: + endtime = time.time() + elapsed = (endtime - closure.starttime) * 1000 + + if elapsed < cfg.interval: + time.sleep((cfg.interval - elapsed) / 1000.0) + + # False ends checkpointing + return True + + if cfg.timer: + interval = cfg.interval + else: + interval = 0 + + rc = 0 + + checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit, + interval) + + try: + checkpointer.start() + except save.CheckpointError, e: + print e + rc = 1 + except KeyboardInterrupt: + pass + except SignalException: + print '*** signalled ***' + + for buf in bufs: + buf.uninstall() + + if cfg.nobackup: + # lame attempt to kill backup if protection is stopped deliberately. + # It would be much better to move this into the heartbeat "protocol". + print util.runcmd(['sudo', '-u', os.getlogin(), 'ssh', cfg.host, 'sudo', 'xm', 'destroy', dom.name]) + + sys.exit(rc) + +cfg = Cfg() +try: + cfg.getargs() +except CfgException, inst: + print str(inst) + cfg.usage() + sys.exit(1) + +try: + run(cfg) +except vm.VMException, inst: + print str(inst) + sys.exit(1) -- cgit v1.2.3