From cbc718d701c26e7ef0e243a44ed420bacbe0056a Mon Sep 17 00:00:00 2001 From: Baruch Sterin Date: Tue, 8 Oct 2013 12:04:07 -0700 Subject: pyabc changes for HWMCC13 --- src/python/module.make | 15 +++++++- src/python/package.py | 65 ++++++++++++++++++++++++---------- src/python/pyabc.i | 9 ++++- src/python/pyabc_split.py | 90 +++++++++++++++++++++++++++++++++-------------- src/python/redirect.py | 17 +++++++++ 5 files changed, 150 insertions(+), 46 deletions(-) (limited to 'src/python') diff --git a/src/python/module.make b/src/python/module.make index 09e2de3f..51be60af 100644 --- a/src/python/module.make +++ b/src/python/module.make @@ -34,6 +34,12 @@ ifdef ABC_PYTHON $(ABC_PYTHON_FILES_PREFIX)/dist \ pyabc.tgz + ABC_PYABC_DIR ?= pyabc + ABC_PYABC_TGZ ?= pyabc.tgz + ABC_PYABC_EXTRA_BIN ?= + ABC_PYABC_EXTRA_LIB ?= + + %_wrap.c %.py : %.i $(ABC_SWIG) -python -outdir $(>sys.stderr, 'exception while trying to kill pid=%d: '%pid, e raise - - # wait for termination and update result - for pid, _ in self.fds.iteritems(): + + # wait for termination and update result + for pid in pids: os.waitpid( pid, 0 ) - self.results[pid] = None + + def kill(self, ids): + + self._kill( [ self.pids[i] for i in ids ] ) - self.fds = {} - self.buffers = {} + def cleanup(self): + self._kill( self.fds.keys() ) def child( self, fdw, f): + # call function - res = f() + try: + res = f() + except: + traceback.print_exc() + raise # write return value into pipe with os.fdopen( fdw, "w" ) as fout: @@ -145,7 +168,7 @@ class _splitter(object): return 0 - def fork_one(self, f): + def _fork_one(self, f): # create a pipe to communicate with the child process pr,pw = os.pipe() @@ -177,13 +200,17 @@ class _splitter(object): if os.getpid() != parentpid: os._exit(rc) - def fork_all(self): - for i,f in enumerate(self.funcs): - pid, fd = self.fork_one(f) - self.pids.append(pid) - self.fds[pid] = (i,fd) - self.buffers[fd] = cStringIO.StringIO() + def fork_one(self, func): + pid, fd = self._fork_one(func) + i = len(self.pids) + self.pids.append(pid) + self.fds[pid] = (i, fd) + self.buffers[fd] = cStringIO.StringIO() + return i + def fork_all(self, funcs): + return [ self.fork_one(f) for f in funcs ] + def communicate(self): rlist = [ fd for _, (_,fd) in self.fds.iteritems() ] @@ -216,6 +243,9 @@ class _splitter(object): i, fd = self.fds[pid] del self.fds[pid] + # remove the pid + self.pids[i] = -1 + # retrieve the buffer buffer = self.buffers[fd] del self.buffers[fd] @@ -226,29 +256,37 @@ class _splitter(object): if not s: break buffer.write(s) + + os.close(fd) try: return (i, pickle.loads(buffer.getvalue())) except EOFError, pickle.UnpicklingError: return (i, None) + + def __iter__(self): + def iterator(): + while not self.is_done(): + yield self.get_next_result() + return iterator() @contextmanager -def _splitter_wrapper(funcs): +def make_splitter(): # ensure cleanup of child processes - s = _splitter(funcs) + s = _splitter() try: yield s finally: s.cleanup() - + def split_all_full(funcs): # provide an iterator for child process result - with _splitter_wrapper(funcs) as s: + with make_splitter() as s: - s.fork_all() + s.fork_all(funcs) - while not s.is_done(): - yield s.get_next_result() + for res in s: + yield res def defer(f): return lambda *args, **kwargs: lambda : f(*args,**kwargs) diff --git a/src/python/redirect.py b/src/python/redirect.py index 498fe150..0afccb77 100644 --- a/src/python/redirect.py +++ b/src/python/redirect.py @@ -24,6 +24,23 @@ def _dup( f ): yield fd os.close(fd) +@contextmanager +def save_stdout( src = sys.stdout ): + """ + Redirect + """ + fd = os.dup( src.fileno() ) + own = True + + try: + with os.fdopen( fd, "w", 0) as f: + own = False + yield f + except: + if own: + os.close(fd) + raise + @contextmanager def redirect(dst = null_file, src = sys.stdout): -- cgit v1.2.3