From e6be7ddfe69f74c658d04f822724bfdcf39ee412 Mon Sep 17 00:00:00 2001
From: Baruch Sterin
Date: Wed, 30 Jan 2013 23:50:53 -0800
Subject: pyabc: allow returning large result from sub processes

---
 src/python/pyabc.i        | 22 +++++++++++++++
 src/python/pyabc_split.py | 72 +++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 83 insertions(+), 11 deletions(-)

(limited to 'src/python')

diff --git a/src/python/pyabc.i b/src/python/pyabc.i
index 0e1f2c3d..74c21cc6 100644
--- a/src/python/pyabc.i
+++ b/src/python/pyabc.i
@@ -755,6 +755,16 @@ def _retry_read(fd):
                 continue
             raise

+def _retry_os_read(fd):
+
+    while True:
+        try:
+            return os.read(fd, 1)
+        except OSError as e:
+            if e.errno == errno.EINTR:
+                continue
+            raise
+
 def _retry_wait():

     while True:
@@ -817,6 +827,7 @@ def _child_wait_thread_func(fd):

                 _active_pids.remove(pid)
                 _terminated_pids[pid] = status
+                os.write(_wait_fd_write, "1")
                 _terminated_pids_cond.notifyAll()

 _sigint_pipe_read_fd = -1
@@ -825,8 +836,14 @@ _sigint_pipe_write_fd = -1
 _sigchld_pipe_read_fd = -1
 _sigchld_pipe_write_fd = -1

+wait_fd = -1
+_wait_fd_write = -1
+
 def _start_threads():

+    global wait_fd, _wait_fd_write
+    wait_fd, _wait_fd_write = os.pipe()
+
     global _sigint_pipe_read_fd, _sigint_pipe_write_fd
     _sigint_pipe_read_fd, _sigint_pipe_write_fd = os.pipe()

@@ -865,6 +882,9 @@ def after_fork():

     _close_on_fork = []

+    os.close(wait_fd)
+    os.close(_wait_fd_write)
+
     os.close(_sigint_pipe_read_fd)
     os.close(_sigint_pipe_write_fd)

@@ -934,6 +954,7 @@ def _waitpid(pid, options=0):
         with _active_lock:

             if pid in _terminated_pids:
+                _retry_os_read(wait_fd)
                 status = _terminated_pids[pid]
                 del _terminated_pids[pid]
                 return pid, status
@@ -950,6 +971,7 @@ def _wait(options=0):
         with _active_lock:

             for pid, status in _terminated_pids.iteritems():
+                _retry_os_read(wait_fd)
                 del _terminated_pids[pid]
                 return pid, status

diff --git a/src/python/pyabc_split.py b/src/python/pyabc_split.py
index b889d857..bbdcb249 100644
--- a/src/python/pyabc_split.py
+++ b/src/python/pyabc_split.py
@@ -82,20 +82,36 @@ Author: Baruch Sterin
 """

 import os
+import select
+import fcntl
 import errno
 import sys
-import pickle
+import cPickle as pickle
 import signal
+import cStringIO
+
 from contextlib import contextmanager

 import pyabc

+def _retry_select(rlist):
+    while True:
+        try:
+            rrdy,_,_ = select.select(rlist,[],[])
+            if rrdy:
+                return rrdy
+        except select.error as e:
+            if e[0] == errno.EINTR:
+                continue
+            raise
+
 class _splitter(object):

     def __init__(self, funcs):
         self.funcs = funcs
         self.pids = []
         self.fds = {}
+        self.buffers = {}
         self.results = {}

     def is_done(self):
@@ -117,6 +133,7 @@ class _splitter(object):
             self.results[pid] = None

         self.fds = {}
+        self.buffers = {}

     def child( self, fdw, f):
         # call function
@@ -133,6 +150,9 @@ class _splitter(object):
         # create a pipe to communicate with the child process
         pr,pw = os.pipe()

+        # set pr to be non-blocking
+        fcntl.fcntl(pr, fcntl.F_SETFL, os.O_NONBLOCK)
+
         parentpid = os.getpid()
         rc = 1

@@ -162,25 +182,55 @@ class _splitter(object):
             pid, fd = self.fork_one(f)
             self.pids.append(pid)
             self.fds[pid] = (i,fd)
+            self.buffers[fd] = cStringIO.StringIO()
+
+    def communicate(self):
+
+        rlist = [ fd for _, (_,fd) in self.fds.iteritems() ]
+        rlist.append(pyabc.wait_fd)
+
+        stop = False
+
+        while not stop:
+
+            rrdy = _retry_select( rlist )
+
+            for fd in rrdy:
+
+                if fd == pyabc.wait_fd:
+                    stop = True
+                    continue
+
+                self.buffers[fd].write( os.read(fd, 16384) )

     def get_next_result(self):
+
+        # read from the pipes as needed, while waiting for the next child process to terminate
+        self.communicate()

         # wait for the next child process to terminate
         pid, rc = os.wait()
         assert pid in self.fds

-        # retrieve the pipe file descriptor1
+        # retrieve the pipe file descriptor
         i, fd = self.fds[pid]
         del self.fds[pid]
-
-        assert pid not in self.fds
-
-        # read result from file
-        with os.fdopen( fd, "r" ) as fin:
-            try:
-                return (i,pickle.load(fin))
-            except EOFError, pickle.UnpicklingError:
-                return (i, None)
+
+        # retrieve the buffer
+        buffer = self.buffers[fd]
+        del self.buffers[fd]
+
+        # fill the buffer
+        while True:
+            s = os.read(fd, 16384)
+            if not s:
+                break
+            buffer.write(s)
+
+        try:
+            return (i, pickle.loads(buffer.getvalue()))
+        except EOFError, pickle.UnpicklingError:
+            return (i, None)

 @contextmanager
 def _splitter_wrapper(funcs):
--
cgit v1.2.3