aboutsummaryrefslogtreecommitdiffstats
path: root/tools/xenstore/xs.c
diff options
context:
space:
mode:
authorkaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk>2005-10-08 19:19:27 +0100
committerkaf24@firebug.cl.cam.ac.uk <kaf24@firebug.cl.cam.ac.uk>2005-10-08 19:19:27 +0100
commit7429d6e0329522f4a7bc5e861e3edf08f44c2d5e (patch)
treed327d762b02dab236c59afd7554808634808d119 /tools/xenstore/xs.c
parent47f583a947813e2ee67cf6f8044a273f4b099661 (diff)
downloadxen-7429d6e0329522f4a7bc5e861e3edf08f44c2d5e.tar.gz
xen-7429d6e0329522f4a7bc5e861e3edf08f44c2d5e.tar.bz2
xen-7429d6e0329522f4a7bc5e861e3edf08f44c2d5e.zip
Make libxenstore thread-safe. It also spawns an internal
thread to read messages from the comms channel. Signed-off-by: Keir Fraser <keir@xensource.com>
Diffstat (limited to 'tools/xenstore/xs.c')
-rw-r--r--tools/xenstore/xs.c372
1 files changed, 272 insertions, 100 deletions
diff --git a/tools/xenstore/xs.c b/tools/xenstore/xs.c
index 1b405f18f4..6dc4c4532c 100644
--- a/tools/xenstore/xs.c
+++ b/tools/xenstore/xs.c
@@ -32,80 +32,151 @@
#include <stdint.h>
#include <errno.h>
#include <sys/ioctl.h>
+#include <pthread.h>
#include "xs.h"
+#include "list.h"
#include "utils.h"
-struct xs_handle
-{
+struct xs_stored_msg {
+ struct list_head list;
+ struct xsd_sockmsg hdr;
+ char *body;
+};
+
+struct xs_handle {
+ /* Communications channel to xenstore daemon. */
int fd;
+
+ /*
+ * A read thread which pulls messages off the comms channel and
+ * signals waiters.
+ */
+ pthread_t read_thr;
+
+ /*
+ * A list of fired watch messages, protected by a mutex. Users can
+ * wait on the conditional variable until a watch is pending.
+ */
+ struct list_head watch_list;
+ pthread_mutex_t watch_mutex;
+ pthread_cond_t watch_condvar;
+
+ /* Clients can select() on this pipe to wait for a watch to fire. */
+ int watch_pipe[2];
+
+ /*
+ * A list of replies. Currently only one will ever be outstanding
+ * because we serialise requests. The requester can wait on the
+ * conditional variable for its response.
+ */
+ struct list_head reply_list;
+ pthread_mutex_t reply_mutex;
+ pthread_cond_t reply_condvar;
+
+ /* One request at a time. */
+ pthread_mutex_t request_mutex;
+
+ /* One transaction at a time. */
+ pthread_mutex_t transaction_mutex;
};
-/* Get the socket from the store daemon handle.
- */
+static void *read_thread(void *arg);
+
int xs_fileno(struct xs_handle *h)
{
- return h->fd;
+ char c = 0;
+
+ pthread_mutex_lock(&h->watch_mutex);
+
+ if ((h->watch_pipe[0] == -1) && (pipe(h->watch_pipe) != -1)) {
+ /* Kick things off if the watch list is already non-empty. */
+ if (!list_empty(&h->watch_list))
+ while (write(h->watch_pipe[1], &c, 1) != 1)
+ continue;
+ }
+
+ pthread_mutex_unlock(&h->watch_mutex);
+
+ return h->watch_pipe[0];
}
-static struct xs_handle *get_socket(const char *connect_to)
+static int get_socket(const char *connect_to)
{
struct sockaddr_un addr;
int sock, saved_errno;
- struct xs_handle *h = NULL;
sock = socket(PF_UNIX, SOCK_STREAM, 0);
if (sock < 0)
- return NULL;
+ return -1;
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, connect_to);
- if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) == 0) {
- h = malloc(sizeof(*h));
- if (h) {
- h->fd = sock;
- return h;
- }
+ if (connect(sock, (struct sockaddr *)&addr, sizeof(addr)) != 0) {
+ saved_errno = errno;
+ close(sock);
+ errno = saved_errno;
+ return -1;
}
- saved_errno = errno;
- close(sock);
- errno = saved_errno;
- return NULL;
+ return sock;
}
-static struct xs_handle *get_dev(const char *connect_to)
+static int get_dev(const char *connect_to)
{
- int fd, saved_errno;
- struct xs_handle *h;
-
- fd = open(connect_to, O_RDWR);
- if (fd < 0)
- return NULL;
-
- h = malloc(sizeof(*h));
- if (h) {
- h->fd = fd;
- return h;
- }
-
- saved_errno = errno;
- close(fd);
- errno = saved_errno;
- return NULL;
+ return open(connect_to, O_RDWR);
}
static struct xs_handle *get_handle(const char *connect_to)
{
struct stat buf;
+ struct xs_handle *h = NULL;
+ int fd = -1, saved_errno;
if (stat(connect_to, &buf) != 0)
- return NULL;
+ goto error;
if (S_ISSOCK(buf.st_mode))
- return get_socket(connect_to);
+ fd = get_socket(connect_to);
else
- return get_dev(connect_to);
+ fd = get_dev(connect_to);
+
+ if (fd == -1)
+ goto error;
+
+ h = malloc(sizeof(*h));
+ if (h == NULL)
+ goto error;
+
+ h->fd = fd;
+
+ /* Watch pipe is allocated on demand in xs_fileno(). */
+ h->watch_pipe[0] = h->watch_pipe[1] = -1;
+
+ INIT_LIST_HEAD(&h->watch_list);
+ pthread_mutex_init(&h->watch_mutex, NULL);
+ pthread_cond_init(&h->watch_condvar, NULL);
+
+ INIT_LIST_HEAD(&h->reply_list);
+ pthread_mutex_init(&h->reply_mutex, NULL);
+ pthread_cond_init(&h->reply_condvar, NULL);
+
+ pthread_mutex_init(&h->request_mutex, NULL);
+ pthread_mutex_init(&h->transaction_mutex, NULL);
+
+ if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
+ goto error;
+
+ return h;
+
+ error:
+ saved_errno = errno;
+ if (h != NULL)
+ free(h);
+ if (fd != -1)
+ close(fd);
+ errno = saved_errno;
+ return NULL;
}
struct xs_handle *xs_daemon_open(void)
@@ -125,8 +196,39 @@ struct xs_handle *xs_domain_open(void)
void xs_daemon_close(struct xs_handle *h)
{
- if (h->fd >= 0)
- close(h->fd);
+ struct xs_stored_msg *msg, *tmsg;
+
+ pthread_mutex_lock(&h->transaction_mutex);
+ pthread_mutex_lock(&h->request_mutex);
+ pthread_mutex_lock(&h->reply_mutex);
+ pthread_mutex_lock(&h->watch_mutex);
+
+ /* XXX FIXME: May leak an unpublished message buffer. */
+ pthread_cancel(h->read_thr);
+ pthread_join(h->read_thr, NULL);
+
+ list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
+ free(msg->body);
+ free(msg);
+ }
+
+ list_for_each_entry_safe(msg, tmsg, &h->watch_list, list) {
+ free(msg->body);
+ free(msg);
+ }
+
+ pthread_mutex_unlock(&h->transaction_mutex);
+ pthread_mutex_unlock(&h->request_mutex);
+ pthread_mutex_unlock(&h->reply_mutex);
+ pthread_mutex_unlock(&h->watch_mutex);
+
+ if (h->watch_pipe[0] != -1) {
+ close(h->watch_pipe[0]);
+ close(h->watch_pipe[1]);
+ }
+
+ close(h->fd);
+
free(h);
}
@@ -169,31 +271,28 @@ static int get_error(const char *errorstring)
}
/* Adds extra nul terminator, because we generally (always?) hold strings. */
-static void *read_reply(int fd, enum xsd_sockmsg_type *type, unsigned int *len)
+static void *read_reply(
+ struct xs_handle *h, enum xsd_sockmsg_type *type, unsigned int *len)
{
- struct xsd_sockmsg msg;
- void *ret;
- int saved_errno;
+ struct xs_stored_msg *msg;
+ char *body;
- if (!read_all(fd, &msg, sizeof(msg)))
- return NULL;
+ pthread_mutex_lock(&h->reply_mutex);
+ while (list_empty(&h->reply_list))
+ pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
+ msg = list_top(&h->reply_list, struct xs_stored_msg, list);
+ list_del(&msg->list);
+ assert(list_empty(&h->reply_list));
+ pthread_mutex_unlock(&h->reply_mutex);
- ret = malloc(msg.len + 1);
- if (!ret)
- return NULL;
+ *type = msg->hdr.type;
+ if (len)
+ *len = msg->hdr.len;
+ body = msg->body;
- if (!read_all(fd, ret, msg.len)) {
- saved_errno = errno;
- free(ret);
- errno = saved_errno;
- return NULL;
- }
+ free(msg);
- *type = msg.type;
- if (len)
- *len = msg.len;
- ((char *)ret)[msg.len] = '\0';
- return ret;
+ return body;
}
/* Send message to xs, get malloc'ed reply. NULL and set errno on error. */
@@ -217,6 +316,8 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
ignorepipe.sa_flags = 0;
sigaction(SIGPIPE, &ignorepipe, &oldact);
+ pthread_mutex_lock(&h->request_mutex);
+
if (!xs_write_all(h->fd, &msg, sizeof(msg)))
goto fail;
@@ -224,14 +325,11 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
if (!xs_write_all(h->fd, iovec[i].iov_base, iovec[i].iov_len))
goto fail;
- /* Watches can have fired before reply comes: daemon detects
- * and re-transmits, so we can ignore this. */
- do {
- free(ret);
- ret = read_reply(h->fd, &msg.type, len);
- if (!ret)
- goto fail;
- } while (msg.type == XS_WATCH_EVENT);
+ ret = read_reply(h, &msg.type, len);
+ if (!ret)
+ goto fail;
+
+ pthread_mutex_unlock(&h->request_mutex);
sigaction(SIGPIPE, &oldact, NULL);
if (msg.type == XS_ERROR) {
@@ -252,6 +350,7 @@ static void *xs_talkv(struct xs_handle *h, enum xsd_sockmsg_type type,
fail:
/* We're in a bad state, so close fd. */
saved_errno = errno;
+ pthread_mutex_unlock(&h->request_mutex);
sigaction(SIGPIPE, &oldact, NULL);
close_fd:
close(h->fd);
@@ -449,39 +548,45 @@ bool xs_watch(struct xs_handle *h, const char *path, const char *token)
*/
char **xs_read_watch(struct xs_handle *h, unsigned int *num)
{
- struct xsd_sockmsg msg;
- char **ret;
- char *strings;
+ struct xs_stored_msg *msg;
+ char **ret, *strings, c = 0;
unsigned int num_strings, i;
- if (!read_all(h->fd, &msg, sizeof(msg)))
- return NULL;
+ pthread_mutex_lock(&h->watch_mutex);
- assert(msg.type == XS_WATCH_EVENT);
- strings = malloc(msg.len);
- if (!strings)
- return NULL;
+ /* Wait on the condition variable for a watch to fire. */
+ while (list_empty(&h->watch_list))
+ pthread_cond_wait(&h->watch_condvar, &h->watch_mutex);
+ msg = list_top(&h->watch_list, struct xs_stored_msg, list);
+ list_del(&msg->list);
- if (!read_all(h->fd, strings, msg.len)) {
- free_no_errno(strings);
- return NULL;
- }
+ /* Clear the pipe token if there are no more pending watches. */
+ if (list_empty(&h->watch_list) && (h->watch_pipe[0] != -1))
+ while (read(h->watch_pipe[0], &c, 1) != 1)
+ continue;
- num_strings = xs_count_strings(strings, msg.len);
+ pthread_mutex_unlock(&h->watch_mutex);
- ret = malloc(sizeof(char*) * num_strings + msg.len);
+ assert(msg->hdr.type == XS_WATCH_EVENT);
+
+ strings = msg->body;
+ num_strings = xs_count_strings(strings, msg->hdr.len);
+
+ ret = malloc(sizeof(char*) * num_strings + msg->hdr.len);
if (!ret) {
free_no_errno(strings);
+ free_no_errno(msg);
return NULL;
}
ret[0] = (char *)(ret + num_strings);
- memcpy(ret[0], strings, msg.len);
+ memcpy(ret[0], strings, msg->hdr.len);
+
free(strings);
+ free(msg);
- for (i = 1; i < num_strings; i++) {
+ for (i = 1; i < num_strings; i++)
ret[i] = ret[i - 1] + strlen(ret[i - 1]) + 1;
- }
*num = num_strings;
@@ -519,6 +624,7 @@ bool xs_unwatch(struct xs_handle *h, const char *path, const char *token)
*/
bool xs_transaction_start(struct xs_handle *h)
{
+ pthread_mutex_lock(&h->transaction_mutex);
return xs_bool(xs_single(h, XS_TRANSACTION_START, "", NULL));
}
@@ -530,12 +636,18 @@ bool xs_transaction_start(struct xs_handle *h)
bool xs_transaction_end(struct xs_handle *h, bool abort)
{
char abortstr[2];
+ bool rc;
if (abort)
strcpy(abortstr, "F");
else
strcpy(abortstr, "T");
- return xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+
+ rc = xs_bool(xs_single(h, XS_TRANSACTION_END, abortstr, NULL));
+
+ pthread_mutex_unlock(&h->transaction_mutex);
+
+ return rc;
}
/* Introduce a new domain.
@@ -584,18 +696,6 @@ char *xs_get_domain_path(struct xs_handle *h, domid_t domid)
return xs_single(h, XS_GET_DOMAIN_PATH, domid_str, NULL);
}
-bool xs_shutdown(struct xs_handle *h)
-{
- bool ret = xs_bool(xs_single(h, XS_SHUTDOWN, "", NULL));
- if (ret) {
- char c;
- /* Wait for it to actually shutdown. */
- while ((read(h->fd, &c, 1) < 0) && (errno == EINTR))
- continue;
- }
- return ret;
-}
-
/* Only useful for DEBUG versions */
char *xs_debug_command(struct xs_handle *h, const char *cmd,
void *data, unsigned int len)
@@ -609,3 +709,75 @@ char *xs_debug_command(struct xs_handle *h, const char *cmd,
return xs_talkv(h, XS_DEBUG, iov, ARRAY_SIZE(iov), NULL);
}
+
+static void *read_thread(void *arg)
+{
+ struct xs_handle *h = arg;
+ struct xs_stored_msg *msg = NULL;
+ char *body = NULL;
+
+ for (;;) {
+ msg = NULL;
+ body = NULL;
+
+ /* Allocate message structure and read the message header. */
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto error;
+ if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+ goto error;
+
+ /* Allocate and read the message body. */
+ body = msg->body = malloc(msg->hdr.len + 1);
+ if (body == NULL)
+ goto error;
+ if (!read_all(h->fd, body, msg->hdr.len))
+ goto error;
+ body[msg->hdr.len] = '\0';
+
+ if (msg->hdr.type == XS_WATCH_EVENT) {
+ pthread_mutex_lock(&h->watch_mutex);
+
+ /* Kick users out of their select() loop. */
+ if (list_empty(&h->watch_list) &&
+ (h->watch_pipe[1] != -1))
+ while (write(h->watch_pipe[1], body, 1) != 1)
+ continue;
+
+ list_add_tail(&msg->list, &h->watch_list);
+ pthread_cond_signal(&h->watch_condvar);
+
+ pthread_mutex_unlock(&h->watch_mutex);
+ } else {
+ pthread_mutex_lock(&h->reply_mutex);
+
+ /* There should only ever be one response pending! */
+ if (!list_empty(&h->reply_list)) {
+ pthread_mutex_unlock(&h->reply_mutex);
+ goto error;
+ }
+
+ list_add_tail(&msg->list, &h->reply_list);
+ pthread_cond_signal(&h->reply_condvar);
+
+ pthread_mutex_unlock(&h->reply_mutex);
+ }
+ }
+
+ error:
+ if (body != NULL)
+ free(body);
+ if (msg != NULL)
+ free(msg);
+ return NULL;
+}
+
+/*
+ * Local variables:
+ * c-file-style: "linux"
+ * indent-tabs-mode: t
+ * c-indent-level: 8
+ * c-basic-offset: 8
+ * tab-width: 8
+ * End:
+ */