From a47cb321e9a8311cf400d3cb44d8e9f255cd3139 Mon Sep 17 00:00:00 2001
From: Keir Fraser
Date: Tue, 4 May 2010 09:34:23 +0100
Subject: Remus: move device handling into its own module

Signed-off-by: Brendan Cully
---
 tools/remus/remus | 138 ++----------------------------------------------------
 1 file changed, 3 insertions(+), 135 deletions(-)

(limited to 'tools/remus')

diff --git a/tools/remus/remus b/tools/remus/remus
index f734e7725e..f80abf4380 100644
--- a/tools/remus/remus
+++ b/tools/remus/remus
@@ -7,9 +7,10 @@
 # TODO: fencing.
 
 import optparse, os, re, select, signal, sys, time
-from xen.remus import save, vm
+
+from xen.remus import save, util, vm
+from xen.remus.device import ReplicatedDisk, BufferedNIC
 from xen.xend import XendOptions
-from xen.remus import netlink, qdisc, util
 
 class CfgException(Exception): pass
 
@@ -58,139 +59,6 @@ class Cfg(object):
         if (len(args) > 1):
             self.host = args[1]
 
-class ReplicatedDiskException(Exception): pass
-
-class BufferedDevice(object):
-    'Base class for buffered devices'
-
-    def postsuspend(self):
-        'called after guest has suspended'
-        pass
-
-    def preresume(self):
-        'called before guest resumes'
-        pass
-
-    def commit(self):
-        'called when backup has acknowledged checkpoint reception'
-        pass
-
-class ReplicatedDisk(BufferedDevice):
-    """
-    Send a checkpoint message to a replicated disk while the domain
-    is paused between epochs.
-    """
-    FIFODIR = '/var/run/tap'
-
-    def __init__(self, disk):
-        # look up disk, make sure it is tap:buffer, and set up socket
-        # to request commits.
-        self.ctlfd = None
-
-        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
-            raise ReplicatedDiskException('Disk is not replicated: %s' %
-                                          str(disk))
-        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
-        absfifo = os.path.join(self.FIFODIR, fifo)
-        absmsgfifo = absfifo + '.msg'
-
-        self.installed = False
-        self.ctlfd = open(absfifo, 'w+b')
-        self.msgfd = open(absmsgfifo, 'r+b')
-
-    def __del__(self):
-        self.uninstall()
-
-    def uninstall(self):
-        if self.ctlfd:
-            self.ctlfd.close()
-            self.ctlfd = None
-
-    def postsuspend(self):
-        os.write(self.ctlfd.fileno(), 'flush')
-
-    def commit(self):
-        msg = os.read(self.msgfd.fileno(), 4)
-        if msg != 'done':
-            print 'Unknown message: %s' % msg
-
-class NetbufferException(Exception): pass
-
-class Netbuffer(BufferedDevice):
-    """
-    Buffer a protected domain's network output between rounds so that
-    nothing is issued that a failover might not know about.
-    """
-    # shared rtnetlink handle
-    rth = None
-
-    def __init__(self, domid):
-        self.installed = False
-
-        if not self.rth:
-            self.rth = netlink.rtnl()
-
-        self.devname = self._startimq(domid)
-        dev = self.rth.getlink(self.devname)
-        if not dev:
-            raise NetbufferException('could not find device %s' % self.devname)
-        self.dev = dev['index']
-        self.handle = qdisc.TC_H_ROOT
-        self.q = qdisc.QueueQdisc()
-
-    def __del__(self):
-        self.uninstall()
-
-    def postsuspend(self):
-        if not self.installed:
-            self._setup()
-
-        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
-
-    def commit(self):
-        '''Called when checkpoint has been acknowledged by
-        the backup'''
-        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
-
-    def _sendqmsg(self, action):
-        self.q.action = action
-        req = qdisc.changerequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-
-    def _setup(self):
-        q = self.rth.getqdisc(self.dev)
-        if q:
-            if q['kind'] == 'queue':
-                self.installed = True
-                return
-            if q['kind'] != 'pfifo_fast':
-                raise NetbufferException('there is already a queueing '
-                                         'discipline on %s' % self.devname)
-
-        print 'installing buffer on %s' % self.devname
-        req = qdisc.addrequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-        self.installed = True
-
-    def uninstall(self):
-        if self.installed:
-            req = qdisc.delrequest(self.dev, self.handle)
-            self.rth.talk(req.pack())
-            self.installed = False
-
-    def _startimq(self, domid):
-        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
-        imqebt = '/usr/lib/xen/bin/imqebt'
-        imqdev = 'imq0'
-        vid = 'vif%d.0' % domid
-        for mod in ['sch_queue', 'imq', 'ebt_imq']:
-            util.runcmd(['modprobe', mod])
-        util.runcmd("ip link set %s up" % (imqdev))
-        util.runcmd("%s -F FORWARD" % (imqebt))
-        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
-
-        return imqdev
-
 class SignalException(Exception): pass
 
 def run(cfg):
-- 
cgit v1.2.3