#! /usr/bin/env python # # pygrub - simple python-based bootloader for Xen # # Copyright 2005-2006 Red Hat, Inc. # Jeremy Katz # # This software may be freely redistributed under the terms of the GNU # general public license. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. # import os, sys, string, struct, tempfile, re, traceback import copy import logging import platform import xen.lowlevel.xc import curses, _curses, curses.wrapper, curses.textpad, curses.ascii import getopt import fsimage import grub.GrubConf import grub.LiloConf import grub.ExtLinuxConf PYGRUB_VER = 0.6 FS_READ_MAX = 1024 * 1024 def enable_cursor(ison): if ison: val = 2 else: val = 0 try: curses.curs_set(val) except _curses.error: pass DISK_TYPE_RAW, DISK_TYPE_HYBRIDISO, DISK_TYPE_DOS = range(3) def identify_disk_image(file): """Detect DOS partition table or HybridISO format.""" fd = os.open(file, os.O_RDONLY) buf = os.read(fd, 0x8006) os.close(fd) if len(buf) >= 512 and \ struct.unpack("H", buf[0x1fe: 0x200]) == (0xaa55,): # HybridISO contains a DOS partition table for booting from USB devices, but really is an ISO image if len(buf) >= 0x8006 and buf[0x8001:0x8006] == 'CD001': return DISK_TYPE_HYBRIDISO return DISK_TYPE_DOS return DISK_TYPE_RAW SECTOR_SIZE=512 DK_LABEL_LOC=1 DKL_MAGIC=0xdabe V_ROOT=0x2 def get_solaris_slice(file, offset): """Find the root slice in a Solaris VTOC.""" fd = os.open(file, os.O_RDONLY) os.lseek(fd, offset + (DK_LABEL_LOC * SECTOR_SIZE), 0) buf = os.read(fd, 512) os.close(fd) if struct.unpack("0: buf = os.read(fd, partsize) offsets.append(struct.unpack(" ") screen.noutrefresh() win = curses.newwin(1, 74, startx, starty + 2) curses.textpad.Textbox.__init__(self, win) self.line = list(line) self.pos = len(line) self.cancelled = False self.show_text() def show_text(self): """Show the text. One of our advantages over standard textboxes is that we can handle lines longer than the window.""" self.win.erase() p = self.pos off = 0 while p > 70: p -= 55 off += 55 l = self.line[off:off+70] self.win.addstr(0, 0, string.join(l, (""))) if self.pos > 70: self.win.addch(0, 0, curses.ACS_LARROW) self.win.move(0, p) def do_command(self, ch): # we handle escape as well as moving the line around, so have # to override some of the default handling self.lastcmd = ch if ch == 27: # esc self.cancelled = True return 0 elif curses.ascii.isprint(ch): self.line.insert(self.pos, chr(ch)) self.pos += 1 elif ch == curses.ascii.SOH: # ^a self.pos = 0 elif ch in (curses.ascii.STX,curses.KEY_LEFT): if self.pos > 0: self.pos -= 1 elif ch in (curses.ascii.BS,curses.KEY_BACKSPACE): if self.pos > 0: self.pos -= 1 if self.pos < len(self.line): self.line.pop(self.pos) elif ch == curses.ascii.EOT: # ^d if self.pos < len(self.line): self.line.pop(self.pos) elif ch == curses.ascii.ENQ: # ^e self.pos = len(self.line) elif ch in (curses.ascii.ACK, curses.KEY_RIGHT): if self.pos < len(self.line): self.pos +=1 elif ch == curses.ascii.VT: # ^k self.line = self.line[:self.pos] else: return curses.textpad.Textbox.do_command(self, ch) self.show_text() return 1 def edit(self): curses.doupdate() r = curses.textpad.Textbox.edit(self) if self.cancelled: return None return string.join(self.line, "") class Grub: ENTRY_WIN_LINES = 8 def __init__(self, file, fs = None): self.screen = None self.entry_win = None self.text_win = None if file: self.read_config(file, fs) def draw_main_windows(self): if self.screen is None: #only init stuff once self.screen = curses.initscr() self.screen.timeout(1000) if hasattr(curses, 'use_default_colors'): try: curses.use_default_colors() except: pass # Not important if we can't use colour enable_cursor(False) self.entry_win = curses.newwin(Grub.ENTRY_WIN_LINES + 2, 74, 2, 1) self.text_win = curses.newwin(10, 70, 12, 5) curses.def_prog_mode() curses.reset_prog_mode() self.screen.erase() # create basic grub screen with a box of entries and a textbox self.screen.addstr(1, 4, "pyGRUB version %s" %(PYGRUB_VER,)) self.entry_win.box() self.screen.noutrefresh() def fill_entry_list(self): self.entry_win.erase() self.entry_win.box() maxy = self.entry_win.getmaxyx()[0]-3 # maxy - 2 for the frame + index if self.selected_image > self.start_image + maxy: self.start_image = self.selected_image if self.selected_image < self.start_image: self.start_image = self.selected_image for y in range(self.start_image, len(self.cf.images)): i = self.cf.images[y] if y > self.start_image + maxy: break if y == self.selected_image: self.entry_win.attron(curses.A_REVERSE) self.entry_win.addstr(y + 1 - self.start_image, 2, i.title.expandtabs().ljust(70)) if y == self.selected_image: self.entry_win.attroff(curses.A_REVERSE) self.entry_win.noutrefresh() def edit_entry(self, origimg): def draw(): self.draw_main_windows() self.text_win.addstr(0, 0, "Use the U and D keys to select which entry is highlighted.") self.text_win.addstr(1, 0, "Press 'b' to boot, 'e' to edit the selected command in the") self.text_win.addstr(2, 0, "boot sequence, 'c' for a command-line, 'o' to open a new line") self.text_win.addstr(3, 0, "after ('O' for before) the selected line, 'd' to remove the") self.text_win.addstr(4, 0, "selected line, or escape to go back to the main menu.") self.text_win.addch(0, 8, curses.ACS_UARROW) self.text_win.addch(0, 14, curses.ACS_DARROW) (y, x) = self.text_win.getmaxyx() self.text_win.move(y - 1, x - 1) self.text_win.noutrefresh() curline = 0 pos = 0 img = copy.deepcopy(origimg) while 1: draw() self.entry_win.erase() rs = 0 re = len(img.lines) idp = 1 if re > Grub.ENTRY_WIN_LINES: rs = curline - pos re = rs + Grub.ENTRY_WIN_LINES for idx in range(rs, re): # current line should be highlighted if idx == curline: self.entry_win.attron(curses.A_REVERSE) # trim the line l = img.lines[idx].expandtabs().ljust(70) if len(l) > 70: l = l[:69] + ">" self.entry_win.addstr(idp, 2, l) if idx == curline: self.entry_win.attroff(curses.A_REVERSE) idp += 1 self.entry_win.box() self.entry_win.noutrefresh() curses.doupdate() c = self.screen.getch() if c in (ord('q'), 27): # 27 == esc break elif c == curses.KEY_UP: curline -= 1 if pos > 0: pos -= 1 elif c == curses.KEY_DOWN: curline += 1 if pos < Grub.ENTRY_WIN_LINES - 1: pos += 1 elif c == ord('b'): self.isdone = True break elif c == ord('e'): l = self.edit_line(img.lines[curline]) if l is not None: img.set_from_line(l, replace = curline) elif c == ord('d'): img.lines.pop(curline) elif c == ord('o'): img.lines.insert(curline+1, "") curline += 1 elif c == ord('O'): img.lines.insert(curline, "") elif c == ord('c'): self.command_line_mode() if self.isdone: return # bound at the top and bottom if curline < 0: curline = 0 elif curline >= len(img.lines): curline = len(img.lines) - 1 if self.isdone: # Fix to allow pygrub command-line editing in Lilo bootloader (used by IA64) if platform.machine() == 'ia64': origimg.reset(img.lines, img.path) else: origimg.reset(img.lines) def edit_line(self, line): self.screen.erase() self.screen.addstr(1, 2, "[ Minimal BASH-like line editing is supported. ") self.screen.addstr(2, 2, " ESC at any time cancels. ENTER at any time accepts your changes. ]") self.screen.noutrefresh() t = GrubLineEditor(self.screen, 5, 2, line) enable_cursor(True) ret = t.edit() if ret: return ret return None def command_line_mode(self): self.screen.erase() self.screen.addstr(1, 2, "[ Minimal BASH-like line editing is supported. ESC at any time ") self.screen.addstr(2, 2, " exits. Typing 'boot' will boot with your entered commands. ] ") self.screen.noutrefresh() y = 5 lines = [] while 1: t = GrubLineEditor(self.screen, y, 2) enable_cursor(True) ret = t.edit() if ret: if ret in ("quit", "return"): break elif ret != "boot": y += 1 lines.append(ret) continue # if we got boot, then we want to boot the entered image img = self.cf.new_image("entered", lines) self.cf.add_image(img) self.selected_image = len(self.cf.images) - 1 self.isdone = True break # else, we cancelled and should just go back break def read_config(self, fn, fs = None): """Read the given file to parse the config. If fs = None, then we're being given a raw config file rather than a disk image.""" if not os.access(fn, os.R_OK): raise RuntimeError, "Unable to access %s" %(fn,) if platform.machine() == 'ia64': cfg_list = map(lambda x: (x,grub.LiloConf.LiloConfigFile), # common distributions ["/efi/debian/elilo.conf", "/efi/gentoo/elilo.conf", "/efi/redflag/elilo.conf", "/efi/redhat/elilo.conf", "/efi/SuSE/elilo.conf",] + # fallbacks ["/efi/boot/elilo.conf", "/elilo.conf",]) else: cfg_list = map(lambda x: (x,grub.GrubConf.Grub2ConfigFile), ["/boot/grub/grub.cfg", "/grub/grub.cfg", "/boot/grub2/grub.cfg", "/grub2/grub.cfg"]) + \ map(lambda x: (x,grub.ExtLinuxConf.ExtLinuxConfigFile), ["/boot/isolinux/isolinux.cfg", "/boot/extlinux/extlinux.conf", "/boot/extlinux.conf"]) + \ map(lambda x: (x,grub.GrubConf.GrubConfigFile), ["/boot/grub/menu.lst", "/boot/grub/grub.conf", "/grub/menu.lst", "/grub/grub.conf"]) if not fs: # set the config file and parse it for f,parser in cfg_list: self.cf = parser() self.cf.filename = fn self.cf.parse() return for f,parser in cfg_list: if fs.file_exists(f): print >>sys.stderr, "Using %s to parse %s" % (parser,f) self.cf = parser() self.cf.filename = f break if self.__dict__.get('cf', None) is None: raise RuntimeError, "couldn't find bootloader config file in the image provided." f = fs.open_file(self.cf.filename) # limit read size to avoid pathological cases buf = f.read(FS_READ_MAX) del f self.cf.parse(buf) def run(self): timeout = int(self.cf.timeout) self.selected_image = self.cf.default # If the selected (default) image doesn't exist we select the first entry if self.selected_image > len(self.cf.images): self.selected_image = 0 self.isdone = False while not self.isdone: self.run_main(timeout) timeout = -1 return self.selected_image def run_main(self, timeout = -1): def draw(): # set up the screen self.draw_main_windows() if not self.cf.hasPassword() or self.cf.hasPasswordAccess(): self.text_win.addstr(0, 0, "Use the U and D keys to select which entry is highlighted.") self.text_win.addstr(1, 0, "Press enter to boot the selected OS, 'e' to edit the") self.text_win.addstr(2, 0, "commands before booting, 'a' to modify the kernel arguments ") self.text_win.addstr(3, 0, "before booting, or 'c' for a command line.") else: self.text_win.addstr(0, 0, "Use the U and D keys to select which entry is highlighted.") self.text_win.addstr(1, 0, "Press enter to boot the selected OS or `p` to enter a") self.text_win.addstr(2, 0, "password to unlock the next set of features.") self.text_win.addch(0, 8, curses.ACS_UARROW) self.text_win.addch(0, 14, curses.ACS_DARROW) (y, x) = self.text_win.getmaxyx() self.text_win.move(y - 1, x - 1) self.text_win.noutrefresh() # now loop until we hit the timeout or get a go from the user mytime = 0 self.start_image = 0 while (timeout == -1 or mytime < int(timeout)): draw() if timeout != -1 and mytime != -1: self.screen.addstr(20, 5, "Will boot selected entry in %2d seconds" %(int(timeout) - mytime)) else: self.screen.addstr(20, 5, " " * 80) self.fill_entry_list() curses.doupdate() c = self.screen.getch() if c == -1: # Timed out waiting for a keypress if mytime != -1: mytime += 1 if mytime >= int(timeout): self.isdone = True break else: # received a keypress: stop the timer mytime = -1 self.screen.timeout(-1) # handle keypresses if c == ord('c') and self.cf.hasPasswordAccess(): self.command_line_mode() break elif c == ord('a') and self.cf.hasPasswordAccess(): # find the kernel line, edit it and then boot img = self.cf.images[self.selected_image] for line in img.lines: if line.startswith("kernel") or line.startswith("linux"): l = self.edit_line(line) if l is not None: img.set_from_line(l, replace = True) self.isdone = True break break elif c == ord('e') and self.cf.hasPasswordAccess(): img = self.cf.images[self.selected_image] self.edit_entry(img) break elif c == ord('p') and self.cf.hasPassword(): self.text_win.addstr(6, 1, "Password: ") pwd = self.text_win.getstr(6, 8) if not self.cf.checkPassword(pwd): self.text_win.addstr(6, 1, "Password: ") if self.cf.passExc is not None: self.text_win.addstr(7, 0, "Exception: %s" % self.cf.passExc) else: self.text_win.addstr(7, 0, "Failed!") self.cf.setPasswordAccess( False ) else: self.cf.setPasswordAccess( True ) break elif c in (curses.KEY_ENTER, ord('\n'), ord('\r')): self.isdone = True break elif c == curses.KEY_UP: self.selected_image -= 1 elif c == curses.KEY_DOWN: self.selected_image += 1 # elif c in (ord('q'), 27): # 27 == esc # self.selected_image = -1 # self.isdone = True # break # bound at the top and bottom if self.selected_image < 0: self.selected_image = 0 elif self.selected_image >= len(self.cf.images): self.selected_image = len(self.cf.images) - 1 def get_entry_idx(cf, entry): # first, see if the given entry is numeric try: idx = string.atoi(entry) return idx except ValueError: pass # it's not, now check the labels for a match for i in range(len(cf.images)): if entry == cf.images[i].title: return i return None def run_grub(file, entry, fs, cfg_args): global g global sel def run_main(scr, *args): global sel global g sel = g.run() g = Grub(file, fs) if list_entries: for i in range(len(g.cf.images)): img = g.cf.images[i] print "title: %s" % img.title print " root: %s" % img.root print " kernel: %s" % img.kernel[1] print " args: %s" % img.args print " initrd: %s" % img.initrd[1] if interactive and not list_entries: curses.wrapper(run_main) else: sel = g.cf.default # set the entry to boot as requested if entry is not None: idx = get_entry_idx(g.cf, entry) if idx is not None and idx >= 0 and idx < len(g.cf.images): sel = idx if sel == -1: print "No kernel image selected!" sys.exit(1) try: img = g.cf.images[sel] except IndexError: img = g.cf.images[0] grubcfg = { "kernel": None, "ramdisk": None, "args": "" } grubcfg["kernel"] = img.kernel[1] if img.initrd: grubcfg["ramdisk"] = img.initrd[1] if img.args: grubcfg["args"] += img.args if cfg_args: grubcfg["args"] += " " + cfg_args return grubcfg def supports64bitPVguest(): xc = xen.lowlevel.xc.xc() caps = xc.xeninfo()['xen_caps'].split(" ") for cap in caps: if cap == "xen-3.0-x86_64": return True return False # If nothing has been specified, look for a Solaris domU. If found, perform the # necessary tweaks. def sniff_solaris(fs, cfg): if not fs.file_exists("/platform/i86xpv/kernel/unix") and \ not fs.file_exists("/platform/i86xpv/kernel/amd64/unix"): return cfg if not cfg["kernel"]: if supports64bitPVguest() and \ fs.file_exists("/platform/i86xpv/kernel/amd64/unix"): cfg["kernel"] = "/platform/i86xpv/kernel/amd64/unix" cfg["ramdisk"] = "/platform/i86pc/amd64/boot_archive" elif fs.file_exists("/platform/i86xpv/kernel/unix"): cfg["kernel"] = "/platform/i86xpv/kernel/unix" cfg["ramdisk"] = "/platform/i86pc/boot_archive" else: return cfg # Unpleasant. Typically we'll have 'root=foo -k' or 'root=foo /kernel -k', # and we need to maintain Xen properties (root= and ip=) and the kernel # before any user args. xenargs = "" userargs = "" if not cfg["args"]: cfg["args"] = cfg["kernel"] else: for arg in cfg["args"].split(): if re.match("^root=", arg) or re.match("^ip=", arg): xenargs += arg + " " elif arg != cfg["kernel"]: userargs += arg + " " cfg["args"] = xenargs + " " + cfg["kernel"] + " " + userargs return cfg def sniff_netware(fs, cfg): if not fs.file_exists("/nwserver/xnloader.sys"): return cfg if not cfg["kernel"]: cfg["kernel"] = "/nwserver/xnloader.sys" return cfg def format_sxp(kernel, ramdisk, args): s = "linux (kernel %s)" % kernel if ramdisk: s += "(ramdisk %s)" % ramdisk if args: s += "(args \"%s\")" % args return s def format_simple(kernel, ramdisk, args, sep): s = ("kernel %s" % kernel) + sep if ramdisk: s += ("ramdisk %s" % ramdisk) + sep if args: s += ("args %s" % args) + sep s += sep return s if __name__ == "__main__": sel = None def usage(): print >> sys.stderr, "Usage: %s [-q|--quiet] [-i|--interactive] [-l|--list-entries] [-n|--not-really] [--output=] [--kernel=] [--ramdisk=] [--args=] [--entry=] [--output-directory=] [--output-format=sxp|simple|simple0] [--offset=] " %(sys.argv[0],) def copy_from_image(fs, file_to_read, file_type, output_directory, not_really): if not_really: if fs.file_exists(file_to_read): return "<%s:%s>" % (file_type, file_to_read) else: sys.exit("The requested %s file does not exist" % file_type) try: datafile = fs.open_file(file_to_read) except Exception, e: print >>sys.stderr, e sys.exit("Error opening %s in guest" % file_to_read) (tfd, ret) = tempfile.mkstemp(prefix="boot_"+file_type+".", dir=output_directory) dataoff = 0 while True: data = datafile.read(FS_READ_MAX, dataoff) if len(data) == 0: os.close(tfd) del datafile return ret try: os.write(tfd, data) except Exception, e: print >>sys.stderr, e os.close(tfd) os.unlink(ret) del datafile sys.exit("Error writing temporary copy of "+file_type) dataoff += len(data) try: opts, args = getopt.gnu_getopt(sys.argv[1:], 'qilnh::', ["quiet", "interactive", "list-entries", "not-really", "help", "output=", "output-format=", "output-directory=", "offset=", "entry=", "kernel=", "ramdisk=", "args=", "isconfig", "debug"]) except getopt.GetoptError: usage() sys.exit(1) if len(args) < 1: usage() sys.exit(1) file = args[0] output = None entry = None interactive = True list_entries = False isconfig = False part_offs = None debug = False not_really = False output_format = "sxp" output_directory = "/var/run/xend/boot" # what was passed in incfg = { "kernel": None, "ramdisk": None, "args": "" } # what grub or sniffing chose chosencfg = { "kernel": None, "ramdisk": None, "args": None } # what to boot bootcfg = { "kernel": None, "ramdisk": None, "args": None } for o, a in opts: if o in ("-q", "--quiet"): interactive = False elif o in ("-i", "--interactive"): interactive = True elif o in ("-l", "--list-entries"): list_entries = True elif o in ("-n", "--not-really"): not_really = True elif o in ("-h", "--help"): usage() sys.exit() elif o in ("--output",): output = a elif o in ("--kernel",): incfg["kernel"] = a elif o in ("--ramdisk",): incfg["ramdisk"] = a elif o in ("--args",): incfg["args"] = a elif o in ("--offset",): try: part_offs = [ int(a) ] except ValueError: print "offset value must be an integer" usage() sys.exit(1) elif o in ("--entry",): entry = a # specifying the entry to boot implies non-interactive interactive = False elif o in ("--isconfig",): isconfig = True elif o in ("--debug",): debug = True elif o in ("--output-format",): if a not in ["sxp", "simple", "simple0"]: print "unknown output format %s" % a usage() sys.exit(1) output_format = a elif o in ("--output-directory",): output_directory = a if debug: logging.basicConfig(level=logging.DEBUG) if output is None or output == "-": fd = sys.stdout.fileno() else: fd = os.open(output, os.O_WRONLY) # debug if isconfig: chosencfg = run_grub(file, entry, fs, incfg["args"]) print " kernel: %s" % chosencfg["kernel"] if chosencfg["ramdisk"]: print " initrd: %s" % chosencfg["ramdisk"] print " args: %s" % chosencfg["args"] sys.exit(0) # if boot filesystem is set then pass to fsimage.open bootfsargs = '"%s"' % incfg["args"] bootfsgroup = re.findall('zfs-bootfs=(.*?)[\s\,\"]', bootfsargs) if bootfsgroup: bootfsoptions = bootfsgroup[0] else: bootfsoptions = "" # get list of offsets into file which start partitions if part_offs is None: part_offs = get_partition_offsets(file) for offset in part_offs: try: fs = fsimage.open(file, offset, bootfsoptions) chosencfg = sniff_solaris(fs, incfg) if not chosencfg["kernel"]: chosencfg = sniff_netware(fs, incfg) if not chosencfg["kernel"]: chosencfg = run_grub(file, entry, fs, incfg["args"]) # Break as soon as we've found the kernel so that we continue # to use this fsimage object if chosencfg["kernel"]: break fs = None except: # IOErrors raised by fsimage.open # RuntimeErrors raised by run_grub if no menu.lst present if debug: traceback.print_exc() fs = None continue if list_entries: sys.exit(0) # Did looping through partitions find us a kernel? if not fs: raise RuntimeError, "Unable to find partition containing kernel" bootcfg["kernel"] = copy_from_image(fs, chosencfg["kernel"], "kernel", output_directory, not_really) if chosencfg["ramdisk"]: try: bootcfg["ramdisk"] = copy_from_image(fs, chosencfg["ramdisk"], "ramdisk", output_directory, not_really) except: if not not_really: os.unlink(bootcfg["kernel"]) raise else: initrd = None args = None if chosencfg["args"]: zfsinfo = fsimage.getbootstring(fs) if zfsinfo is not None: e = re.compile("zfs-bootfs=[\w\-\.\:@/]+" ) (chosencfg["args"],count) = e.subn(zfsinfo, chosencfg["args"]) if count == 0: chosencfg["args"] += " -B %s" % zfsinfo args = chosencfg["args"] if output_format == "sxp": ostring = format_sxp(bootcfg["kernel"], bootcfg["ramdisk"], args) elif output_format == "simple": ostring = format_simple(bootcfg["kernel"], bootcfg["ramdisk"], args, "\n") elif output_format == "simple0": ostring = format_simple(bootcfg["kernel"], bootcfg["ramdisk"], args, "\0") sys.stdout.flush() os.write(fd, ostring)