aboutsummaryrefslogtreecommitdiffstats
path: root/tools/libxl/libxl_event.c
diff options
context:
space:
mode:
authorIan Jackson <ian.jackson@eu.citrix.com>2012-01-27 17:01:23 +0000
committerIan Jackson <ian.jackson@eu.citrix.com>2012-01-27 17:01:23 +0000
commit62a394f4eb7c8b4f5e98eaed30c4c522d8249162 (patch)
tree52729346b063c9a438bd48224873e76e5652c3ae /tools/libxl/libxl_event.c
parent705fdc03cd7d78643c4dece8564d0bba04eb2c0f (diff)
downloadxen-62a394f4eb7c8b4f5e98eaed30c4c522d8249162.tar.gz
xen-62a394f4eb7c8b4f5e98eaed30c4c522d8249162.tar.bz2
xen-62a394f4eb7c8b4f5e98eaed30c4c522d8249162.zip
libxl: Permit multithreaded event waiting
Previously, the context would be locked whenever we were waiting in libxl's own call to poll (waiting for operating system events). This would mean that multiple simultaneous calls to libxl_event_wait in different threads with different parameters would not work properly. If we simply unlock the context, it would be possible for another thread to discover the occurrence of the event we were waiting for, without us even waking up, and we would remain in poll. So we need a way to wake up other threads: a pipe, one for each thread in poll. We also need to move some variables from globals in the ctx to be per-polling-thread. Signed-off-by: Ian Jackson <ian.jackson@eu.citrix.com> Acked-by: Ian Campbell <ian.campbell@citrix.com> Committed-by: Ian Jackson <Ian.Jackson@eu.citrix.com>
Diffstat (limited to 'tools/libxl/libxl_event.c')
-rw-r--r--tools/libxl/libxl_event.c196
1 files changed, 158 insertions, 38 deletions
diff --git a/tools/libxl/libxl_event.c b/tools/libxl/libxl_event.c
index 69ad318a2d..73dfd9d90a 100644
--- a/tools/libxl/libxl_event.c
+++ b/tools/libxl/libxl_event.c
@@ -510,9 +510,9 @@ void libxl__ev_xswatch_deregister(libxl__gc *gc, libxl__ev_xswatch *w)
* osevent poll
*/
-static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
- struct pollfd *fds, int *timeout_upd,
- struct timeval now)
+static int beforepoll_internal(libxl__gc *gc, libxl__poller *poller,
+ int *nfds_io, struct pollfd *fds,
+ int *timeout_upd, struct timeval now)
{
libxl__ev_fd *efd;
int rc;
@@ -534,7 +534,7 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
* not to mess with fd_rindex.
*/
- int maxfd = 0;
+ int maxfd = poller->wakeup_pipe[0] + 1;
LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
if (!efd->events)
continue;
@@ -542,30 +542,39 @@ static int beforepoll_internal(libxl__gc *gc, int *nfds_io,
maxfd = efd->fd + 1;
}
/* make sure our array is as big as *nfds_io */
- if (CTX->fd_rindex_allocd < maxfd) {
+ if (poller->fd_rindex_allocd < maxfd) {
assert(maxfd < INT_MAX / sizeof(int) / 2);
- int *newarray = realloc(CTX->fd_rindex, sizeof(int) * maxfd);
+ int *newarray = realloc(poller->fd_rindex, sizeof(int) * maxfd);
if (!newarray) { rc = ERROR_NOMEM; goto out; }
- memset(newarray + CTX->fd_rindex_allocd, 0,
- sizeof(int) * (maxfd - CTX->fd_rindex_allocd));
- CTX->fd_rindex = newarray;
- CTX->fd_rindex_allocd = maxfd;
+ memset(newarray + poller->fd_rindex_allocd, 0,
+ sizeof(int) * (maxfd - poller->fd_rindex_allocd));
+ poller->fd_rindex = newarray;
+ poller->fd_rindex_allocd = maxfd;
}
}
int used = 0;
- LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
- if (!efd->events)
- continue;
- if (used < *nfds_io) {
- fds[used].fd = efd->fd;
- fds[used].events = efd->events;
- fds[used].revents = 0;
- assert(efd->fd < CTX->fd_rindex_allocd);
- CTX->fd_rindex[efd->fd] = used;
- }
- used++;
- }
+
+#define REQUIRE_FD(req_fd, req_events, efd) do{ \
+ if ((req_events)) { \
+ if (used < *nfds_io) { \
+ fds[used].fd = (req_fd); \
+ fds[used].events = (req_events); \
+ fds[used].revents = 0; \
+ assert((req_fd) < poller->fd_rindex_allocd); \
+ poller->fd_rindex[(req_fd)] = used; \
+ } \
+ used++; \
+ } \
+ }while(0)
+
+ LIBXL_LIST_FOREACH(efd, &CTX->efds, entry)
+ REQUIRE_FD(efd->fd, efd->events, efd);
+
+ REQUIRE_FD(poller->wakeup_pipe[0], POLLIN, 0);
+
+#undef REQUIRE_FD
+
rc = used <= *nfds_io ? 0 : ERROR_BUFFERFULL;
*nfds_io = used;
@@ -599,22 +608,23 @@ int libxl_osevent_beforepoll(libxl_ctx *ctx, int *nfds_io,
{
EGC_INIT(ctx);
CTX_LOCK;
- int rc = beforepoll_internal(gc, nfds_io, fds, timeout_upd, now);
+ int rc = beforepoll_internal(gc, &ctx->poller_app,
+ nfds_io, fds, timeout_upd, now);
CTX_UNLOCK;
EGC_FREE;
return rc;
}
-static int afterpoll_check_fd(libxl_ctx *ctx,
+static int afterpoll_check_fd(libxl__poller *poller,
const struct pollfd *fds, int nfds,
int fd, int events)
/* returns mask of events which were requested and occurred */
{
- if (fd >= ctx->fd_rindex_allocd)
+ if (fd >= poller->fd_rindex_allocd)
/* added after we went into poll, have to try again */
return 0;
- int slot = ctx->fd_rindex[fd];
+ int slot = poller->fd_rindex[fd];
if (slot >= nfds)
/* stale slot entry; again, added afterwards */
@@ -630,22 +640,31 @@ static int afterpoll_check_fd(libxl_ctx *ctx,
return revents;
}
-static void afterpoll_internal(libxl__egc *egc,
+static void afterpoll_internal(libxl__egc *egc, libxl__poller *poller,
int nfds, const struct pollfd *fds,
struct timeval now)
{
EGC_GC;
libxl__ev_fd *efd;
+
LIBXL_LIST_FOREACH(efd, &CTX->efds, entry) {
if (!efd->events)
continue;
- int revents = afterpoll_check_fd(CTX,fds,nfds, efd->fd,efd->events);
+ int revents = afterpoll_check_fd(poller,fds,nfds, efd->fd,efd->events);
if (revents)
efd->func(egc, efd, efd->fd, efd->events, revents);
}
+ if (afterpoll_check_fd(poller,fds,nfds, poller->wakeup_pipe[0],POLLIN)) {
+ char buf[256];
+ int r = read(poller->wakeup_pipe[0], buf, sizeof(buf));
+ if (r < 0)
+ if (errno != EINTR && errno != EWOULDBLOCK)
+ LIBXL__EVENT_DISASTER(egc, "read wakeup", errno, 0);
+ }
+
for (;;) {
libxl__ev_time *etime = LIBXL_TAILQ_FIRST(&CTX->etimes);
if (!etime)
@@ -667,7 +686,7 @@ void libxl_osevent_afterpoll(libxl_ctx *ctx, int nfds, const struct pollfd *fds,
{
EGC_INIT(ctx);
CTX_LOCK;
- afterpoll_internal(egc, nfds, fds, now);
+ afterpoll_internal(egc, &ctx->poller_app, nfds, fds, now);
CTX_UNLOCK;
EGC_FREE;
}
@@ -790,7 +809,10 @@ void libxl__event_occurred(libxl__egc *egc, libxl_event *event)
LIBXL_TAILQ_INSERT_TAIL(&egc->occurred_for_callback, event, link);
return;
} else {
+ libxl__poller *poller;
LIBXL_TAILQ_INSERT_TAIL(&CTX->occurred, event, link);
+ LIBXL_LIST_FOREACH(poller, &CTX->pollers_event, entry)
+ libxl__poller_wakeup(egc, poller);
}
}
@@ -858,7 +880,94 @@ int libxl_event_check(libxl_ctx *ctx, libxl_event **event_r,
return rc;
}
-static int eventloop_iteration(libxl__egc *egc) {
+/*
+ * Manipulation of pollers
+ */
+
+int libxl__poller_init(libxl_ctx *ctx, libxl__poller *p)
+{
+ int r, rc;
+ p->fd_polls = 0;
+ p->fd_rindex = 0;
+
+ r = pipe(p->wakeup_pipe);
+ if (r) {
+ LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot create poller pipe");
+ rc = ERROR_FAIL;
+ goto out;
+ }
+
+ rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[0], 1);
+ if (rc) goto out;
+
+ rc = libxl_fd_set_nonblock(ctx, p->wakeup_pipe[1], 1);
+ if (rc) goto out;
+
+ return 0;
+
+ out:
+ libxl__poller_dispose(p);
+ return rc;
+}
+
+void libxl__poller_dispose(libxl__poller *p)
+{
+ if (p->wakeup_pipe[1] > 0) close(p->wakeup_pipe[1]);
+ if (p->wakeup_pipe[0] > 0) close(p->wakeup_pipe[0]);
+ free(p->fd_polls);
+ free(p->fd_rindex);
+}
+
+libxl__poller *libxl__poller_get(libxl_ctx *ctx)
+{
+ /* must be called with ctx locked */
+ int rc;
+
+ libxl__poller *p = LIBXL_LIST_FIRST(&ctx->pollers_idle);
+ if (p)
+ return p;
+
+ p = malloc(sizeof(*p));
+ if (!p) {
+ LIBXL__LOG_ERRNO(ctx, LIBXL__LOG_ERROR, "cannot allocate poller");
+ return 0;
+ }
+ memset(p, 0, sizeof(*p));
+
+ rc = libxl__poller_init(ctx, p);
+ if (rc) return NULL;
+
+ return p;
+}
+
+void libxl__poller_put(libxl_ctx *ctx, libxl__poller *p)
+{
+ LIBXL_LIST_INSERT_HEAD(&ctx->pollers_idle, p, entry);
+}
+
+void libxl__poller_wakeup(libxl__egc *egc, libxl__poller *p)
+{
+ static const char buf[1] = "";
+
+ for (;;) {
+ int r = write(p->wakeup_pipe[1], buf, 1);
+ if (r==1) return;
+ assert(r==-1);
+ if (errno == EINTR) continue;
+ if (errno == EWOULDBLOCK) return;
+ LIBXL__EVENT_DISASTER(egc, "cannot poke watch pipe", errno, 0);
+ return;
+ }
+}
+
+/*
+ * Main event loop iteration
+ */
+
+static int eventloop_iteration(libxl__egc *egc, libxl__poller *poller) {
+ /* The CTX must be locked EXACTLY ONCE so that this function
+ * can unlock it when it polls.
+ */
EGC_GC;
int rc;
struct timeval now;
@@ -871,23 +980,27 @@ static int eventloop_iteration(libxl__egc *egc) {
int timeout;
for (;;) {
- int nfds = CTX->fd_polls_allocd;
+ int nfds = poller->fd_polls_allocd;
timeout = -1;
- rc = beforepoll_internal(gc, &nfds, CTX->fd_polls, &timeout, now);
+ rc = beforepoll_internal(gc, poller, &nfds, poller->fd_polls,
+ &timeout, now);
if (!rc) break;
if (rc != ERROR_BUFFERFULL) goto out;
struct pollfd *newarray =
(nfds > INT_MAX / sizeof(struct pollfd) / 2) ? 0 :
- realloc(CTX->fd_polls, sizeof(*newarray) * nfds);
+ realloc(poller->fd_polls, sizeof(*newarray) * nfds);
if (!newarray) { rc = ERROR_NOMEM; goto out; }
- CTX->fd_polls = newarray;
- CTX->fd_polls_allocd = nfds;
+ poller->fd_polls = newarray;
+ poller->fd_polls_allocd = nfds;
}
- rc = poll(CTX->fd_polls, CTX->fd_polls_allocd, timeout);
+ CTX_UNLOCK;
+ rc = poll(poller->fd_polls, poller->fd_polls_allocd, timeout);
+ CTX_LOCK;
+
if (rc < 0) {
if (errno == EINTR)
return 0; /* will go round again if caller requires */
@@ -900,7 +1013,8 @@ static int eventloop_iteration(libxl__egc *egc) {
rc = libxl__gettimeofday(gc, &now);
if (rc) goto out;
- afterpoll_internal(egc, CTX->fd_polls_allocd, CTX->fd_polls, now);
+ afterpoll_internal(egc, poller,
+ poller->fd_polls_allocd, poller->fd_polls, now);
CTX_UNLOCK;
@@ -914,15 +1028,19 @@ int libxl_event_wait(libxl_ctx *ctx, libxl_event **event_r,
libxl_event_predicate *pred, void *pred_user)
{
int rc;
+ libxl__poller *poller = NULL;
EGC_INIT(ctx);
CTX_LOCK;
+ poller = libxl__poller_get(ctx);
+ if (!poller) { rc = ERROR_FAIL; goto out; }
+
for (;;) {
rc = event_check_internal(egc, event_r, typemask, pred, pred_user);
if (rc != ERROR_NOT_READY) goto out;
- rc = eventloop_iteration(egc);
+ rc = eventloop_iteration(egc, poller);
if (rc) goto out;
/* we unlock and cleanup the egc each time we go through this loop,
@@ -936,6 +1054,8 @@ int libxl_event_wait(libxl_ctx *ctx, libxl_event **event_r,
}
out:
+ libxl__poller_put(ctx, poller);
+
CTX_UNLOCK;
EGC_FREE;
return rc;