1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
|
# Remus device interface
#
# Coordinates with devices at suspend, resume, and commit hooks
import os, re, fcntl
import netlink, qdisc, util
class ReplicatedDiskException(Exception):
    """Raised when a disk cannot be set up for Remus disk replication."""


class BufferedNICException(Exception):
    """Raised by the network-buffering code (buffer selection, device pool, qdisc setup)."""
class CheckpointedDevice(object):
    """Base class for buffered devices.

    Subclasses override whichever checkpoint hooks they need; every
    default implementation does nothing and returns None.
    """

    def postsuspend(self):
        """Hook invoked after the guest has suspended."""

    def preresume(self):
        """Hook invoked before the guest resumes."""

    def commit(self):
        """Hook invoked once the backup has acknowledged receiving the checkpoint."""
class ReplicatedDisk(CheckpointedDevice):
    """
    Send a checkpoint message to a replicated disk while the domain
    is paused between epochs.

    Two kinds of disk are supported:

    * blktap "remus" disks (uname starting "tap:remus:" or
      "tap:tapdisk:remus:"), driven through a control FIFO and a
      companion ".msg" reply FIFO under FIFODIR;
    * DRBD resources (uname "drbd:<resource>") running protocol D,
      driven through ioctls on the resource's device node.
    """
    FIFODIR = '/var/run/tap'
    # ioctl request numbers issued on the DRBD device node
    SEND_CHECKPOINT = 20
    WAIT_CHECKPOINT_ACK = 30

    def __init__(self, disk):
        """Open the checkpoint control channel for disk.

        disk must expose a ``uname`` string. Raises ReplicatedDiskException
        if the disk is not a supported replicated disk or the DRBD resource
        is not usable.
        """
        self.ctlfd = None
        self.msgfd = None
        self.is_drbd = False
        self.ackwait = False
        uname = disk.uname
        if uname.startswith(('tap:remus:', 'tap:tapdisk:remus:')):
            # The FIFO is named after the "remus..." part of the uname that
            # precedes a '|', with ':' replaced by '_'.
            match = re.match(r"tap:.*(remus.*)\|", uname)
            if match is None:
                raise ReplicatedDiskException(
                    'Cannot derive remus FIFO name from disk: %s' % uname)
            fifo = match.group(1).replace(':', '_')
            absfifo = os.path.join(self.FIFODIR, fifo)
            absmsgfifo = absfifo + '.msg'
            self.installed = False
            self.ctlfd = open(absfifo, 'w+b')
            self.msgfd = open(absmsgfifo, 'r+b')
        elif uname.startswith('drbd:'):
            # get the drbd device associated with this resource
            drbdres = re.match(r"drbd:(.*)", uname).group(1)
            drbddev = util.runcmd("drbdadm sh-dev %s" % drbdres).rstrip()
            # check for remus supported drbd installation
            rconf = util.runcmd("drbdsetup %s show" % drbddev)
            if rconf.find('protocol D;') == -1:
                raise ReplicatedDiskException('Remus support for DRBD disks requires the '
                                              'resources to operate in protocol D. Please make '
                                              'sure that you have installed the remus supported DRBD '
                                              'version from git://aramis.nss.cs.ubc.ca/drbd-8.3-remus '
                                              'and enabled protocol D in the resource config')
            # check if resource is in connected state
            cstate = util.runcmd("drbdadm cstate %s" % drbdres).rstrip()
            if cstate != 'Connected':
                raise ReplicatedDiskException('DRBD resource %s is not in connected state!'
                                              % drbdres)
            # open a handle to the resource so that we could issue chkpt ioctls
            self.ctlfd = open(drbddev, 'r')
            self.is_drbd = True
        else:
            raise ReplicatedDiskException('Disk is not replicated: %s' %
                                          str(disk))

    def __del__(self):
        self.uninstall()

    def uninstall(self):
        """Close the control and message handles; safe to call repeatedly."""
        if self.ctlfd:
            self.ctlfd.close()
            self.ctlfd = None
        # The tap reply FIFO was previously never closed (handle leak).
        if self.msgfd:
            self.msgfd.close()
            self.msgfd = None

    def postsuspend(self):
        """Request a checkpoint while the guest is suspended.

        tap disks get a 'flush' command on the control FIFO. DRBD disks get
        a SEND_CHECKPOINT ioctl unless an acknowledgement from a previous
        request is still outstanding; a non-positive ioctl result records
        that preresume() must wait for the acknowledgement.
        """
        if not self.is_drbd:
            os.write(self.ctlfd.fileno(), b'flush')
        elif not self.ackwait:
            if fcntl.ioctl(self.ctlfd.fileno(), self.SEND_CHECKPOINT, 0) > 0:
                self.ackwait = False
            else:
                self.ackwait = True

    def preresume(self):
        """For DRBD disks, block on WAIT_CHECKPOINT_ACK if an ack is pending."""
        if self.is_drbd and self.ackwait:
            fcntl.ioctl(self.ctlfd.fileno(), self.WAIT_CHECKPOINT_ACK, 0)
            self.ackwait = False

    def commit(self):
        """For tap disks, read the 4-byte reply and report anything but 'done'."""
        if not self.is_drbd:
            msg = os.read(self.msgfd.fileno(), 4)
            if msg != b'done':
                print('Unknown message: %s' % msg)
### Network
# Shared rtnetlink handle, created lazily by getrth().
_rth = None

def getrth():
    """Return the module-wide netlink.rtnl() handle, creating it on first use."""
    global _rth
    if _rth:
        return _rth
    _rth = netlink.rtnl()
    return _rth
class Netbuf(object):
    """Proxy for netdev with a queueing discipline.

    Subclasses name their device class via devclass() and implement
    install()/uninstall(); devname is the concrete proxy device and vif
    stays None until install() records the VIF being proxied.
    """

    @staticmethod
    def devclass():
        "returns the name of this device class"
        return 'unknown'

    @classmethod
    def available(cls):
        "returns True if this module can proxy the device"
        name = cls.devclass()
        return cls._hasdev(name)

    def __init__(self, devname):
        self.vif = None
        self.devname = devname

    # --- to be overridden by subclasses ---

    def install(self, vif):
        "set up proxy on device"
        raise BufferedNICException('unimplemented')

    def uninstall(self):
        "remove proxy on device"
        raise BufferedNICException('unimplemented')

    # --- protected ---

    @staticmethod
    def _hasdev(devclass):
        """Return True if device '<devclass>0' exists.

        When it is missing, util.modprobe(devclass) is attempted and the
        lookup is repeated.
        """
        rth = getrth()
        probe = '%s0' % devclass
        if rth.getlink(probe):
            return True
        return bool(util.modprobe(devclass) and rth.getlink(probe))
class IFBBuffer(Netbuf):
    """Capture packets arriving on a VIF using an ingress filter and tc
    mirred action to forward them to an IFB device.
    """

    @staticmethod
    def devclass():
        return 'ifb'

    def install(self, vif):
        """Bring the IFB device up and redirect vif's ingress traffic to it.

        An ingress qdisc that already exists on the vif is reused; any other
        tc failure propagates as util.PipeException.
        """
        self.vif = vif
        # voodoo from http://www.linuxfoundation.org/collaborate/workgroups/networking/ifb#Typical_Usage
        util.runcmd('ip link set %s up' % self.devname)
        try:
            util.runcmd('tc qdisc add dev %s ingress' % vif.dev)
        except util.PipeException as e:
            # Tolerate only "ingress qdisc already exists on the vif";
            # re-raise anything else with its original traceback.
            if 'RTNETLINK answers: File exists' not in str(e):
                raise
        util.runcmd('tc filter add dev %s parent ffff: proto ip pref 10 '
                    'u32 match u32 0 0 action mirred egress redirect '
                    'dev %s' % (vif.dev, self.devname))

    def uninstall(self):
        """Remove the ingress redirection and bring the IFB device down.

        Deleting the ingress qdisc is best-effort. It is skipped entirely
        when install() never ran (vif is still None), which happens when
        the owner tears down after a failed setup.
        """
        if self.vif is not None:
            try:
                util.runcmd('tc qdisc del dev %s ingress' % self.vif.dev)
            except util.PipeException:
                pass
        util.runcmd('ip link set %s down' % self.devname)
class IMQBuffer(Netbuf):
    """Redirect packets coming in on vif to an IMQ device."""
    imqebt = '/usr/lib/xen/bin/imqebt'

    @staticmethod
    def devclass():
        return 'imq'

    def install(self, vif):
        """Load the IMQ modules, bring the device up and add a FORWARD rule
        sending vif's traffic to it.

        Stopgap hack to set up IMQ for an interface; wrong in many ways.
        NOTE(review): '-F FORWARD' presumably flushes the whole FORWARD
        chain, not just a rule for this vif.
        """
        self.vif = vif
        util.runcmd(['modprobe', 'imq'])
        util.runcmd(['modprobe', 'ebt_imq'])
        ebt = self.imqebt
        util.runcmd("ip link set %s up" % self.devname)
        util.runcmd("%s -F FORWARD" % ebt)
        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (ebt, vif.dev, self.devname))

    def uninstall(self):
        """Flush the FORWARD chain via imqebt and bring the IMQ device down."""
        ebt = self.imqebt
        util.runcmd("%s -F FORWARD" % ebt)
        util.runcmd('ip link set %s down' % self.devname)
# Candidate proxy types, most preferred first.
netbuftypes = [IFBBuffer, IMQBuffer]

def selectnetbuf():
    """Return the first class in netbuftypes whose available() is true.

    Candidates are probed lazily in order, so later ones are not probed
    once one succeeds. Raises BufferedNICException if none is available.
    """
    chosen = next((candidate for candidate in netbuftypes
                   if candidate.available()), None)
    if chosen is None:
        raise BufferedNICException('no net buffer available')
    return chosen
class Netbufpool(object):
    """Allocates/releases proxy netdevs (IMQ/IFB)

    The allocation table at /var/run/remus/<devclass> holds one line per
    claimed device, of the form "<device> <pid>".

    To allocate a device, lock the table, then claim the first device that
    either has no entry or whose owning PID is no longer alive (per
    util.checkpid); fail if there is none. The new entry is written before
    the lock is released. Releasing removes the entry only if it is still
    owned by the calling process.
    """

    def __init__(self, netbufclass):
        """Create a pool over every existing device of netbufclass's type."""
        self.netbufclass = netbufclass
        self.path = '/var/run/remus/' + self.netbufclass.devclass()
        self.devices = self.getdevs()
        pooldir = os.path.dirname(self.path)
        if not os.path.exists(pooldir):
            try:
                os.makedirs(pooldir, 0o755)
            except OSError:
                # Another process may have created it concurrently.
                if not os.path.isdir(pooldir):
                    raise

    def get(self):
        """Claim a free device for this process and return a netbufclass for it.

        Raises BufferedNICException if every device is held by a live PID.
        """
        lock = util.Lock(self.path)
        try:
            table = self.load()
            devname = self._getfreedev(table)
            if not devname:
                raise BufferedNICException('no free devices')
            dev = self.netbufclass(devname)
            table[dev.devname] = os.getpid()
            self.save(table)
        finally:
            # Always release the lock, even if load/save/construction fails.
            lock.unlock()
        return dev

    def put(self, dev):
        """Release this process's claim on dev.

        Safe to call more than once: an entry that is missing, or that now
        belongs to another process (the device was re-claimed after an
        earlier release), is left untouched.
        """
        lock = util.Lock(self.path)
        try:
            table = self.load()
            if table.get(dev.devname) == os.getpid():
                del table[dev.devname]
                self.save(table)
        finally:
            lock.unlock()

    # private
    def _getfreedev(self, table):
        """Return the first unclaimed (or dead-owner) device name, or None."""
        for dev in self.devices:
            if dev not in table or not util.checkpid(table[dev]):
                return dev
        return None

    def load(self):
        """Load and parse the allocation table into {device: pid}.

        Returns an empty dict if the table file does not exist yet; blank
        lines are ignored.
        """
        table = {}
        if not os.path.exists(self.path):
            return table
        with open(self.path) as fd:
            for line in fd:
                fields = line.split()
                if not fields:
                    continue
                iface, pid = fields
                table[iface] = int(pid)
        return table

    def save(self, table):
        """Write table to disk as "<device> <pid>" lines sorted by device."""
        lines = ['%s %d\n' % (iface, table[iface]) for iface in sorted(table)]
        with open(self.path, 'w') as fd:
            fd.writelines(lines)

    def getdevs(self):
        """Find all interfaces (from 'ifconfig -a -s') named with our device class prefix."""
        prefix = self.netbufclass.devclass()
        ifaces = []
        for line in util.runcmd('ifconfig -a -s').splitlines():
            fields = line.split()
            if fields and fields[0].startswith(prefix):
                ifaces.append(fields[0])
        return ifaces
class BufferedNIC(CheckpointedDevice):
    """
    Buffer a protected domain's network output between rounds so that
    nothing is issued that a failover might not know about.

    The VIF's traffic is redirected to a proxy device claimed from a
    Netbufpool. A plug qdisc on that device is sent TC_PLUG_BUFFER at
    postsuspend and TC_PLUG_RELEASE_ONE on commit.
    """

    def __init__(self, vif):
        self.installed = False
        # Set before anything that can fail, so uninstall() (which also runs
        # from __del__) can tell whether a proxy device was ever claimed.
        self.bufdev = None
        self.vif = vif
        self.pool = Netbufpool(selectnetbuf())
        self.rth = getrth()
        self.setup()

    def __del__(self):
        self.uninstall()

    def postsuspend(self):
        if not self.installed:
            self.install()
        self._sendqmsg(qdisc.TC_PLUG_BUFFER)

    def commit(self):
        '''Called when checkpoint has been acknowledged by
        the backup'''
        self._sendqmsg(qdisc.TC_PLUG_RELEASE_ONE)

    # private
    def _sendqmsg(self, action):
        """Send a change request carrying action to the plug qdisc."""
        self.q.action = action
        req = qdisc.changerequest(self.bufdevno, self.handle, self.q)
        self.rth.talk(req.pack())
        return True

    def setup(self):
        """Claim a proxy device, redirect the VIF to it and prepare the plug qdisc.

        The qdisc itself is attached later by install().
        """
        self.bufdev = self.pool.get()
        devname = self.bufdev.devname
        link = self.rth.getlink(devname)
        if not link:
            raise BufferedNICException('could not find device %s' % devname)
        self.bufdev.install(self.vif)
        self.bufdevno = link['index']
        self.handle = qdisc.TC_H_ROOT
        self.q = qdisc.PlugQdisc()
        if not util.modprobe('sch_plug'):
            raise BufferedNICException('could not load sch_plug module')

    def install(self):
        """Attach the plug qdisc to the proxy device if not already present.

        Raises BufferedNICException if a qdisc other than ingress,
        pfifo_fast or mq is already attached.
        """
        import sys
        devname = self.bufdev.devname
        q = self.rth.getqdisc(self.bufdevno)
        if q:
            if q['kind'] == 'plug':
                self.installed = True
                return
            if q['kind'] not in ('ingress', 'pfifo_fast', 'mq'):
                raise BufferedNICException('there is already a queueing '
                                           'discipline %s on %s' % (q['kind'], devname))
        # Progress message on one line, as the old "print x," did.
        sys.stdout.write('installing buffer on %s... ' % devname)
        sys.stdout.flush()
        req = qdisc.addrequest(self.bufdevno, self.handle, self.q)
        self.rth.talk(req.pack())
        self.installed = True
        print('done.')

    def uninstall(self):
        """Remove the plug qdisc and return the proxy device to the pool.

        Best-effort and safe to call more than once: it also runs from
        __del__, possibly on a partially constructed object. The device is
        released at most once.
        """
        if self.installed:
            try:
                req = qdisc.delrequest(self.bufdevno, self.handle)
                self.rth.talk(req.pack())
            except IOError:
                pass
            self.installed = False
        bufdev = getattr(self, 'bufdev', None)
        if bufdev is None:
            # No device was ever claimed, or it has already been released.
            return
        self.bufdev = None
        # Netbuf.__init__ leaves vif as None until the proxy's install() has
        # run; skip the proxy teardown if setup failed before that point.
        if bufdev.vif is not None:
            try:
                bufdev.uninstall()
            except util.PipeException:
                pass
        self.pool.put(bufdev)
|