diff options
Diffstat (limited to 'tools/xenstore/xenstored_watch.c')
-rw-r--r-- | tools/xenstore/xenstored_watch.c | 189 |
1 files changed, 124 insertions, 65 deletions
diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c index 2df83e1a54..d0e00f53c2 100644 --- a/tools/xenstore/xenstored_watch.c +++ b/tools/xenstore/xenstored_watch.c @@ -21,6 +21,8 @@ #include <sys/types.h> #include <stdarg.h> #include <stdlib.h> +#include <sys/time.h> +#include <time.h> #include "talloc.h" #include "list.h" #include "xenstored_watch.h" @@ -28,6 +30,8 @@ #include "utils.h" #include "xenstored_test.h" +/* FIXME: time out unacked watches. */ + /* We create this if anyone is interested "node", then we pass it from * watch to watch as each connection acks it. */ @@ -39,7 +43,10 @@ struct watch_event /* Watch we are currently attached to. */ struct watch *watch; - struct buffered_data *data; + struct timeval timeout; + + /* Name of node which changed. */ + char *node; }; struct watch @@ -50,72 +57,63 @@ struct watch /* Current outstanding events applying to this watch. */ struct list_head events; + char *token; char *node; struct connection *conn; }; static LIST_HEAD(watches); -static void reset_event(struct watch_event *event) -{ - event->data->inhdr = true; - event->data->used = 0; -} - -/* We received a non-ACK response: re-queue any watch we just sent. */ -void reset_watch_event(struct connection *conn) -{ - if (waiting_for_ack(conn)) - reset_event(conn->event); -} - -/* We're waiting if we have an event and we sent it all. */ -bool waiting_for_ack(struct connection *conn) +static struct watch_event *get_first_event(struct connection *conn) { - if (!conn->event) - return false; + struct watch *watch; + struct watch_event *event; - if (conn->event->data->inhdr) - return false; - return conn->event->data->used == conn->event->data->hdr.msg.len; -} + /* Find first watch with an event. */ + list_for_each_entry(watch, &watches, list) { + if (watch->conn != conn) + continue; -bool is_watch_event(struct connection *conn, struct buffered_data *out) -{ - return (conn->event && out == conn->event->data); + event = list_top(&watch->events, struct watch_event, list); + if (event) + return event; + } + return NULL; } /* Look through our watches: if any of them have an event, queue it. */ void queue_next_event(struct connection *conn) { - struct watch *watch; + struct watch_event *event; + char *buffer; + unsigned int len; - /* We had a reply queued already? Send it. */ + /* We had a reply queued already? Send it: other end will + * discard watch. */ if (conn->waiting_reply) { conn->out = conn->waiting_reply; conn->waiting_reply = NULL; + conn->waiting_for_ack = false; return; } - /* If we're waiting for ack, don't queue more. */ - if (waiting_for_ack(conn)) + /* If we're already waiting for ack, don't queue more. */ + if (conn->waiting_for_ack) return; - /* Find a good event to send. */ - if (!conn->event) { - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; + event = get_first_event(conn); + if (!event) + return; - conn->event = list_top(&watch->events, - struct watch_event, list); - if (conn->event) - break; - } - if (!conn->event) - return; - } + /* If we decide to cancel, we will reset this. */ + conn->waiting_for_ack = true; - conn->out = conn->event->data; + /* Create reply from path and token */ + len = strlen(event->node) + 1 + strlen(event->watch->token) + 1; + buffer = talloc_array(conn, char, len); + strcpy(buffer, event->node); + strcpy(buffer+strlen(event->node)+1, event->watch->token); + send_reply(conn, XS_WATCH_EVENT, buffer, len); + talloc_free(buffer); } /* Watch on DIR applies to DIR, DIR/FILE, but not DIRLONG. */ @@ -160,14 +158,15 @@ void fire_watches(struct transaction *trans, const char *node) /* Create and fill in info about event. */ event = talloc(talloc_autofree_context(), struct watch_event); - event->data = new_buffer(event); - event->data->hdr.msg.type = XS_WATCH_EVENT; - event->data->hdr.msg.len = strlen(node) + 1; - event->data->buffer = talloc_strdup(event->data, node); + event->node = talloc_strdup(event, node); /* Tie event to this watch. */ event->watch = watch; - list_add(&event->list, &watch->events); + list_add_tail(&event->list, &watch->events); + + /* Warn if not finished after thirty seconds. */ + gettimeofday(&event->timeout, NULL); + event->timeout.tv_sec += 30; /* If connection not doing anything, queue this. */ if (!watch->conn->out) @@ -178,16 +177,15 @@ void fire_watches(struct transaction *trans, const char *node) static void move_event_onwards(struct watch_event *event) { list_del(&event->list); - reset_event(event); /* Remove from this watch, and find next watch to put this on. */ - event->watch = find_next_watch(event->watch, event->data->buffer); + event->watch = find_next_watch(event->watch, event->node); if (!event->watch) { talloc_free(event); return; } - list_add(&event->list, &event->watch->events); + list_add_tail(&event->list, &event->watch->events); /* If connection not doing anything, queue this. */ if (!event->watch->conn->out) @@ -199,10 +197,6 @@ static int destroy_watch(void *_watch) struct watch *watch = _watch; struct watch_event *event; - /* Forget about sending out or waiting for acks for this watch. */ - if (watch->conn->event && watch->conn->event->watch == watch) - watch->conn->event = NULL; - /* If we have pending events, pass them on to others. */ while ((event = list_top(&watch->events, struct watch_event, list))) move_event_onwards(event); @@ -227,21 +221,59 @@ static void insert_watch(struct watch *watch) list_add_tail(&watch->list, &watches); } +void shortest_watch_ack_timeout(struct timeval *tv) +{ + struct watch *watch; + + list_for_each_entry(watch, &watches, list) { + struct watch_event *i; + list_for_each_entry(i, &watch->events, list) { + if (!timerisset(&i->timeout)) + continue; + if (!timerisset(tv) || timercmp(&i->timeout, tv, <)) + *tv = i->timeout; + } + } +} + +void check_watch_ack_timeout(void) +{ + struct watch *watch; + struct timeval now; + + gettimeofday(&now, NULL); + list_for_each_entry(watch, &watches, list) { + struct watch_event *i, *tmp; + list_for_each_entry_safe(i, tmp, &watch->events, list) { + if (!timerisset(&i->timeout)) + continue; + if (timercmp(&i->timeout, &now, <)) { + xprintf("Warning: timeout on watch event %s" + " token %s\n", + i->node, watch->token); + timerclear(&i->timeout); + } + } + } +} + bool do_watch(struct connection *conn, struct buffered_data *in) { struct watch *watch; - char *vec[2]; + char *vec[3]; if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) return send_error(conn, EINVAL); + vec[0] = canonicalize(conn, vec[0]); if (!check_node_perms(conn, vec[0], XS_PERM_READ)) return send_error(conn, errno); watch = talloc(conn, struct watch); watch->node = talloc_strdup(watch, vec[0]); + watch->token = talloc_strdup(watch, vec[1]); watch->conn = conn; - watch->priority = strtoul(vec[1], NULL, 0); + watch->priority = strtoul(vec[2], NULL, 0); INIT_LIST_HEAD(&watch->events); insert_watch(watch); @@ -249,31 +281,58 @@ bool do_watch(struct connection *conn, struct buffered_data *in) return send_ack(conn, XS_WATCH); } -bool do_watch_ack(struct connection *conn) +bool do_watch_ack(struct connection *conn, const char *token) { struct watch_event *event; - if (!waiting_for_ack(conn)) + if (!conn->waiting_for_ack) return send_error(conn, ENOENT); - /* Remove this watch event. */ - event = conn->event; - conn->event = NULL; + event = get_first_event(conn); + if (!streq(event->watch->token, token)) + return send_error(conn, EINVAL); move_event_onwards(event); + conn->waiting_for_ack = false; return send_ack(conn, XS_WATCH_ACK); } -bool do_unwatch(struct connection *conn, const char *node) +bool do_unwatch(struct connection *conn, struct buffered_data *in) { struct watch *watch; + char *node, *vec[2]; + + if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) + return send_error(conn, EINVAL); + node = canonicalize(conn, vec[0]); list_for_each_entry(watch, &watches, list) { - if (watch->conn == conn - && streq(watch->node, node)) { + if (watch->conn != conn) + continue; + + if (streq(watch->node, node) && streq(watch->token, vec[1])) { talloc_free(watch); return send_ack(conn, XS_UNWATCH); } } return send_error(conn, ENOENT); } + +#ifdef TESTING +void dump_watches(struct connection *conn) +{ + struct watch *watch; + struct watch_event *event; + + /* Find first watch with an event. */ + list_for_each_entry(watch, &watches, list) { + if (watch->conn != conn) + continue; + + printf(" watch on %s token %s prio %i\n", + watch->node, watch->token, watch->priority); + list_for_each_entry(event, &watch->events, list) + printf(" event: %s\n", event->node); + } +} +#endif |