diff options
author | Baruch Sterin <baruchs@gmail.com> | 2013-01-30 23:50:53 -0800 |
---|---|---|
committer | Baruch Sterin <baruchs@gmail.com> | 2013-01-30 23:50:53 -0800 |
commit | e6be7ddfe69f74c658d04f822724bfdcf39ee412 (patch) | |
tree | 325a8cce60e6b90cb118651d098e66ad96f19fb5 | |
parent | e9286513fd615fac7f67bc1e48e5fe26b9684086 (diff) | |
download | abc-e6be7ddfe69f74c658d04f822724bfdcf39ee412.tar.gz abc-e6be7ddfe69f74c658d04f822724bfdcf39ee412.tar.bz2 abc-e6be7ddfe69f74c658d04f822724bfdcf39ee412.zip |
pyabc: allow returning large result from sub processes
-rw-r--r-- | src/python/pyabc.i | 22 |
-rw-r--r-- | src/python/pyabc_split.py | 72 |
2 files changed, 83 insertions, 11 deletions
diff --git a/src/python/pyabc.i b/src/python/pyabc.i index 0e1f2c3d..74c21cc6 100644 --- a/src/python/pyabc.i +++ b/src/python/pyabc.i @@ -755,6 +755,16 @@ def _retry_read(fd): continue raise +def _retry_os_read(fd): + + while True: + try: + return os.read(fd, 1) + except OSError as e: + if e.errno == errno.EINTR: + continue + raise + def _retry_wait(): while True: @@ -817,6 +827,7 @@ def _child_wait_thread_func(fd): _active_pids.remove(pid) _terminated_pids[pid] = status + os.write(_wait_fd_write, "1") _terminated_pids_cond.notifyAll() _sigint_pipe_read_fd = -1 @@ -825,8 +836,14 @@ _sigint_pipe_write_fd = -1 _sigchld_pipe_read_fd = -1 _sigchld_pipe_write_fd = -1 +wait_fd = -1 +_wait_fd_write = -1 + def _start_threads(): + global wait_fd, _wait_fd_write + wait_fd, _wait_fd_write = os.pipe() + global _sigint_pipe_read_fd, _sigint_pipe_write_fd _sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe() @@ -865,6 +882,9 @@ def after_fork(): _close_on_fork = [] + os.close(wait_fd) + os.close(_wait_fd_write) + os.close(_sigint_pipe_read_fd) os.close(_sigint_pipe_write_fd) @@ -934,6 +954,7 @@ def _waitpid(pid, options=0): with _active_lock: if pid in _terminated_pids: + _retry_os_read(wait_fd) status = _terminated_pids[pid] del _terminated_pids[pid] return pid, status @@ -950,6 +971,7 @@ def _wait(options=0): with _active_lock: for pid, status in _terminated_pids.iteritems(): + _retry_os_read(wait_fd) del _terminated_pids[pid] return pid, status diff --git a/src/python/pyabc_split.py b/src/python/pyabc_split.py index b889d857..bbdcb249 100644 --- a/src/python/pyabc_split.py +++ b/src/python/pyabc_split.py @@ -82,20 +82,36 @@ Author: Baruch Sterin <sterin@berkeley.edu> """ import os +import select +import fcntl import errno import sys -import pickle +import cPickle as pickle import signal +import cStringIO + from contextlib import contextmanager import pyabc +def _retry_select(rlist): + while True: + try: + rrdy,_,_ = select.select(rlist,[],[]) + if rrdy: + return rrdy + 
except select.error as e: + if e[0] == errno.EINTR: + continue + raise + class _splitter(object): def __init__(self, funcs): self.funcs = funcs self.pids = [] self.fds = {} + self.buffers = {} self.results = {} def is_done(self): @@ -117,6 +133,7 @@ class _splitter(object): self.results[pid] = None self.fds = {} + self.buffers = {} def child( self, fdw, f): # call function @@ -133,6 +150,9 @@ class _splitter(object): # create a pipe to communicate with the child process pr,pw = os.pipe() + # set pr to be non-blocking + fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK) + parentpid = os.getpid() rc = 1 @@ -162,25 +182,55 @@ class _splitter(object): pid, fd = self.fork_one(f) self.pids.append(pid) self.fds[pid] = (i,fd) + self.buffers[fd] = cStringIO.StringIO() + + def communicate(self): + + rlist = [ fd for _, (_,fd) in self.fds.iteritems() ] + rlist.append(pyabc.wait_fd) + + stop = False + + while not stop: + + rrdy = _retry_select( rlist ) + + for fd in rrdy: + + if fd == pyabc.wait_fd: + stop = True + continue + + self.buffers[fd].write( os.read(fd, 16384) ) def get_next_result(self): + + # read from the pipes as needed, while waiting for the next child process to terminate + self.communicate() # wait for the next child process to terminate pid, rc = os.wait() assert pid in self.fds - # retrieve the pipe file descriptor1 + # retrieve the pipe file descriptor i, fd = self.fds[pid] del self.fds[pid] - - assert pid not in self.fds - - # read result from file - with os.fdopen( fd, "r" ) as fin: - try: - return (i,pickle.load(fin)) - except EOFError, pickle.UnpicklingError: - return (i, None) + + # retrieve the buffer + buffer = self.buffers[fd] + del self.buffers[fd] + + # fill the buffer + while True: + s = os.read(fd, 16384) + if not s: + break + buffer.write(s) + + try: + return (i, pickle.loads(buffer.getvalue())) + except EOFError, pickle.UnpicklingError: + return (i, None) @contextmanager def _splitter_wrapper(funcs): |