diff options
author | kaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk> | 2005-10-08 19:19:27 +0100 |
---|---|---|
committer | kaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk> | 2005-10-08 19:19:27 +0100 |
commit | 7429d6e0329522f4a7bc5e861e3edf08f44c2d5e (patch) | |
tree | d327d762b02dab236c59afd7554808634808d119 /tools/xenstore/xs.c | |
parent | 47f583a947813e2ee67cf6f8044a273f4b099661 (diff) | |
download | xen-7429d6e0329522f4a7bc5e861e3edf08f44c2d5e.tar.gz xen-7429d6e0329522f4a7bc5e861e3edf08f44c2d5e.tar.bz2 xen-7429d6e0329522f4a7bc5e861e3edf08f44c2d5e.zip |
Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel.
Signed-off-by: Keir Fraser <keir@xensource.com>
Diffstat (limited to 'tools/xenstore/xs.c')
-rw-r--r-- | tools/xenstore/xs.c | 372 |
1 files changed, 272 insertions, 100 deletions
diff --git a/tools/xenstore/xs.c b/tools/xenstore/xs.c index 1b405f18f4..6dc4c4532c 100644 --- a/tools/xenstore/xs.c +++ b/tools/xenstore/xs.c @@ -32,80 +32,151 @@ #include <stdint.h> #include <errno.h> #include <sys/ioctl.h> +#include <pthread.h> #include "xs.h" +#include "list.h" #include "utils.h" -struct xs_handle -{ +struct xs_stored_msg { + struct list_head list; + struct xsd_sockmsg hdr; + char *body; +}; + +struct xs_handle { + /* Communications channel to xenstore daemon. */ int fd; + + /* + * A read thread which pulls messages off the comms channel and + * signals waiters. + */ + pthread_t read_thr; + + /* + * A list of fired watch messages, protected by a mutex. Users can + * wait on the conditional variable until a watch is pending. + */ + struct list_head watch_list; + pthread_mutex_t watch_mutex; + pthread_cond_t watch_condvar; + + /* Clients can select() on this pipe to wait for a watch to fire. */ + int watch_pipe[2]; + + /* + * A list of replies. Currently only one will ever be outstanding + * because we serialise requests. The requester can wait on the + * conditional variable for its response. + */ + struct list_head reply_list; + pthread_mutex_t reply_mutex; + pthread_cond_t reply_condvar; + + /* One request at a time. */ + pthread_mutex_t request_mutex; + + /* One transaction at a time. */ + pthread_mutex_t transaction_mutex; }; -/* Get the socket from the store daemon handle. - */ +static void *read_thread(void *arg); + int xs_fileno(struct xs_handle *h) { - return h->fd; + char c = 0; + + pthread_mutex_lock(&h->watch_mutex); + + if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) { + /* Kick things off if the watch list is already non-empty. */ + if (!list_empty(&h->watch_list)) + while (write(h->watch_pipe[1], &c, 1) != 1) + continue; + } + + pthread_mutex_unlock(&h->watch_mutex); + + return h->watch_pipe[0]; } -static struct xs_handle *get_socket(const char *connect_to) +static int get_socket(const char *connect_to) { struct sockaddr_un addr; int sock, saved_errno; - struct xs_handle *h = NULL; sock = socket(PF_UNIX, SOCK_STREAM, 0); if (sock < 0) - return NULL; + return -1; addr.sun_family = AF_UNIX; strcpy(addr.sun_path, connect_to); - if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) { - h = malloc(sizeof(*h)); - if (h) { - h->fd = sock; - return h; - } + if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) { + saved_errno = errno; + close(sock); + errno = saved_errno; + return -1; } - saved_errno = errno; - close(sock); - errno = saved_errno; - return NULL; + return sock; } -static struct xs_handle *get_dev(const char *connect_to) +static int get_dev(const char *connect_to) { - int fd, saved_errno; - struct xs_handle *h; - - fd = open(connect_to, O_RDWR); - if (fd < 0) - return NULL; - - h = malloc(sizeof(*h)); - if (h) { - h->fd = fd; - return h; - } - - saved_errno = errno; - close(fd); - errno = saved_errno; - return NULL; + return open(connect_to, O_RDWR); } static struct xs_handle *get_handle(const char *connect_to) { struct stat buf; + struct xs_handle *h = NULL; + int fd = -1, saved_errno; if (stat(connect_to, &buf) != 0) - return NULL; + goto error; if (S_ISSOCK(buf.st_mode)) - return get_socket(connect_to); + fd = get_socket(connect_to); else - return get_dev(connect_to); + fd = get_dev(connect_to); + + if (fd == -1) + goto error; + + h = malloc(sizeof(*h)); + if (h == NULL) + goto error; + + h->fd = fd; + + /* Watch pipe is allocated on demand in xs_fileno(). */ + h->watch_pipe[0] = h->watch_pipe[1] = -1; + + INIT_LIST_HEAD(&h->watch_list); + pthread_mutex_init(&h->watch_mutex, NULL); + pthread_cond_init(&h->watch_condvar, NULL); + + INIT_LIST_HEAD(&h->reply_list); + pthread_mutex_init(&h->reply_mutex, NULL); + pthread_cond_init(&h->reply_condvar, NULL); + + pthread_mutex_init(&h->request_mutex, NULL); + pthread_mutex_init(&h->transaction_mutex, NULL); + + if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) + goto error; + + return h; + + error: + saved_errno = errno; + if (h != NULL) + free(h); + if (fd != -1) + close(fd); + errno = saved_errno; + return NULL; } struct xs_handle *xs_daemon_open(void) @@ -125,8 +196,39 @@ struct xs_handle *xs_domain_open(void) void xs_daemon_close(struct xs_handle *h) { - if (h->fd >= 0) - close(h->fd); + struct xs_stored_msg *msg, *tmsg; + + pthread_mutex_lock(&h->transaction_mutex); + pthread_mutex_lock(&h->request_mutex); + pthread_mutex_lock(&h->reply_mutex); + pthread_mutex_lock(&h->watch_mutex); + + /* XXX FIXME: May leak an unpublished message buffer. */ + pthread_cancel(h->read_thr); + pthread_join(h->read_thr, NULL); + + list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) { + free(msg->body); + free(msg); + } + + list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) { + free(msg->body); + free(msg); + } + + pthread_mutex_unlock(&h->transaction_mutex); + pthread_mutex_unlock(&h->request_mutex); + pthread_mutex_unlock(&h->reply_mutex); + pthread_mutex_unlock(&h->watch_mutex); + + if (h->watch_pipe[0] != -1) { + close(h->watch_pipe[0]); + close(h->watch_pipe[1]); + } + + close(h->fd); + free(h); } @@ -169,31 +271,28 @@ static int get_error(const char *errorstring) } /* Adds extra nul terminator, because we generally (always?) hold strings. */ -static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len) +static void *read_reply( + struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len) { - struct xsd_sockmsg msg; - void *ret; - int saved_errno; + struct xs_stored_msg *msg; + char *body; - if (!read_all(fd, &msg, sizeof(msg))) - return NULL; + pthread_mutex_lock(&h->reply_mutex); + while (list_empty(&h->reply_list)) + pthread_cond_wait(&h->reply_condvar, &h->reply_mutex); + msg = list_top(&h->reply_list, struct xs_stored_msg, list); + list_del(&msg->list); + assert(list_empty(&h->reply_list)); + pthread_mutex_unlock(&h->reply_mutex); - ret = malloc(msg.len + 1); - if (!ret) - return NULL; + *type = msg->hdr.type; + if (len) + *len = msg->hdr.len; + body = msg->body; - if (!read_all(fd, ret, msg.len)) { - saved_errno = errno; - free(ret); - errno = saved_errno; - return NULL; - } + free(msg); - *type = msg.type; - if (len) - *len = msg.len; - ((char *)ret)[msg.len] = '\0'; - return ret; + return body; } /* Send message to xs, get malloc'ed reply. NULL and set errno on error. */ @@ -217,6 +316,8 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type, ignorepipe.sa_flags = 0; sigaction(SIGPIPE, &ignorepipe, &oldact); + pthread_mutex_lock(&h->request_mutex); + if (!xs_write_all(h->fd, &msg, sizeof(msg))) goto fail; @@ -224,14 +325,11 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type, if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len)) goto fail; - /* Watches can have fired before reply comes: daemon detects - * and re-transmits, so we can ignore this. */ - do { - free(ret); - ret = read_reply(h->fd, &msg.type, len); - if (!ret) - goto fail; - } while (msg.type == XS_WATCH_EVENT); + ret = read_reply(h, &msg.type, len); + if (!ret) + goto fail; + + pthread_mutex_unlock(&h->request_mutex); sigaction(SIGPIPE, &oldact, NULL); if (msg.type == XS_ERROR) { @@ -252,6 +350,7 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type, fail: /* We're in a bad state, so close fd. */ saved_errno = errno; + pthread_mutex_unlock(&h->request_mutex); sigaction(SIGPIPE, &oldact, NULL); close_fd: close(h->fd); @@ -449,39 +548,45 @@ bool xs_watch(struct xs_handle *h, const char *path, const char *token) */ char **xs_read_watch(struct xs_handle *h, unsigned int *num) { - struct xsd_sockmsg msg; - char **ret; - char *strings; + struct xs_stored_msg *msg; + char **ret, *strings, c = 0; unsigned int num_strings, i; - if (!read_all(h->fd, &msg, sizeof(msg))) - return NULL; + pthread_mutex_lock(&h->watch_mutex); - assert(msg.type == XS_WATCH_EVENT); - strings = malloc(msg.len); - if (!strings) - return NULL; + /* Wait on the condition variable for a watch to fire. */ + while (list_empty(&h->watch_list)) + pthread_cond_wait(&h->watch_condvar, &h->watch_mutex); + msg = list_top(&h->watch_list, struct xs_stored_msg, list); + list_del(&msg->list); - if (!read_all(h->fd, strings, msg.len)) { - free_no_errno(strings); - return NULL; - } + /* Clear the pipe token if there are no more pending watches. */ + if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1)) + while (read(h->watch_pipe[0], &c, 1) != 1) + continue; - num_strings = xs_count_strings(strings, msg.len); + pthread_mutex_unlock(&h->watch_mutex); - ret = malloc(sizeof(char*) * num_strings + msg.len); + assert(msg->hdr.type == XS_WATCH_EVENT); + + strings = msg->body; + num_strings = xs_count_strings(strings, msg->hdr.len); + + ret = malloc(sizeof(char*) * num_strings + msg->hdr.len); if (!ret) { free_no_errno(strings); + free_no_errno(msg); return NULL; } ret[0] = (char *)(ret + num_strings); - memcpy(ret[0], strings, msg.len); + memcpy(ret[0], strings, msg->hdr.len); + free(strings); + free(msg); - for (i = 1; i < num_strings; i++) { + for (i = 1; i < num_strings; i++) ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1; - } *num = num_strings; @@ -519,6 +624,7 @@ bool xs_unwatch(struct xs_handle *h, const char *path, const char *token) */ bool xs_transaction_start(struct xs_handle *h) { + pthread_mutex_lock(&h->transaction_mutex); return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL)); } @@ -530,12 +636,18 @@ bool xs_transaction_start(struct xs_handle *h) bool xs_transaction_end(struct xs_handle *h, bool abort) { char abortstr[2]; + bool rc; if (abort) strcpy(abortstr, "F"); else strcpy(abortstr, "T"); - return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); + + rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL)); + + pthread_mutex_unlock(&h->transaction_mutex); + + return rc; } /* Introduce a new domain. @@ -584,18 +696,6 @@ char *xs_get_domain_path(struct xs_handle *h, domid_t domid) return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL); } -bool xs_shutdown(struct xs_handle *h) -{ - bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL)); - if (ret) { - char c; - /* Wait for it to actually shutdown. */ - while ((read(h->fd, &c, 1) < 0) && (errno == EINTR)) - continue; - } - return ret; -} - /* Only useful for DEBUG versions */ char *xs_debug_command(struct xs_handle *h, const char *cmd, void *data, unsigned int len) @@ -609,3 +709,75 @@ char *xs_debug_command(struct xs_handle *h, const char *cmd, return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL); } + +static void *read_thread(void *arg) +{ + struct xs_handle *h = arg; + struct xs_stored_msg *msg = NULL; + char *body = NULL; + + for (;;) { + msg = NULL; + body = NULL; + + /* Allocate message structure and read the message header. */ + msg = malloc(sizeof(*msg)); + if (msg == NULL) + goto error; + if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr))) + goto error; + + /* Allocate and read the message body. */ + body = msg->body = malloc(msg->hdr.len + 1); + if (body == NULL) + goto error; + if (!read_all(h->fd, body, msg->hdr.len)) + goto error; + body[msg->hdr.len] = '\0'; + + if (msg->hdr.type == XS_WATCH_EVENT) { + pthread_mutex_lock(&h->watch_mutex); + + /* Kick users out of their select() loop. */ + if (list_empty(&h->watch_list) && + (h->watch_pipe[1] != -1)) + while (write(h->watch_pipe[1], body, 1) != 1) + continue; + + list_add_tail(&msg->list, &h->watch_list); + pthread_cond_signal(&h->watch_condvar); + + pthread_mutex_unlock(&h->watch_mutex); + } else { + pthread_mutex_lock(&h->reply_mutex); + + /* There should only ever be one response pending! */ + if (!list_empty(&h->reply_list)) { + pthread_mutex_unlock(&h->reply_mutex); + goto error; + } + + list_add_tail(&msg->list, &h->reply_list); + pthread_cond_signal(&h->reply_condvar); + + pthread_mutex_unlock(&h->reply_mutex); + } + } + + error: + if (body != NULL) + free(body); + if (msg != NULL) + free(msg); + return NULL; +} + +/* + * Local variables: + * c-file-style: "linux" + * indent-tabs-mode: t + * c-indent-level: 8 + * c-basic-offset: 8 + * tab-width: 8 + * End: + */ |