aboutsummaryrefslogtreecommitdiffstats
path: root/scripts/qmp/qmp-shell
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/qmp/qmp-shell')
-rwxr-xr-xscripts/qmp/qmp-shell390
1 files changed, 390 insertions, 0 deletions
diff --git a/scripts/qmp/qmp-shell b/scripts/qmp/qmp-shell
new file mode 100755
index 00000000..65280d29
--- /dev/null
+++ b/scripts/qmp/qmp-shell
@@ -0,0 +1,390 @@
+#!/usr/bin/python
+#
+# Low-level QEMU shell on top of QMP.
+#
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+# Usage:
+#
+# Start QEMU with:
+#
+# # qemu [...] -qmp unix:./qmp-sock,server
+#
+# Run the shell:
+#
+# $ qmp-shell ./qmp-sock
+#
+# Commands have the following format:
+#
+# < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+#
+# For example:
+#
+# (QEMU) device_add driver=e1000 id=net1
+# {u'return': {}}
+# (QEMU)
+
+import qmp
+import json
+import ast
+import readline
+import sys
+import pprint
+
+class QMPCompleter(list):
+ def complete(self, text, state):
+ for cmd in self:
+ if cmd.startswith(text):
+ if not state:
+ return cmd
+ else:
+ state -= 1
+
+class QMPShellError(Exception):
+ pass
+
+class QMPShellBadPort(QMPShellError):
+ pass
+
+class FuzzyJSON(ast.NodeTransformer):
+ '''This extension of ast.NodeTransformer filters literal "true/false/null"
+ values in an AST and replaces them by proper "True/False/None" values that
+ Python can properly evaluate.'''
+ def visit_Name(self, node):
+ if node.id == 'true':
+ node.id = 'True'
+ if node.id == 'false':
+ node.id = 'False'
+ if node.id == 'null':
+ node.id = 'None'
+ return node
+
+# TODO: QMPShell's interface is a bit ugly (eg. _fill_completion() and
+# _execute_cmd()). Let's design a better one.
+class QMPShell(qmp.QEMUMonitorProtocol):
+ def __init__(self, address, pp=None):
+ qmp.QEMUMonitorProtocol.__init__(self, self.__get_address(address))
+ self._greeting = None
+ self._completer = None
+ self._pp = pp
+ self._transmode = False
+ self._actions = list()
+
+ def __get_address(self, arg):
+ """
+ Figure out if the argument is in the port:host form, if it's not it's
+ probably a file path.
+ """
+ addr = arg.split(':')
+ if len(addr) == 2:
+ try:
+ port = int(addr[1])
+ except ValueError:
+ raise QMPShellBadPort
+ return ( addr[0], port )
+ # socket path
+ return arg
+
+ def _fill_completion(self):
+ for cmd in self.cmd('query-commands')['return']:
+ self._completer.append(cmd['name'])
+
+ def __completer_setup(self):
+ self._completer = QMPCompleter()
+ self._fill_completion()
+ readline.set_completer(self._completer.complete)
+ readline.parse_and_bind("tab: complete")
+ # XXX: default delimiters conflict with some command names (eg. query-),
+ # clearing everything as it doesn't seem to matter
+ readline.set_completer_delims('')
+
+ def __parse_value(self, val):
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ if val.lower() == 'true':
+ return True
+ if val.lower() == 'false':
+ return False
+ if val.startswith(('{', '[')):
+ # Try first as pure JSON:
+ try:
+ return json.loads(val)
+ except ValueError:
+ pass
+ # Try once again as FuzzyJSON:
+ try:
+ st = ast.parse(val, mode='eval')
+ return ast.literal_eval(FuzzyJSON().visit(st))
+ except SyntaxError:
+ pass
+ except ValueError:
+ pass
+ return val
+
+ def __cli_expr(self, tokens, parent):
+ for arg in tokens:
+ (key, _, val) = arg.partition('=')
+ if not val:
+ raise QMPShellError("Expected a key=value pair, got '%s'" % arg)
+
+ value = self.__parse_value(val)
+ optpath = key.split('.')
+ curpath = []
+ for p in optpath[:-1]:
+ curpath.append(p)
+ d = parent.get(p, {})
+ if type(d) is not dict:
+ raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
+ parent[p] = d
+ parent = d
+ if optpath[-1] in parent:
+ if type(parent[optpath[-1]]) is dict:
+ raise QMPShellError('Cannot use "%s" as both leaf and non-leaf key' % '.'.join(curpath))
+ else:
+ raise QMPShellError('Cannot set "%s" multiple times' % key)
+ parent[optpath[-1]] = value
+
+ def __build_cmd(self, cmdline):
+ """
+ Build a QMP input object from a user provided command-line in the
+ following format:
+
+ < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+ """
+ cmdargs = cmdline.split()
+
+ # Transactional CLI entry/exit:
+ if cmdargs[0] == 'transaction(':
+ self._transmode = True
+ cmdargs.pop(0)
+ elif cmdargs[0] == ')' and self._transmode:
+ self._transmode = False
+ if len(cmdargs) > 1:
+ raise QMPShellError("Unexpected input after close of Transaction sub-shell")
+ qmpcmd = { 'execute': 'transaction',
+ 'arguments': { 'actions': self._actions } }
+ self._actions = list()
+ return qmpcmd
+
+ # Nothing to process?
+ if not cmdargs:
+ return None
+
+ # Parse and then cache this Transactional Action
+ if self._transmode:
+ finalize = False
+ action = { 'type': cmdargs[0], 'data': {} }
+ if cmdargs[-1] == ')':
+ cmdargs.pop(-1)
+ finalize = True
+ self.__cli_expr(cmdargs[1:], action['data'])
+ self._actions.append(action)
+ return self.__build_cmd(')') if finalize else None
+
+ # Standard command: parse and return it to be executed.
+ qmpcmd = { 'execute': cmdargs[0], 'arguments': {} }
+ self.__cli_expr(cmdargs[1:], qmpcmd['arguments'])
+ return qmpcmd
+
+ def _print(self, qmp):
+ jsobj = json.dumps(qmp)
+ if self._pp is not None:
+ self._pp.pprint(jsobj)
+ else:
+ print str(jsobj)
+
+ def _execute_cmd(self, cmdline):
+ try:
+ qmpcmd = self.__build_cmd(cmdline)
+ except Exception, e:
+ print 'Error while parsing command line: %s' % e
+ print 'command format: <command-name> ',
+ print '[arg-name1=arg1] ... [arg-nameN=argN]'
+ return True
+ # For transaction mode, we may have just cached the action:
+ if qmpcmd is None:
+ return True
+ if self._verbose:
+ self._print(qmpcmd)
+ resp = self.cmd_obj(qmpcmd)
+ if resp is None:
+ print 'Disconnected'
+ return False
+ self._print(resp)
+ return True
+
+ def connect(self):
+ self._greeting = qmp.QEMUMonitorProtocol.connect(self)
+ self.__completer_setup()
+
+ def show_banner(self, msg='Welcome to the QMP low-level shell!'):
+ print msg
+ version = self._greeting['QMP']['version']['qemu']
+ print 'Connected to QEMU %d.%d.%d\n' % (version['major'],version['minor'],version['micro'])
+
+ def get_prompt(self):
+ if self._transmode:
+ return "TRANS> "
+ return "(QEMU) "
+
+ def read_exec_command(self, prompt):
+ """
+ Read and execute a command.
+
+ @return True if execution was ok, return False if disconnected.
+ """
+ try:
+ cmdline = raw_input(prompt)
+ except EOFError:
+ print
+ return False
+ if cmdline == '':
+ for ev in self.get_events():
+ print ev
+ self.clear_events()
+ return True
+ else:
+ return self._execute_cmd(cmdline)
+
+ def set_verbosity(self, verbose):
+ self._verbose = verbose
+
+class HMPShell(QMPShell):
+ def __init__(self, address):
+ QMPShell.__init__(self, address)
+ self.__cpu_index = 0
+
+ def __cmd_completion(self):
+ for cmd in self.__cmd_passthrough('help')['return'].split('\r\n'):
+ if cmd and cmd[0] != '[' and cmd[0] != '\t':
+ name = cmd.split()[0] # drop help text
+ if name == 'info':
+ continue
+ if name.find('|') != -1:
+ # Command in the form 'foobar|f' or 'f|foobar', take the
+ # full name
+ opt = name.split('|')
+ if len(opt[0]) == 1:
+ name = opt[1]
+ else:
+ name = opt[0]
+ self._completer.append(name)
+ self._completer.append('help ' + name) # help completion
+
+ def __info_completion(self):
+ for cmd in self.__cmd_passthrough('info')['return'].split('\r\n'):
+ if cmd:
+ self._completer.append('info ' + cmd.split()[1])
+
+ def __other_completion(self):
+ # special cases
+ self._completer.append('help info')
+
+ def _fill_completion(self):
+ self.__cmd_completion()
+ self.__info_completion()
+ self.__other_completion()
+
+ def __cmd_passthrough(self, cmdline, cpu_index = 0):
+ return self.cmd_obj({ 'execute': 'human-monitor-command', 'arguments':
+ { 'command-line': cmdline,
+ 'cpu-index': cpu_index } })
+
+ def _execute_cmd(self, cmdline):
+ if cmdline.split()[0] == "cpu":
+ # trap the cpu command, it requires special setting
+ try:
+ idx = int(cmdline.split()[1])
+ if not 'return' in self.__cmd_passthrough('info version', idx):
+ print 'bad CPU index'
+ return True
+ self.__cpu_index = idx
+ except ValueError:
+ print 'cpu command takes an integer argument'
+ return True
+ resp = self.__cmd_passthrough(cmdline, self.__cpu_index)
+ if resp is None:
+ print 'Disconnected'
+ return False
+ assert 'return' in resp or 'error' in resp
+ if 'return' in resp:
+ # Success
+ if len(resp['return']) > 0:
+ print resp['return'],
+ else:
+ # Error
+ print '%s: %s' % (resp['error']['class'], resp['error']['desc'])
+ return True
+
+ def show_banner(self):
+ QMPShell.show_banner(self, msg='Welcome to the HMP shell!')
+
+def die(msg):
+ sys.stderr.write('ERROR: %s\n' % msg)
+ sys.exit(1)
+
+def fail_cmdline(option=None):
+ if option:
+ sys.stderr.write('ERROR: bad command-line option \'%s\'\n' % option)
+ sys.stderr.write('qemu-shell [ -v ] [ -p ] [ -H ] < UNIX socket path> | < TCP address:port >\n')
+ sys.exit(1)
+
+def main():
+ addr = ''
+ qemu = None
+ hmp = False
+ pp = None
+ verbose = False
+
+ try:
+ for arg in sys.argv[1:]:
+ if arg == "-H":
+ if qemu is not None:
+ fail_cmdline(arg)
+ hmp = True
+ elif arg == "-p":
+ if pp is not None:
+ fail_cmdline(arg)
+ pp = pprint.PrettyPrinter(indent=4)
+ elif arg == "-v":
+ verbose = True
+ else:
+ if qemu is not None:
+ fail_cmdline(arg)
+ if hmp:
+ qemu = HMPShell(arg)
+ else:
+ qemu = QMPShell(arg, pp)
+ addr = arg
+
+ if qemu is None:
+ fail_cmdline()
+ except QMPShellBadPort:
+ die('bad port number in command-line')
+
+ try:
+ qemu.connect()
+ except qmp.QMPConnectError:
+ die('Didn\'t get QMP greeting message')
+ except qmp.QMPCapabilitiesError:
+ die('Could not negotiate capabilities')
+ except qemu.error:
+ die('Could not connect to %s' % addr)
+
+ qemu.show_banner()
+ qemu.set_verbosity(verbose)
+ while qemu.read_exec_command(qemu.get_prompt()):
+ pass
+ qemu.close()
+
+if __name__ == '__main__':
+ main()