diff options
author | cl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk> | 2005-07-26 15:20:09 +0000 |
---|---|---|
committer | cl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk> | 2005-07-26 15:20:09 +0000 |
commit | ef071736546681facfdc36617632bdafd6c4d4a4 (patch) | |
tree | 49b258982283aa0aa0b9017834b6dbffa92b4927 /tools/xenstore/xenstored_watch.c | |
parent | e04c630cb24b65d871f061074bea23174cadf841 (diff) | |
download | xen-ef071736546681facfdc36617632bdafd6c4d4a4.tar.gz xen-ef071736546681facfdc36617632bdafd6c4d4a4.tar.bz2 xen-ef071736546681facfdc36617632bdafd6c4d4a4.zip |
Change watches: operations block until everyone has acked.
Watch events are no longer sent to self
Watches no longer take a priority
async and asyncwait commands for xs_test, now we need to continue
despite blocking ops.
Print test name at end of verbose run on failure.
Use --trace-file arg to xenstored when testing
Signed-off-by: Rusty Russel <rusty@rustcorp.com.au>
Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
Diffstat (limited to 'tools/xenstore/xenstored_watch.c')
-rw-r--r-- | tools/xenstore/xenstored_watch.c | 291 |
1 files changed, 95 insertions, 196 deletions
diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c index 205b70399c..c532da26a8 100644 --- a/tools/xenstore/xenstored_watch.c +++ b/tools/xenstore/xenstored_watch.c @@ -33,69 +33,39 @@ #include "xenstored_domain.h" /* FIXME: time out unacked watches. */ - -/* We create this if anyone is interested "node", then we pass it from - * watch to watch as each connection acks it. - */ struct watch_event { - /* The watch we are firing for (watch->events) */ + /* The events on this watch. */ struct list_head list; - /* Watches we need to fire for (watches[0]->events == this). */ - struct watch **watches; - unsigned int num_watches; - - struct timeval timeout; - - /* Name of node which changed. */ - char *node; + /* Data to send (node\0token\0). */ + unsigned int len; + char *data; - /* For remove, we trigger on all the children of this node too. */ - bool recurse; + /* Connection which caused watch event (which we are blocking) */ + struct connection *cause; }; struct watch { + /* Watches on this connection */ struct list_head list; - unsigned int priority; /* Current outstanding events applying to this watch. */ struct list_head events; /* Is this relative to connnection's implicit path? */ - bool relative; + const char *relative_path; char *token; char *node; - struct connection *conn; }; -static LIST_HEAD(watches); - -static struct watch_event *get_first_event(struct connection *conn) -{ - struct watch *watch; - struct watch_event *event; - - /* Find first watch with an event. */ - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - - event = list_top(&watch->events, struct watch_event, list); - if (event) - return event; - } - return NULL; -} /* Look through our watches: if any of them have an event, queue it. */ void queue_next_event(struct connection *conn) { struct watch_event *event; - const char *node; - char *buffer; - unsigned int len; + struct watch *watch; /* We had a reply queued already? Send it: other end will * discard watch. */ @@ -110,170 +80,93 @@ void queue_next_event(struct connection *conn) if (conn->waiting_for_ack) return; - event = get_first_event(conn); - if (!event) - return; - - /* If we decide to cancel, we will reset this. */ - conn->waiting_for_ack = event->watches[0]; - - /* If we deleted /foo and they're watching /foo/bar, that's what we - * tell them has changed. */ - if (!is_child(event->node, event->watches[0]->node)) { - assert(event->recurse); - node = event->watches[0]->node; - } else - node = event->node; - - /* If watch placed using relative path, give them relative answer. */ - if (event->watches[0]->relative) { - node += strlen(get_implicit_path(conn)); - if (node[0] == '/') /* Could be "". */ - node++; - } - - /* Create reply from path and token */ - len = strlen(node) + 1 + strlen(event->watches[0]->token) + 1; - buffer = talloc_array(conn, char, len); - strcpy(buffer, node); - strcpy(buffer+strlen(node)+1, event->watches[0]->token); - send_reply(conn, XS_WATCH_EVENT, buffer, len); - talloc_free(buffer); -} - -static struct watch **find_watches(const char *node, bool recurse, - unsigned int *num) -{ - struct watch *i; - struct watch **ret = NULL; - - *num = 0; - - /* We include children too if this is an rm. */ - list_for_each_entry(i, &watches, list) { - if (is_child(node, i->node) || - (recurse && is_child(i->node, node))) { - (*num)++; - ret = talloc_realloc(node, ret, struct watch *, *num); - ret[*num - 1] = i; + list_for_each_entry(watch, &conn->watches, list) { + event = list_top(&watch->events, struct watch_event, list); + if (event) { + conn->waiting_for_ack = watch; + send_reply(conn,XS_WATCH_EVENT,event->data,event->len); + break; } } - return ret; } -/* FIXME: we fail to fire on out of memory. Should drop connections. */ -void fire_watches(struct transaction *trans, const char *node, bool recurse) +static int destroy_watch_event(void *_event) { - struct watch **watches; - struct watch_event *event; - unsigned int num_watches; + struct watch_event *event = _event; - /* During transactions, don't fire watches. */ - if (trans) - return; - - watches = find_watches(node, recurse, &num_watches); - if (!watches) - return; - - /* Create and fill in info about event. */ - event = talloc(talloc_autofree_context(), struct watch_event); - event->node = talloc_strdup(event, node); - - /* Tie event to this watch. */ - event->watches = watches; - talloc_steal(event, watches); - event->num_watches = num_watches; - event->recurse = recurse; - list_add_tail(&event->list, &watches[0]->events); - - /* Warn if not finished after thirty seconds. */ - gettimeofday(&event->timeout, NULL); - event->timeout.tv_sec += 30; - - /* If connection not doing anything, queue this. */ - if (!watches[0]->conn->out) - queue_next_event(watches[0]->conn); + trace_destroy(event, "watch_event"); + assert(event->cause->watches_unacked != 0); + /* If it hits zero, will unblock in unblock_connections. */ + event->cause->watches_unacked--; + return 0; } -/* We're done with this event: see if anyone else wants it. */ -static void move_event_onwards(struct watch_event *event) +static void add_event(struct connection *cause, struct watch *watch, + const char *node) { - list_del(&event->list); + struct watch_event *event; - event->num_watches--; - event->watches++; - if (!event->num_watches) { - talloc_free(event); - return; + if (watch->relative_path) { + node += strlen(watch->relative_path); + if (*node == '/') /* Could be "" */ + node++; } - list_add_tail(&event->list, &event->watches[0]->events); - - /* If connection not doing anything, queue this. */ - if (!event->watches[0]->conn->out) - queue_next_event(event->watches[0]->conn); + event = talloc(watch, struct watch_event); + event->len = strlen(node) + 1 + strlen(watch->token) + 1; + event->data = talloc_array(event, char, event->len); + strcpy(event->data, node); + strcpy(event->data + strlen(node) + 1, watch->token); + event->cause = cause; + cause->watches_unacked++; + talloc_set_destructor(event, destroy_watch_event); + list_add_tail(&event->list, &watch->events); + trace_create(event, "watch_event"); } -static void remove_watch_from_events(struct watch *dying_watch) +/* FIXME: we fail to fire on out of memory. Should drop connections. */ +bool fire_watches(struct connection *conn, const char *node, bool recurse) { + struct connection *i; struct watch *watch; - struct watch_event *event; - unsigned int i; - list_for_each_entry(watch, &watches, list) { - list_for_each_entry(event, &watch->events, list) { - for (i = 0; i < event->num_watches; i++) { - if (event->watches[i] != dying_watch) - continue; - - assert(i != 0); - memmove(event->watches+i, - event->watches+i+1, - (event->num_watches - (i+1)) - * sizeof(struct watch *)); - event->num_watches--; - } + /* During transactions, don't fire watches. */ + if (conn->transaction) + return false; + + assert(conn->state == OK); + + /* Create an event for each watch. Don't send to self. */ + list_for_each_entry(i, &connections, list) { + if (i == conn) + continue; + + list_for_each_entry(watch, &i->watches, list) { + if (is_child(node, watch->node)) + add_event(conn, watch, node); + else if (recurse && is_child(watch->node, node)) + add_event(conn, watch, watch->node); + else + continue; + conn->state = WATCHED; + /* If connection not doing anything, queue this. */ + if (!i->out) + queue_next_event(i); } } + return conn->state == WATCHED; } static int destroy_watch(void *_watch) { - struct watch *watch = _watch; - struct watch_event *event; - - /* If we have pending events, pass them on to others. */ - while ((event = list_top(&watch->events, struct watch_event, list))) - move_event_onwards(event); - - /* Remove from global list. */ - list_del(&watch->list); - - /* Other events which match this watch must be cleared. */ - remove_watch_from_events(watch); - - trace_destroy(watch, "watch"); + trace_destroy(_watch, "watch"); return 0; } -/* We keep watches in priority order. */ -static void insert_watch(struct watch *watch) -{ - struct watch *i; - - list_for_each_entry(i, &watches, list) { - if (i->priority <= watch->priority) { - list_add_tail(&watch->list, &i->list); - return; - } - } - - list_add_tail(&watch->list, &watches); -} - void shortest_watch_ack_timeout(struct timeval *tv) { + (void)tv; +#if 0 /* FIXME */ struct watch *watch; list_for_each_entry(watch, &watches, list) { @@ -285,10 +178,12 @@ void shortest_watch_ack_timeout(struct timeval *tv) *tv = i->timeout; } } +#endif } void check_watch_ack_timeout(void) { +#if 0 struct watch *watch; struct timeval now; @@ -308,12 +203,13 @@ void check_watch_ack_timeout(void) } } } +#endif } void do_watch(struct connection *conn, struct buffered_data *in) { struct watch *watch; - char *vec[3]; + char *vec[2]; bool relative; if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) { @@ -331,14 +227,16 @@ void do_watch(struct connection *conn, struct buffered_data *in) watch = talloc(conn, struct watch); watch->node = talloc_strdup(watch, vec[0]); watch->token = talloc_strdup(watch, vec[1]); - watch->conn = conn; - watch->priority = strtoul(vec[2], NULL, 0); - watch->relative = relative; + if (relative) + watch->relative_path = get_implicit_path(conn); + else + watch->relative_path = NULL; + INIT_LIST_HEAD(&watch->events); - insert_watch(watch); - talloc_set_destructor(watch, destroy_watch); + list_add_tail(&watch->list, &conn->watches); trace_create(watch, "watch"); + talloc_set_destructor(watch, destroy_watch); send_ack(conn, XS_WATCH); } @@ -356,9 +254,6 @@ void do_watch_ack(struct connection *conn, const char *token) return; } - event = list_top(&conn->waiting_for_ack->events, - struct watch_event, list); - assert(event->watches[0] == conn->waiting_for_ack); if (!streq(conn->waiting_for_ack->token, token)) { /* They're confused: this will cause us to send event again */ conn->waiting_for_ack = NULL; @@ -366,7 +261,12 @@ void do_watch_ack(struct connection *conn, const char *token) return; } - move_event_onwards(event); + /* Remove event: after ack sent, core will call queue_next_event */ + event = list_top(&conn->waiting_for_ack->events, struct watch_event, + list); + list_del(&event->list); + talloc_free(event); + conn->waiting_for_ack = NULL; send_ack(conn, XS_WATCH_ACK); } @@ -385,11 +285,9 @@ void do_unwatch(struct connection *conn, struct buffered_data *in) * watch we're deleting: conn->waiting_for_ack was reset by * this command in consider_message anyway. */ node = canonicalize(conn, vec[0]); - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; - + list_for_each_entry(watch, &conn->watches, list) { if (streq(watch->node, node) && streq(watch->token, vec[1])) { + list_del(&watch->list); talloc_free(watch); send_ack(conn, XS_UNWATCH); return; @@ -404,15 +302,16 @@ void dump_watches(struct connection *conn) struct watch *watch; struct watch_event *event; - /* Find first watch with an event. */ - list_for_each_entry(watch, &watches, list) { - if (watch->conn != conn) - continue; + if (conn->waiting_for_ack) + printf(" waiting_for_ack for watch on %s token %s\n", + conn->waiting_for_ack->node, + conn->waiting_for_ack->token); - printf(" watch on %s token %s prio %i\n", - watch->node, watch->token, watch->priority); + list_for_each_entry(watch, &conn->watches, list) { + printf(" watch on %s token %s\n", + watch->node, watch->token); list_for_each_entry(event, &watch->events, list) - printf(" event: %s\n", event->node); + printf(" event: %s\n", event->data); } } #endif |