aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.rootkeys1
-rw-r--r--tools/python/xen/xend/XendDomain.py4
-rw-r--r--tools/python/xen/xend/XendMigrate.py585
3 files changed, 1 insertions, 589 deletions
diff --git a/.rootkeys b/.rootkeys
index 17ee3a6b3c..adfd52f597 100644
--- a/.rootkeys
+++ b/.rootkeys
@@ -838,7 +838,6 @@
40c9c4685ykq87_n1kVUbMr9flx9fg tools/python/xen/xend/XendDomainInfo.py
40f50d99YiiaMI1fZBh1VCDFLD57qg tools/python/xen/xend/XendError.py
40ffc44eGsgTEY355E3nN4mPLZHhMQ tools/python/xen/xend/XendLogging.py
-40c9c46854nsHmuxHQHncKk5rAs5NA tools/python/xen/xend/XendMigrate.py
40c9c468M96gA1EYDvNa5w5kQNYLFA tools/python/xen/xend/XendNode.py
4151594bhib4aUerB2SMKDl-iCtc4Q tools/python/xen/xend/XendProtocol.py
40c9c4686jruMyZIqiaZRMiMoqMJtg tools/python/xen/xend/XendRoot.py
diff --git a/tools/python/xen/xend/XendDomain.py b/tools/python/xen/xend/XendDomain.py
index 8a12311bd8..55553809d3 100644
--- a/tools/python/xen/xend/XendDomain.py
+++ b/tools/python/xen/xend/XendDomain.py
@@ -19,7 +19,6 @@ import XendRoot; xroot = XendRoot.instance()
import XendCheckpoint
import XendDB
import XendDomainInfo
-import XendMigrate
import EventServer; eserver = EventServer.instance()
from XendError import XendError
from XendLogging import log
@@ -511,8 +510,7 @@ class XendDomain:
# Need a cancel too?
# Don't forget to cancel restart for it.
dominfo = self.domain_lookup(id)
- xmigrate = XendMigrate.instance()
- return xmigrate.migrate_begin(dominfo, dst, live=live, resource=resource)
+ return None
def domain_save(self, id, dst, progress=False):
"""Start saving a domain to file.
diff --git a/tools/python/xen/xend/XendMigrate.py b/tools/python/xen/xend/XendMigrate.py
deleted file mode 100644
index 5264d68f5a..0000000000
--- a/tools/python/xen/xend/XendMigrate.py
+++ /dev/null
@@ -1,585 +0,0 @@
-# Copyright (C) 2004 Mike Wray <mike.wray@hp.com>
-
-import traceback
-import threading
-
-import errno
-import sys
-import socket
-import time
-import types
-
-from xen.web import reactor
-from xen.web.protocol import Protocol, ClientFactory
-
-import scheduler
-import sxp
-import XendDB
-import EventServer; eserver = EventServer.instance()
-from XendError import XendError
-from XendLogging import log
-
-"""The port for the migrate/save daemon xfrd."""
-XFRD_PORT = 8002
-
-"""The transfer protocol major version number."""
-XFR_PROTO_MAJOR = 1
-"""The transfer protocol minor version number."""
-XFR_PROTO_MINOR = 0
-
-class Xfrd(Protocol):
- """Protocol handler for a connection to the migration/save daemon xfrd.
- """
-
- def __init__(self, xinfo):
- self.parser = sxp.Parser()
- self.xinfo = xinfo
-
- def connectionMade(self, addr=None):
- # Send hello.
- self.request(['xfr.hello', XFR_PROTO_MAJOR, XFR_PROTO_MINOR])
- # Send request.
- self.xinfo.request(self)
- # Run the transport mainLoop which reads from the peer.
- self.transport.mainLoop()
-
- def request(self, req):
- sxp.show(req, out=self.transport)
-
- def loseConnection(self):
- self.transport.loseConnection()
-
- def connectionLost(self, reason):
- self.xinfo.connectionLost(reason)
-
- def dataReceived(self, data):
- self.parser.input(data)
- if self.parser.ready():
- val = self.parser.get_val()
- self.xinfo.dispatch(self, val)
- if self.parser.at_eof():
- self.loseConnection()
-
-class XfrdClientFactory(ClientFactory):
- """Factory for clients of the migration/save daemon xfrd.
- """
-
- def __init__(self, xinfo):
- #ClientFactory.__init__(self)
- self.xinfo = xinfo
- self.readyCond = threading.Condition()
- self.ready = False
- self.err = None
-
- def start(self):
- print 'XfrdClientFactory>start>'
- reactor.connectTCP('localhost', XFRD_PORT, self)
- try:
- self.readyCond.acquire()
- while not self.ready:
- self.readyCond.wait()
- finally:
- self.readyCond.release()
- print 'XfrdClientFactory>start>', 'err=', self.err
- if self.err:
- raise self.err
- return 0
-
- def notifyReady(self):
- try:
- self.readyCond.acquire()
- self.ready = True
- self.err = self.xinfo.error_summary()
- self.readyCond.notify()
- finally:
- self.readyCond.release()
-
- def startedConnecting(self, connector):
- pass
-
- def buildProtocol(self, addr):
- return Xfrd(self.xinfo)
-
- def clientConnectionLost(self, connector, reason):
- print "XfrdClientFactory>clientConnectionLost>", reason
- self.notifyReady()
-
- def clientConnectionFailed(self, connector, reason):
- print "XfrdClientFactory>clientConnectionFailed>", reason
- self.xinfo.error(reason)
- self.notifyReady()
-
-class SuspendHandler:
-
- def __init__(self, xinfo, vmid, timeout):
- self.xinfo = xinfo
- self.vmid = vmid
- self.timeout = timeout
- self.readyCond = threading.Condition()
- self.ready = False
- self.err = None
-
- def start(self):
- self.subscribe(on=True)
- timer = scheduler.later(self.timeout, self.onTimeout)
- try:
- self.readyCond.acquire()
- while not self.ready:
- self.readyCond.wait()
- finally:
- self.readyCond.release()
- self.subscribe(on=False)
- timer.cancel()
- if self.err:
- raise XendError(self.err)
-
- def notifyReady(self, err=None):
- try:
- self.readyCond.acquire()
- if not self.ready:
- self.ready = True
- self.err = err
- self.readyCond.notify()
- finally:
- self.readyCond.release()
-
- def subscribe(self, on=True):
- # Subscribe to 'suspended' events so we can tell when the
- # suspend completes. Subscribe to 'died' events so we can tell if
- # the domain died.
- if on:
- action = eserver.subscribe
- else:
- action = eserver.unsubscribe
- action('xend.domain.suspended', self.onSuspended)
- action('xend.domain.died', self.onDied)
-
- def onSuspended(self, e, v):
- if v[1] != self.vmid: return
- print 'SuspendHandler>onSuspended>', e, v
- self.notifyReady()
-
- def onDied(self, e, v):
- if v[1] != self.vmid: return
- print 'SuspendHandler>onDied>', e, v
- self.notifyReady('Domain %s died while suspending' % self.vmid)
-
- def onTimeout(self):
- print 'SuspendHandler>onTimeout>'
- self.notifyReady('Domain %s suspend timed out' % self.vmid)
-
-class XfrdInfo:
- """Abstract class for info about a session with xfrd.
- Has subclasses for save and migrate.
- """
-
- """Suspend timeout (seconds).
- We set a timeout because suspending a domain can hang."""
- timeout = 30
-
- def __init__(self):
- from xen.xend import XendDomain
- self.xd = XendDomain.instance()
- self.suspended = {}
- self.paused = {}
- self.state = 'init'
- # List of errors encountered.
- self.errors = []
-
- def vmconfig(self):
- dominfo = self.xd.domain_get(self.src_dom)
- if dominfo:
- val = sxp.to_string(dominfo.sxpr())
- else:
- val = None
- return val
-
- def add_error(self, err):
- """Add an error to the error list.
- Returns the error added.
- """
- if err not in self.errors:
- self.errors.append(err)
- return err
-
- def error_summary(self, msg=None):
- """Get a XendError summarising the errors (if any).
- """
- if not self.errors:
- return None
- if msg is None:
- msg = "errors"
- if self.errors:
- errmsg = msg + ': ' + ', '.join(map(str, self.errors))
- else:
- errmsg = msg
- return XendError(errmsg)
-
- def get_errors(self):
- """Get the list of errors.
- """
- return self.errors
-
- def error(self, err):
- print 'XfrdInfo>error>', err
- self.state = 'error'
- self.add_error(err)
-
- def dispatch(self, xfrd, val):
- print 'XfrdInfo>dispatch>', val
- op = sxp.name(val)
- op = op.replace('.', '_')
- if op.startswith('xfr_'):
- fn = getattr(self, op, self.unknown)
- else:
- fn = self.unknown
- try:
- val = fn(xfrd, val)
- if val:
- sxp.show(val, out=xfrd.transport)
- except Exception, err:
- print 'XfrdInfo>dispatch> error:', err
- val = ['xfr.err', errno.EINVAL]
- sxp.show(val, out=xfrd.transport)
- self.error(err)
-
- def unknown(self, xfrd, val):
- xfrd.loseConnection()
- return None
-
- def xfr_err(self, xfrd, val):
- # If we get an error with non-zero code the operation failed.
- # An error with code zero indicates hello success.
- print 'XfrdInfo>xfr_err>', val
- v = sxp.child0(val)
- err = int(sxp.child0(val))
- if not err: return
- self.error("transfer daemon (xfrd) error: " + str(err))
- xfrd.loseConnection()
- return None
-
- def xfr_progress(self, xfrd, val):
- return None
-
- def xfr_vm_destroy(self, xfrd, val):
- try:
- vmid = sxp.child0(val)
- val = self.xd.domain_destroy(vmid)
- if vmid in self.paused:
- del self.paused[vmid]
- if vmid in self.suspended:
- del self.suspended[vmid]
- except StandardError, err:
- self.add_error("vm_destroy failed")
- self.add_error(err)
- val = errno.EINVAL
- return ['xfr.err', val]
-
- def xfr_vm_pause(self, xfrd, val):
- try:
- vmid = sxp.child0(val)
- val = self.xd.domain_pause(vmid)
- self.paused[vmid] = 1
- except StandardError, err:
- self.add_error("vm_pause failed")
- self.add_error(err)
- val = errno.EINVAL
- return ['xfr.err', val]
-
- def xfr_vm_unpause(self, xfrd, val):
- try:
- vmid = sxp.child0(val)
- val = self.xd.domain_unpause(vmid)
- if vmid in self.paused:
- del self.paused[vmid]
- except StandardError, err:
- self.add_error("vm_unpause failed")
- self.add_error(err)
- val = errno.EINVAL
- return ['xfr.err', val]
-
- def xfr_vm_suspend(self, xfrd, val):
- """Suspend a domain.
- Suspending can hang, so we set a timeout and fail if it
- takes too long.
- """
- try:
- vmid = sxp.child0(val)
- h = SuspendHandler(self, vmid, self.timeout)
- val = self.xd.domain_shutdown(vmid, reason='suspend')
- self.suspended[vmid] = 1
- h.start()
- print 'xfr_vm_suspend> suspended', vmid
- except Exception, err:
- print 'xfr_vm_suspend> err', err
- self.add_error("suspend failed")
- self.add_error(err)
- traceback.print_exc()
- val = errno.EINVAL
- return ['xfr.err', val]
-
- def connectionLost(self, reason=None):
- print 'XfrdInfo>connectionLost>', reason
- for vmid in self.suspended:
- try:
- self.xd.domain_destroy(vmid)
- except:
- pass
- for vmid in self.paused:
- try:
- self.xd.domain_unpause(vmid)
- except:
- pass
-
-class XendMigrateInfo(XfrdInfo):
- """Representation of a migrate in-progress and its interaction with xfrd.
- """
-
- def __init__(self, xid, dominfo, host, port, live=0, resource=0):
- XfrdInfo.__init__(self)
- self.xid = xid
- self.dominfo = dominfo
- self.state = 'begin'
- self.src_host = socket.gethostname()
- self.src_dom = dominfo.id
- self.dst_host = host
- self.dst_port = port
- self.dst_dom = None
- self.live = live
- self.resource = resource
- self.start = 0
-
- def sxpr(self):
- sxpr = ['migrate',
- ['id', self.xid ],
- ['state', self.state ],
- ['live', self.live ],
- ['resource', self.resource ] ]
- sxpr_src = ['src', ['host', self.src_host], ['domain', self.src_dom] ]
- sxpr.append(sxpr_src)
- sxpr_dst = ['dst', ['host', self.dst_host] ]
- if self.dst_dom:
- sxpr_dst.append(['domain', self.dst_dom])
- sxpr.append(sxpr_dst)
- return sxpr
-
- def request(self, xfrd):
- vmconfig = self.vmconfig()
- if not vmconfig:
- self.error(XendError("vm config not found"))
- xfrd.loseConnection()
- return
- log.info('Migrate BEGIN: %s' % str(self.sxpr()))
- eserver.inject('xend.domain.migrate',
- [ self.dominfo.name, self.dominfo.id, "begin", self.sxpr() ])
- xfrd.request(['xfr.migrate',
- self.src_dom,
- vmconfig,
- self.dst_host,
- self.dst_port,
- self.live,
- self.resource ])
-
- def xfr_migrate_ok(self, xfrd, val):
- dom = int(sxp.child0(val))
- self.state = 'ok'
- self.dst_dom = dom
- self.xd.domain_destroy(self.src_dom)
-
- def connectionLost(self, reason=None):
- print 'XendMigrateInfo>connectionLost>', reason
- XfrdInfo.connectionLost(self, reason)
- if self.state =='ok':
- log.info('Migrate OK: ' + str(self.sxpr()))
- else:
- self.state = 'error'
- self.error("migrate failed")
- log.info('Migrate ERROR: ' + str(self.sxpr()))
- eserver.inject('xend.domain.migrate',
- [ self.dominfo.name, self.dominfo.id, self.state, self.sxpr() ])
-
-class XendSaveInfo(XfrdInfo):
- """Representation of a save in-progress and its interaction with xfrd.
- """
-
- def __init__(self, xid, dominfo, file):
- XfrdInfo.__init__(self)
- self.xid = xid
- self.dominfo = dominfo
- self.state = 'begin'
- self.src_dom = dominfo.id
- self.file = file
- self.start = 0
-
- def sxpr(self):
- sxpr = ['save',
- ['id', self.xid],
- ['state', self.state],
- ['domain', self.src_dom],
- ['file', self.file] ]
- return sxpr
-
- def request(self, xfrd):
- vmconfig = self.vmconfig()
- if not vmconfig:
- self.error(XendError("vm config not found"))
- xfrd.loseConnection()
- return
- log.info('Save BEGIN: ' + str(self.sxpr()))
- eserver.inject('xend.domain.save',
- [ self.dominfo.name, self.dominfo.id,
- "begin", self.sxpr() ])
- xfrd.request(['xfr.save', self.src_dom, vmconfig, self.file ])
-
- def xfr_save_ok(self, xfrd, val):
- self.state = 'ok'
- self.xd.domain_destroy(self.src_dom)
-
- def connectionLost(self, reason=None):
- print 'XendSaveInfo>connectionLost>', reason
- XfrdInfo.connectionLost(self, reason)
- if self.state =='ok':
- log.info('Save OK: ' + str(self.sxpr()))
- else:
- self.state = 'error'
- self.error("save failed")
- log.info('Save ERROR: ' + str(self.sxpr()))
- eserver.inject('xend.domain.save',
- [ self.dominfo.name, self.dominfo.id,
- self.state, self.sxpr() ])
-
-class XendRestoreInfo(XfrdInfo):
- """Representation of a restore in-progress and its interaction with xfrd.
- """
-
- def __init__(self, xid, file):
- XfrdInfo.__init__(self)
- self.xid = xid
- self.state = 'begin'
- self.file = file
-
- def sxpr(self):
- sxpr = ['restore',
- ['id', self.xid],
- ['file', self.file] ]
- return sxpr
-
- def request(self, xfrd):
- log.info('restore BEGIN: ' + str(self.sxpr()))
- eserver.inject('xend.restore', [ 'begin', self.sxpr()])
-
- xfrd.request(['xfr.restore', self.file ])
-
- def xfr_restore_ok(self, xfrd, val):
- dom = int(sxp.child0(val))
- dominfo = self.xd.domain_get(dom)
- self.state = 'ok'
-
- def connectionLost(self, reason=None):
- XfrdInfo.connectionLost(self, reason)
- if self.state =='ok':
- log.info('Restore OK: ' + self.file)
- else:
- self.state = 'error'
- self.error("restore failed")
- log.info('Restore ERROR: ' + str(self.sxpr()))
- eserver.inject('xend.restore', [ self.state, self.sxpr()])
-
-class XendMigrate:
- """External api for interaction with xfrd for migrate and save.
- Singleton.
- """
- # Use log for indications of begin/end/errors?
- # Need logging of: domain create/halt, migrate begin/end/fail
- # Log via event server?
-
- dbpath = "migrate"
-
- def __init__(self):
- self.db = XendDB.XendDB(self.dbpath)
- self.session = {}
- self.session_db = self.db.fetchall("")
- self.xid = 0
-
- def nextid(self):
- self.xid += 1
- return "%d" % self.xid
-
- def sync(self):
- self.db.saveall("", self.session_db)
-
- def sync_session(self, xid):
- self.db.save(xid, self.session_db[xid])
-
- def close(self):
- pass
-
- def _add_session(self, info):
- xid = info.xid
- self.session[xid] = info
- self.session_db[xid] = info.sxpr()
- self.sync_session(xid)
-
- def _delete_session(self, xid):
- if xid in self.session:
- del self.session[xid]
- if xid in self.session_db:
- del self.session_db[xid]
- self.db.delete(xid)
-
- def session_ls(self):
- return self.session.keys()
-
- def sessions(self):
- return self.session.values()
-
- def session_get(self, xid):
- return self.session.get(xid)
-
- def session_begin(self, info):
- """Add the session to the table and start it.
- Remove the session from the table when it finishes.
-
- @param info: session
- """
- self._add_session(info)
- try:
- xcf = XfrdClientFactory(info)
- return xcf.start()
- finally:
- self._delete_session(info.xid)
-
- def migrate_begin(self, dominfo, host, port=XFRD_PORT, live=0, resource=0):
- """Begin to migrate a domain to another host.
-
- @param dominfo: domain info
- @param host: destination host
- @param port: destination port
- """
- xid = self.nextid()
- info = XendMigrateInfo(xid, dominfo, host, port, live, resource)
- return self.session_begin(info)
-
- def save_begin(self, dominfo, file):
- """Begin saving a domain to file.
-
- @param dominfo: domain info
- @param file: destination file
- """
- xid = self.nextid()
- info = XendSaveInfo(xid, dominfo, file)
- return self.session_begin(info)
-
- def restore_begin(self, file):
- xid = self.nextid()
- info = XendRestoreInfo(xid, file)
- return self.session_begin(info)
-
-
-def instance():
- global inst
- try:
- inst
- except:
- inst = XendMigrate()
- return inst