From 7fc4533b321fbe7ef6ce0ef05e3b99cebe13a39e Mon Sep 17 00:00:00 2001 From: "kaf24@firebug.cl.cam.ac.uk" Date: Thu, 8 Dec 2005 15:24:02 +0100 Subject: Merge in the newer Xenbus implementation from Linux to the Mini-OS. The new version compiles and starts up, but I'm not really sure how to test the new xenbus implementation. * Added unbind_evtchn * Copied parts of the Linux spinlock implementation to make the changes to xenbus compared to Linux smaller. Also added a dummy rwsem implementation. * Updated the xenbus-files Signed-off-by: Simon Kagstrom --- extras/mini-os/xenbus/xenbus_comms.c | 211 +++++------- extras/mini-os/xenbus/xenbus_comms.h | 2 +- extras/mini-os/xenbus/xenbus_xs.c | 615 ++++++++++++++++++++++++----------- 3 files changed, 509 insertions(+), 319 deletions(-) (limited to 'extras/mini-os/xenbus') diff --git a/extras/mini-os/xenbus/xenbus_comms.c b/extras/mini-os/xenbus/xenbus_comms.c index b299c05683..ac9080ca4a 100644 --- a/extras/mini-os/xenbus/xenbus_comms.c +++ b/extras/mini-os/xenbus/xenbus_comms.c @@ -33,174 +33,132 @@ #include #include #include +#include +#include "xenbus_comms.h" +static int xenbus_irq; -#ifdef XENBUS_COMMS_DEBUG -#define DEBUG(_f, _a...) \ - printk("MINI_OS(file=xenbus_comms.c, line=%d) " _f "\n", __LINE__, ## _a) -#else -#define DEBUG(_f, _a...) ((void)0) -#endif - - -#define RINGBUF_DATASIZE ((PAGE_SIZE / 2) - sizeof(struct ringbuf_head)) -struct ringbuf_head -{ - u32 write; /* Next place to write to */ - u32 read; /* Next place to read from */ - u8 flags; - char buf[0]; -} __attribute__((packed)); +extern void xenbus_probe(void *); +extern int xenstored_ready; DECLARE_WAIT_QUEUE_HEAD(xb_waitq); -static inline struct ringbuf_head *outbuf(void) +static inline struct xenstore_domain_interface *xenstore_domain_interface(void) { return mfn_to_virt(start_info.store_mfn); } -static inline struct ringbuf_head *inbuf(void) -{ - return (struct ringbuf_head *)((char *)mfn_to_virt(start_info.store_mfn) + PAGE_SIZE/2); -} - static void wake_waiting(int port, struct pt_regs *regs) { wake_up(&xb_waitq); } -static int check_buffer(const struct ringbuf_head *h) -{ - return (h->write < RINGBUF_DATASIZE && h->read < RINGBUF_DATASIZE); -} - -/* We can't fill last byte: would look like empty buffer. */ -static void *get_output_chunk(const struct ringbuf_head *h, - void *buf, u32 *len) +static int check_indexes(XENSTORE_RING_IDX cons, XENSTORE_RING_IDX prod) { - u32 read_mark; - - if (h->read == 0) - read_mark = RINGBUF_DATASIZE - 1; - else - read_mark = h->read - 1; - - /* Here to the end of buffer, unless they haven't read some out. */ - *len = RINGBUF_DATASIZE - h->write; - if (read_mark >= h->write) - *len = read_mark - h->write; - return (void *)((char *)buf + h->write); + return ((prod - cons) <= XENSTORE_RING_SIZE); } -static const void *get_input_chunk(const struct ringbuf_head *h, - const void *buf, u32 *len) +static void *get_output_chunk(XENSTORE_RING_IDX cons, + XENSTORE_RING_IDX prod, + char *buf, uint32_t *len) { - /* Here to the end of buffer, unless they haven't written some. */ - *len = RINGBUF_DATASIZE - h->read; - if (h->write >= h->read) - *len = h->write - h->read; - return (void *)((char *)buf + h->read); + *len = XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod); + if ((XENSTORE_RING_SIZE - (prod - cons)) < *len) + *len = XENSTORE_RING_SIZE - (prod - cons); + return buf + MASK_XENSTORE_IDX(prod); } -static void update_output_chunk(struct ringbuf_head *h, u32 len) +static const void *get_input_chunk(XENSTORE_RING_IDX cons, + XENSTORE_RING_IDX prod, + const char *buf, uint32_t *len) { - h->write += len; - if (h->write == RINGBUF_DATASIZE) - h->write = 0; -} - -static void update_input_chunk(struct ringbuf_head *h, u32 len) -{ - h->read += len; - if (h->read == RINGBUF_DATASIZE) - h->read = 0; -} - -static int output_avail(struct ringbuf_head *out) -{ - unsigned int avail; - - get_output_chunk(out, out->buf, &avail); - return avail != 0; + *len = XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(cons); + if ((prod - cons) < *len) + *len = prod - cons; + return buf + MASK_XENSTORE_IDX(cons); } int xb_write(const void *data, unsigned len) { - struct ringbuf_head h; - struct ringbuf_head *out = outbuf(); + struct xenstore_domain_interface *intf = xenstore_domain_interface(); + XENSTORE_RING_IDX cons, prod; - do { + while (len != 0) { void *dst; unsigned int avail; - wait_event(xb_waitq, output_avail(out)); + wait_event(xb_waitq, (intf->req_prod - intf->req_cons) != + XENSTORE_RING_SIZE); - /* Read, then check: not that we don't trust store. - * Hell, some of my best friends are daemons. But, - * in this post-911 world... */ - h = *out; + /* Read indexes, then verify. */ + cons = intf->req_cons; + prod = intf->req_prod; mb(); - if (!check_buffer(&h)) { - return -1; /* ETERRORIST! */ - } + if (!check_indexes(cons, prod)) + return -EIO; - dst = get_output_chunk(&h, out->buf, &avail); + dst = get_output_chunk(cons, prod, intf->req, &avail); + if (avail == 0) + continue; if (avail > len) avail = len; + memcpy(dst, data, avail); - data = (void *)((char *)data + avail); + data = (void*) ( (unsigned long)data + avail ); len -= avail; - update_output_chunk(out, avail); - notify_via_evtchn(start_info.store_evtchn); - } while (len != 0); - return 0; -} + /* Other side must not see new header until data is there. */ + wmb(); + intf->req_prod += avail; -int xs_input_avail(void) -{ - unsigned int avail; - struct ringbuf_head *in = inbuf(); + /* This implies mb() before other side sees interrupt. */ + notify_remote_via_evtchn(start_info.store_evtchn); + } - get_input_chunk(in, in->buf, &avail); - return avail != 0; + return 0; } int xb_read(void *data, unsigned len) { - struct ringbuf_head h; - struct ringbuf_head *in = inbuf(); - int was_full; + struct xenstore_domain_interface *intf = xenstore_domain_interface(); + XENSTORE_RING_IDX cons, prod; while (len != 0) { unsigned int avail; const char *src; - wait_event(xb_waitq, xs_input_avail()); - h = *in; + wait_event(xb_waitq, + intf->rsp_cons != intf->rsp_prod); + + /* Read indexes, then verify. */ + cons = intf->rsp_cons; + prod = intf->rsp_prod; mb(); - if (!check_buffer(&h)) { - return -1; - } + if (!check_indexes(cons, prod)) + return -EIO; - src = get_input_chunk(&h, in->buf, &avail); + src = get_input_chunk(cons, prod, intf->rsp, &avail); + if (avail == 0) + continue; if (avail > len) avail = len; - was_full = !output_avail(&h); + + /* We must read header before we read data. */ + rmb(); memcpy(data, src, avail); - data = (void *)((char *)data + avail); + data = (void*) ( (unsigned long)data + avail ); len -= avail; - update_input_chunk(in, avail); - DEBUG("Finished read of %i bytes (%i to go)\n", avail, len); - /* If it was full, tell them we've taken some. */ - if (was_full) - notify_via_evtchn(start_info.store_evtchn); - } - /* If we left something, wake watch thread to deal with it. */ - if (xs_input_avail()) - wake_up(&xb_waitq); + /* Other side must not see free space until we've copied out */ + mb(); + intf->rsp_cons += avail; + + printk("Finished read of %i bytes (%i to go)\n", avail, len); + + /* Implies mb(): they will see new header. */ + notify_remote_via_evtchn(start_info.store_evtchn); + } return 0; } @@ -208,24 +166,19 @@ int xb_read(void *data, unsigned len) /* Set up interrupt handler off store event channel. */ int xb_init_comms(void) { - printk("Init xenbus comms, store event channel %d\n", start_info.store_evtchn); - if (!start_info.store_evtchn) - return 0; - printk("Binding virq\n"); - bind_evtchn(start_info.store_evtchn, &wake_waiting); - - /* FIXME zero out page -- domain builder should probably do this*/ - memset(mfn_to_virt(start_info.store_mfn), 0, PAGE_SIZE); - notify_via_evtchn(start_info.store_evtchn); - return 0; -} + int err; -void xb_suspend_comms(void) -{ + if (xenbus_irq) + unbind_evtchn(xenbus_irq); + + err = bind_evtchn( + start_info.store_evtchn, wake_waiting); + if (err <= 0) { + printk("XENBUS request irq failed %i\n", err); + return err; + } - if (!start_info.store_evtchn) - return; + xenbus_irq = err; - // TODO - //unbind_evtchn_from_irqhandler(xen_start_info.store_evtchn, &xb_waitq); + return 0; } diff --git a/extras/mini-os/xenbus/xenbus_comms.h b/extras/mini-os/xenbus/xenbus_comms.h index def0ddfb4a..08bbbf861c 100644 --- a/extras/mini-os/xenbus/xenbus_comms.h +++ b/extras/mini-os/xenbus/xenbus_comms.h @@ -28,8 +28,8 @@ #ifndef _XENBUS_COMMS_H #define _XENBUS_COMMS_H +int xs_init(void); int xb_init_comms(void); -void xb_suspend_comms(void); /* Low level routines. */ int xb_write(const void *data, unsigned len); diff --git a/extras/mini-os/xenbus/xenbus_xs.c b/extras/mini-os/xenbus/xenbus_xs.c index 355133b5e6..115439a1f8 100644 --- a/extras/mini-os/xenbus/xenbus_xs.c +++ b/extras/mini-os/xenbus/xenbus_xs.c @@ -39,15 +39,63 @@ #include #include #include +#include #include #include "xenbus_comms.h" #define streq(a, b) (strcmp((a), (b)) == 0) -static char printf_buffer[4096]; +struct xs_stored_msg { + struct list_head list; + + struct xsd_sockmsg hdr; + + union { + /* Queued replies. */ + struct { + char *body; + } reply; + + /* Queued watch events. */ + struct { + struct xenbus_watch *handle; + char **vec; + unsigned int vec_size; + } watch; + } u; +}; + +struct xs_handle { + /* A list of replies. Currently only one will ever be outstanding. */ + struct list_head reply_list; + spinlock_t reply_lock; + struct wait_queue_head reply_waitq; + + /* One request at a time. */ + struct semaphore request_mutex; + + /* Protect transactions against save/restore. */ + struct rw_semaphore suspend_mutex; +}; + +static struct xs_handle xs_state; + +/* List of registered watches, and a lock to protect it. */ static LIST_HEAD(watches); -//TODO -DECLARE_MUTEX(xenbus_lock); +static DEFINE_SPINLOCK(watches_lock); + +/* List of pending watch callback events, and a lock to protect it. */ +static LIST_HEAD(watch_events); +static DEFINE_SPINLOCK(watch_events_lock); + +/* + * Details of the xenwatch callback kernel thread. The thread waits on the + * watch_events_waitq for work to do (queued on watch_events list). When it + * wakes up it acquires the xenwatch_mutex before reading the list and + * carrying out work. + */ +/* static */ DECLARE_MUTEX(xenwatch_mutex); +static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq); static int get_error(const char *errorstring) { @@ -65,47 +113,82 @@ static int get_error(const char *errorstring) static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) { - struct xsd_sockmsg msg; - void *ret; - int err; + struct xs_stored_msg *msg; + char *body; - err = xb_read(&msg, sizeof(msg)); - if (err) - return ERR_PTR(err); + spin_lock(&xs_state.reply_lock); - ret = xmalloc_array(char, msg.len + 1); - if (!ret) - return ERR_PTR(-ENOMEM); - - err = xb_read(ret, msg.len); - if (err) { - xfree(ret); - return ERR_PTR(err); + while (list_empty(&xs_state.reply_list)) { + spin_unlock(&xs_state.reply_lock); + wait_event(xs_state.reply_waitq, + !list_empty(&xs_state.reply_list)); + spin_lock(&xs_state.reply_lock); } - ((char*)ret)[msg.len] = '\0'; - *type = msg.type; + msg = list_entry(xs_state.reply_list.next, + struct xs_stored_msg, list); + list_del(&msg->list); + + spin_unlock(&xs_state.reply_lock); + + *type = msg->hdr.type; if (len) - *len = msg.len; - return ret; + *len = msg->hdr.len; + body = msg->u.reply.body; + + free(msg); + + return body; } /* Emergency write. */ void xenbus_debug_write(const char *str, unsigned int count) { - struct xsd_sockmsg msg; + struct xsd_sockmsg msg = { 0 }; msg.type = XS_DEBUG; msg.len = sizeof("print") + count + 1; + down(&xs_state.request_mutex); xb_write(&msg, sizeof(msg)); xb_write("print", sizeof("print")); xb_write(str, count); xb_write("", 1); + up(&xs_state.request_mutex); +} + +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg) +{ + void *ret; + struct xsd_sockmsg req_msg = *msg; + int err; + + if (req_msg.type == XS_TRANSACTION_START) + down_read(&xs_state.suspend_mutex); + + down(&xs_state.request_mutex); + + err = xb_write(msg, sizeof(*msg) + msg->len); + if (err) { + msg->type = XS_ERROR; + ret = ERR_PTR(err); + } else { + ret = read_reply(&msg->type, &msg->len); + } + + up(&xs_state.request_mutex); + + if ((msg->type == XS_TRANSACTION_END) || + ((req_msg.type == XS_TRANSACTION_START) && + (msg->type == XS_ERROR))) + up_read(&xs_state.suspend_mutex); + + return ret; } /* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */ -static void *xs_talkv(enum xsd_sockmsg_type type, +static void *xs_talkv(struct xenbus_transaction *t, + enum xsd_sockmsg_type type, const struct kvec *iovec, unsigned int num_vecs, unsigned int *len) @@ -115,51 +198,57 @@ static void *xs_talkv(enum xsd_sockmsg_type type, unsigned int i; int err; - //WARN_ON(down_trylock(&xenbus_lock) == 0); - + msg.tx_id = (u32)(unsigned long)t; + msg.req_id = 0; msg.type = type; msg.len = 0; for (i = 0; i < num_vecs; i++) msg.len += iovec[i].iov_len; + down(&xs_state.request_mutex); + err = xb_write(&msg, sizeof(msg)); - if (err) + if (err) { + up(&xs_state.request_mutex); return ERR_PTR(err); + } for (i = 0; i < num_vecs; i++) { - err = xb_write(iovec[i].iov_base, iovec[i].iov_len); - if (err) + err = xb_write(iovec[i].iov_base, iovec[i].iov_len);; + if (err) { + up(&xs_state.request_mutex); return ERR_PTR(err); + } } - /* Watches can have fired before reply comes: daemon detects - * and re-transmits, so we can ignore this. */ - do { - xfree(ret); - ret = read_reply(&msg.type, len); - if (IS_ERR(ret)) - return ret; - } while (msg.type == XS_WATCH_EVENT); + ret = read_reply(&msg.type, len); + + up(&xs_state.request_mutex); + + if (IS_ERR(ret)) + return ret; if (msg.type == XS_ERROR) { err = get_error(ret); - xfree(ret); + free(ret); return ERR_PTR(-err); } - //BUG_ON(msg.type != type); + // BUG_ON(msg.type != type); return ret; } /* Simplified version of xs_talkv: single message. */ -static void *xs_single(enum xsd_sockmsg_type type, - const char *string, unsigned int *len) +static void *xs_single(struct xenbus_transaction *t, + enum xsd_sockmsg_type type, + const char *string, + unsigned int *len) { struct kvec iovec; iovec.iov_base = (void *)string; iovec.iov_len = strlen(string) + 1; - return xs_talkv(type, &iovec, 1, len); + return xs_talkv(t, type, &iovec, 1, len); } /* Many commands only need an ack, don't care what it says. */ @@ -167,7 +256,7 @@ static int xs_error(char *reply) { if (IS_ERR(reply)) return PTR_ERR(reply); - xfree(reply); + free(reply); return 0; } @@ -182,60 +271,76 @@ static unsigned int count_strings(const char *strings, unsigned int len) return num; } -/* Return the path to dir with /name appended. */ +/* Return the path to dir with /name appended. Buffer must be kfree()'ed. */ static char *join(const char *dir, const char *name) { - static char buffer[4096]; + char *buffer; - //BUG_ON(down_trylock(&xenbus_lock) == 0); - /* XXX FIXME: might not be correct if name == "" */ - //BUG_ON(strlen(dir) + strlen("/") + strlen(name) + 1 > sizeof(buffer)); + buffer = malloc(strlen(dir) + strlen("/") + strlen(name) + 1); + if (buffer == NULL) + return ERR_PTR(-ENOMEM); strcpy(buffer, dir); if (!streq(name, "")) { strcat(buffer, "/"); strcat(buffer, name); } + return buffer; } -char **xenbus_directory(const char *dir, const char *node, unsigned int *num) +static char **split(char *strings, unsigned int len, unsigned int *num) { - char *strings, *p, **ret; - unsigned int len; - - strings = xs_single(XS_DIRECTORY, join(dir, node), &len); - if (IS_ERR(strings)) - return (char **)strings; + char *p, **ret; /* Count the strings. */ *num = count_strings(strings, len); /* Transfer to one big alloc for easy freeing. */ - ret = (char **)xmalloc_array(char, *num * sizeof(char *) + len); + ret = malloc(*num * sizeof(char *) + len); if (!ret) { - xfree(strings); + free(strings); return ERR_PTR(-ENOMEM); } memcpy(&ret[*num], strings, len); - xfree(strings); + free(strings); strings = (char *)&ret[*num]; for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1) ret[(*num)++] = p; + return ret; } +char **xenbus_directory(struct xenbus_transaction *t, + const char *dir, const char *node, unsigned int *num) +{ + char *strings, *path; + unsigned int len; + + path = join(dir, node); + if (IS_ERR(path)) + return (char **)path; + + strings = xs_single(t, XS_DIRECTORY, path, &len); + free(path); + if (IS_ERR(strings)) + return (char **)strings; + + return split(strings, len, num); +} + /* Check if a path exists. Return 1 if it does. */ -int xenbus_exists(const char *dir, const char *node) +int xenbus_exists(struct xenbus_transaction *t, + const char *dir, const char *node) { char **d; int dir_n; - d = xenbus_directory(dir, node, &dir_n); + d = xenbus_directory(t, dir, node, &dir_n); if (IS_ERR(d)) return 0; - xfree(d); + free(d); return 1; } @@ -243,92 +348,134 @@ int xenbus_exists(const char *dir, const char *node) * Returns a kmalloced value: call free() on it after use. * len indicates length in bytes. */ -void *xenbus_read(const char *dir, const char *node, unsigned int *len) +void *xenbus_read(struct xenbus_transaction *t, + const char *dir, const char *node, unsigned int *len) { - return xs_single(XS_READ, join(dir, node), len); + char *path; + void *ret; + + path = join(dir, node); + if (IS_ERR(path)) + return (void *)path; + + ret = xs_single(t, XS_READ, path, len); + free(path); + return ret; } /* Write the value of a single file. - * Returns -err on failure. createflags can be 0, O_CREAT, or O_CREAT|O_EXCL. + * Returns -err on failure. */ -int xenbus_write(const char *dir, const char *node, - const char *string, int createflags) +int xenbus_write(struct xenbus_transaction *t, + const char *dir, const char *node, const char *string) { - const char *flags, *path; - struct kvec iovec[3]; + const char *path; + struct kvec iovec[2]; + int ret; path = join(dir, node); - /* Format: Flags (as string), path, data. */ - if (createflags == 0) - flags = XS_WRITE_NONE; - else if (createflags == O_CREAT) - flags = XS_WRITE_CREATE; - else if (createflags == (O_CREAT|O_EXCL)) - flags = XS_WRITE_CREATE_EXCL; - else - return -EINVAL; + if (IS_ERR(path)) + return PTR_ERR(path); iovec[0].iov_base = (void *)path; iovec[0].iov_len = strlen(path) + 1; - iovec[1].iov_base = (void *)flags; - iovec[1].iov_len = strlen(flags) + 1; - iovec[2].iov_base = (void *)string; - iovec[2].iov_len = strlen(string); + iovec[1].iov_base = (void *)string; + iovec[1].iov_len = strlen(string); - return xs_error(xs_talkv(XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL)); + ret = xs_error(xs_talkv(t, XS_WRITE, iovec, ARRAY_SIZE(iovec), NULL)); + free(path); + return ret; } /* Create a new directory. */ -int xenbus_mkdir(const char *dir, const char *node) +int xenbus_mkdir(struct xenbus_transaction *t, + const char *dir, const char *node) { - return xs_error(xs_single(XS_MKDIR, join(dir, node), NULL)); + char *path; + int ret; + + path = join(dir, node); + if (IS_ERR(path)) + return PTR_ERR(path); + + ret = xs_error(xs_single(t, XS_MKDIR, path, NULL)); + free(path); + return ret; } /* Destroy a file or directory (directories must be empty). */ -int xenbus_rm(const char *dir, const char *node) +int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node) { - return xs_error(xs_single(XS_RM, join(dir, node), NULL)); + char *path; + int ret; + + path = join(dir, node); + if (IS_ERR(path)) + return PTR_ERR(path); + + ret = xs_error(xs_single(t, XS_RM, path, NULL)); + free(path); + return ret; } /* Start a transaction: changes by others will not be seen during this * transaction, and changes will not be visible to others until end. - * Transaction only applies to the given subtree. - * You can only have one transaction at any time. */ -int xenbus_transaction_start(const char *subtree) +struct xenbus_transaction *xenbus_transaction_start(void) { - return xs_error(xs_single(XS_TRANSACTION_START, subtree, NULL)); + char *id_str; + unsigned long id; + + down_read(&xs_state.suspend_mutex); + + id_str = xs_single(NULL, XS_TRANSACTION_START, "", NULL); + if (IS_ERR(id_str)) { + up_read(&xs_state.suspend_mutex); + return (struct xenbus_transaction *)id_str; + } + + id = simple_strtoul(id_str, NULL, 0); + free(id_str); + + return (struct xenbus_transaction *)id; } /* End a transaction. * If abandon is true, transaction is discarded instead of committed. */ -int xenbus_transaction_end(int abort) +int xenbus_transaction_end(struct xenbus_transaction *t, int abort) { char abortstr[2]; + int err; if (abort) strcpy(abortstr, "F"); else strcpy(abortstr, "T"); - return xs_error(xs_single(XS_TRANSACTION_END, abortstr, NULL)); + + err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL)); + + up_read(&xs_state.suspend_mutex); + + return err; } /* Single read and scanf: returns -errno or num scanned. */ -int xenbus_scanf(const char *dir, const char *node, const char *fmt, ...) +int xenbus_scanf(struct xenbus_transaction *t, + const char *dir, const char *node, const char *fmt, ...) { va_list ap; int ret; char *val; - val = xenbus_read(dir, node, NULL); + val = xenbus_read(t, dir, node, NULL); if (IS_ERR(val)) return PTR_ERR(val); va_start(ap, fmt); ret = vsscanf(val, fmt, ap); va_end(ap); - xfree(val); + free(val); /* Distinctive errno. */ if (ret == 0) return -ERANGE; @@ -336,23 +483,32 @@ int xenbus_scanf(const char *dir, const char *node, const char *fmt, ...) } /* Single printf and write: returns -errno or 0. */ -int xenbus_printf(const char *dir, const char *node, const char *fmt, ...) +int xenbus_printf(struct xenbus_transaction *t, + const char *dir, const char *node, const char *fmt, ...) { va_list ap; int ret; +#define PRINTF_BUFFER_SIZE 4096 + char *printf_buffer; + + printf_buffer = malloc(PRINTF_BUFFER_SIZE); + if (printf_buffer == NULL) + return -ENOMEM; - //BUG_ON(down_trylock(&xenbus_lock) == 0); va_start(ap, fmt); - ret = vsnprintf(printf_buffer, sizeof(printf_buffer), fmt, ap); + ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap); va_end(ap); - //BUG_ON(ret > sizeof(printf_buffer)-1); - return xenbus_write(dir, node, printf_buffer, O_CREAT); + // BUG_ON(ret > PRINTF_BUFFER_SIZE-1); + ret = xenbus_write(t, dir, node, printf_buffer); + + free(printf_buffer); + + return ret; } - /* Takes tuples of names, scanf-style args, and void **, NULL terminated. */ -int xenbus_gather(const char *dir, ...) +int xenbus_gather(struct xenbus_transaction *t, const char *dir, ...) { va_list ap; const char *name; @@ -364,7 +520,7 @@ int xenbus_gather(const char *dir, ...) void *result = va_arg(ap, void *); char *p; - p = xenbus_read(dir, name, NULL); + p = xenbus_read(t, dir, name, NULL); if (IS_ERR(p)) { ret = PTR_ERR(p); break; @@ -372,7 +528,7 @@ int xenbus_gather(const char *dir, ...) if (fmt) { if (sscanf(p, fmt, result) == 0) ret = -EINVAL; - xfree(p); + free(p); } else *(char **)result = p; } @@ -389,31 +545,8 @@ static int xs_watch(const char *path, const char *token) iov[1].iov_base = (void *)token; iov[1].iov_len = strlen(token) + 1; - return xs_error(xs_talkv(XS_WATCH, iov, ARRAY_SIZE(iov), NULL)); -} - -static char *xs_read_watch(char **token) -{ - enum xsd_sockmsg_type type; - char *ret; - - ret = read_reply(&type, NULL); - if (IS_ERR(ret)) - return ret; - - //BUG_ON(type != XS_WATCH_EVENT); - *token = ret + strlen(ret) + 1; - return ret; -} - -static int xs_acknowledge_watch(const char *token) -{ -#if 0 - return xs_error(xs_single(XS_WATCH_ACK, token, NULL)); -#else - /* XS_WATCH_ACK is no longer available */ - return 0; -#endif + return xs_error(xs_talkv(NULL, XS_WATCH, iov, + ARRAY_SIZE(iov), NULL)); } static int xs_unwatch(const char *path, const char *token) @@ -425,10 +558,10 @@ static int xs_unwatch(const char *path, const char *token) iov[1].iov_base = (char *)token; iov[1].iov_len = strlen(token) + 1; - return xs_error(xs_talkv(XS_UNWATCH, iov, ARRAY_SIZE(iov), NULL)); + return xs_error(xs_talkv(NULL, XS_UNWATCH, iov, + ARRAY_SIZE(iov), NULL)); } -/* A little paranoia: we don't just trust token. */ static struct xenbus_watch *find_watch(const char *token) { struct xenbus_watch *i, *cmp; @@ -438,6 +571,7 @@ static struct xenbus_watch *find_watch(const char *token) list_for_each_entry(i, &watches, list) if (i == cmp) return i; + return NULL; } @@ -449,111 +583,214 @@ int register_xenbus_watch(struct xenbus_watch *watch) int err; sprintf(token, "%lX", (long)watch); - //BUG_ON(find_watch(token)); -printk("Registered watch for: %s\n", token); + + down_read(&xs_state.suspend_mutex); + + spin_lock(&watches_lock); + // BUG_ON(find_watch(token)); + list_add(&watch->list, &watches); + spin_unlock(&watches_lock); + err = xs_watch(watch->node, token); - if (!err) - list_add(&watch->list, &watches); + + /* Ignore errors due to multiple registration. */ + if ((err != 0) && (err != -EEXIST)) { + spin_lock(&watches_lock); + list_del(&watch->list); + spin_unlock(&watches_lock); + } + + up_read(&xs_state.suspend_mutex); + return err; } void unregister_xenbus_watch(struct xenbus_watch *watch) { + struct xs_stored_msg *msg, *tmp; char token[sizeof(watch) * 2 + 1]; int err; sprintf(token, "%lX", (long)watch); - //BUG_ON(!find_watch(token)); - err = xs_unwatch(watch->node, token); + down_read(&xs_state.suspend_mutex); + + spin_lock(&watches_lock); + // BUG_ON(!find_watch(token)); list_del(&watch->list); + spin_unlock(&watches_lock); + err = xs_unwatch(watch->node, token); if (err) printk("XENBUS Failed to release watch %s: %i\n", watch->node, err); + + up_read(&xs_state.suspend_mutex); + + /* Cancel pending watch events. */ + spin_lock(&watch_events_lock); + list_for_each_entry_safe(msg, tmp, &watch_events, list) { + if (msg->u.watch.handle != watch) + continue; + list_del(&msg->list); + free(msg->u.watch.vec); + free(msg); + } + spin_unlock(&watch_events_lock); +} + +void xs_suspend(void) +{ + down_write(&xs_state.suspend_mutex); + down(&xs_state.request_mutex); } -/* Re-register callbacks to all watches. */ -void reregister_xenbus_watches(void) +void xs_resume(void) { struct xenbus_watch *watch; char token[sizeof(watch) * 2 + 1]; + up(&xs_state.request_mutex); + + /* No need for watches_lock: the suspend_mutex is sufficient. */ list_for_each_entry(watch, &watches, list) { sprintf(token, "%lX", (long)watch); xs_watch(watch->node, token); } + + up_write(&xs_state.suspend_mutex); } -void watch_thread(void *unused) +static void xenwatch_thread(void *unused) { + struct list_head *ent; + struct xs_stored_msg *msg; + for (;;) { - char *token; - char *node = NULL; - - wait_event(xb_waitq, xs_input_avail()); - - /* If this is a spurious wakeup caused by someone - * doing an op, they'll hold the lock and the buffer - * will be empty by the time we get there. - */ - down(&xenbus_lock); - if (xs_input_avail()) - node = xs_read_watch(&token); - - if (node && !IS_ERR(node)) { - struct xenbus_watch *w; - int err; - - err = xs_acknowledge_watch(token); - if (err) - printk("XENBUS ack %s fail %i\n", node, err); - w = find_watch(token); - //BUG_ON(!w); - w->callback(w, node); - xfree(node); - } else - printk("XENBUS xs_read_watch: %li\n", PTR_ERR(node)); - up(&xenbus_lock); + wait_event(watch_events_waitq, + !list_empty(&watch_events)); + + down(&xenwatch_mutex); + + spin_lock(&watch_events_lock); + ent = watch_events.next; + if (ent != &watch_events) + list_del(ent); + spin_unlock(&watch_events_lock); + + if (ent != &watch_events) { + msg = list_entry(ent, struct xs_stored_msg, list); + msg->u.watch.handle->callback( + msg->u.watch.handle, + (const char **)msg->u.watch.vec, + msg->u.watch.vec_size); + free(msg->u.watch.vec); + free(msg); + } + + up(&xenwatch_mutex); } } - -static void ballon_changed(struct xenbus_watch *watch, const char *node) +static int process_msg(void) { - unsigned long new_target; - int err; - err = xenbus_scanf("memory", "target", "%lu", &new_target); + struct xs_stored_msg *msg; + char *body; + int err; - if(err != 1) - { - printk("Unable to read memory/target\n"); - return; - } + msg = malloc(sizeof(*msg)); + if (msg == NULL) + return -ENOMEM; - printk("Memory target changed to: %ld bytes, ignoring.\n", new_target); -} + err = xb_read(&msg->hdr, sizeof(msg->hdr)); + if (err) { + free(msg); + return err; + } + body = malloc(msg->hdr.len + 1); + if (body == NULL) { + free(msg); + return -ENOMEM; + } -static struct xenbus_watch ballon_watch = { - .node = "memory/target", - .callback = ballon_changed, -}; + err = xb_read(body, msg->hdr.len); + if (err) { + free(body); + free(msg); + return err; + } + body[msg->hdr.len] = '\0'; + + if (msg->hdr.type == XS_WATCH_EVENT) { + msg->u.watch.vec = split(body, msg->hdr.len, + &msg->u.watch.vec_size); + if (IS_ERR(msg->u.watch.vec)) { + free(msg); + return PTR_ERR(msg->u.watch.vec); + } + + spin_lock(&watches_lock); + msg->u.watch.handle = find_watch( + msg->u.watch.vec[XS_WATCH_TOKEN]); + if (msg->u.watch.handle != NULL) { + spin_lock(&watch_events_lock); + list_add_tail(&msg->list, &watch_events); + wake_up(&watch_events_waitq); + spin_unlock(&watch_events_lock); + } else { + free(msg->u.watch.vec); + free(msg); + } + spin_unlock(&watches_lock); + } else { + msg->u.reply.body = body; + spin_lock(&xs_state.reply_lock); + list_add_tail(&msg->list, &xs_state.reply_list); + spin_unlock(&xs_state.reply_lock); + wake_up(&xs_state.reply_waitq); + } + return 0; +} + +static void xenbus_thread(void *unused) +{ + int err; + for (;;) { + err = process_msg(); + if (err) + printk("XENBUS error %d while reading " + "message\n", err); + } +} int xs_init(void) { int err; - struct thread *watcher; - printk("xb_init_comms\n"); + struct thread *kxwatcher_thread; + struct thread *kxenbus_thread; + + INIT_LIST_HEAD(&xs_state.reply_list); + spin_lock_init(&xs_state.reply_lock); + init_waitqueue_head(&xs_state.reply_waitq); + + init_MUTEX(&xs_state.request_mutex); + init_rwsem(&xs_state.suspend_mutex); + + /* Initialize the shared memory rings to talk to xenstored */ err = xb_init_comms(); if (err) return err; - - watcher = create_thread("kxwatch", watch_thread, NULL); - down(&xenbus_lock); - register_xenbus_watch(&ballon_watch); - up(&xenbus_lock); + + kxwatcher_thread = create_thread("kxwatch", xenwatch_thread, NULL); + if (IS_ERR(kxwatcher_thread)) + return PTR_ERR(kxwatcher_thread); + + kxenbus_thread = create_thread("kxenbus", xenbus_thread, NULL); + if (IS_ERR(kxenbus_thread)) + return PTR_ERR(kxenbus_thread); + return 0; } -- cgit v1.2.3