aboutsummaryrefslogtreecommitdiffstats
path: root/tools/xenstore/xenstored_watch.c
diff options
context:
space:
mode:
authorcl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk>2005-07-26 15:20:09 +0000
committercl349@firebug.cl.cam.ac.uk <cl349@firebug.cl.cam.ac.uk>2005-07-26 15:20:09 +0000
commitef071736546681facfdc36617632bdafd6c4d4a4 (patch)
tree49b258982283aa0aa0b9017834b6dbffa92b4927 /tools/xenstore/xenstored_watch.c
parente04c630cb24b65d871f061074bea23174cadf841 (diff)
downloadxen-ef071736546681facfdc36617632bdafd6c4d4a4.tar.gz
xen-ef071736546681facfdc36617632bdafd6c4d4a4.tar.bz2
xen-ef071736546681facfdc36617632bdafd6c4d4a4.zip
Change watches: operations block until everyone has acked.
Watch events are no longer sent to self Watches no longer take a priority async and asyncwait commands for xs_test, now we need to continue despite blocking ops. Print test name at end of verbose run on failure. Use --trace-file arg to xenstored when testing Signed-off-by: Rusty Russel <rusty@rustcorp.com.au> Signed-off-by: Christian Limpach <Christian.Limpach@cl.cam.ac.uk>
Diffstat (limited to 'tools/xenstore/xenstored_watch.c')
-rw-r--r--tools/xenstore/xenstored_watch.c291
1 files changed, 95 insertions, 196 deletions
diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c
index 205b70399c..c532da26a8 100644
--- a/tools/xenstore/xenstored_watch.c
+++ b/tools/xenstore/xenstored_watch.c
@@ -33,69 +33,39 @@
#include "xenstored_domain.h"
/* FIXME: time out unacked watches. */
-
-/* We create this if anyone is interested "node", then we pass it from
- * watch to watch as each connection acks it.
- */
struct watch_event
{
- /* The watch we are firing for (watch->events) */
+ /* The events on this watch. */
struct list_head list;
- /* Watches we need to fire for (watches[0]->events == this). */
- struct watch **watches;
- unsigned int num_watches;
-
- struct timeval timeout;
-
- /* Name of node which changed. */
- char *node;
+ /* Data to send (node\0token\0). */
+ unsigned int len;
+ char *data;
- /* For remove, we trigger on all the children of this node too. */
- bool recurse;
+ /* Connection which caused watch event (which we are blocking) */
+ struct connection *cause;
};
struct watch
{
+ /* Watches on this connection */
struct list_head list;
- unsigned int priority;
/* Current outstanding events applying to this watch. */
struct list_head events;
/* Is this relative to connnection's implicit path? */
- bool relative;
+ const char *relative_path;
char *token;
char *node;
- struct connection *conn;
};
-static LIST_HEAD(watches);
-
-static struct watch_event *get_first_event(struct connection *conn)
-{
- struct watch *watch;
- struct watch_event *event;
-
- /* Find first watch with an event. */
- list_for_each_entry(watch, &watches, list) {
- if (watch->conn != conn)
- continue;
-
- event = list_top(&watch->events, struct watch_event, list);
- if (event)
- return event;
- }
- return NULL;
-}
/* Look through our watches: if any of them have an event, queue it. */
void queue_next_event(struct connection *conn)
{
struct watch_event *event;
- const char *node;
- char *buffer;
- unsigned int len;
+ struct watch *watch;
/* We had a reply queued already? Send it: other end will
* discard watch. */
@@ -110,170 +80,93 @@ void queue_next_event(struct connection *conn)
if (conn->waiting_for_ack)
return;
- event = get_first_event(conn);
- if (!event)
- return;
-
- /* If we decide to cancel, we will reset this. */
- conn->waiting_for_ack = event->watches[0];
-
- /* If we deleted /foo and they're watching /foo/bar, that's what we
- * tell them has changed. */
- if (!is_child(event->node, event->watches[0]->node)) {
- assert(event->recurse);
- node = event->watches[0]->node;
- } else
- node = event->node;
-
- /* If watch placed using relative path, give them relative answer. */
- if (event->watches[0]->relative) {
- node += strlen(get_implicit_path(conn));
- if (node[0] == '/') /* Could be "". */
- node++;
- }
-
- /* Create reply from path and token */
- len = strlen(node) + 1 + strlen(event->watches[0]->token) + 1;
- buffer = talloc_array(conn, char, len);
- strcpy(buffer, node);
- strcpy(buffer+strlen(node)+1, event->watches[0]->token);
- send_reply(conn, XS_WATCH_EVENT, buffer, len);
- talloc_free(buffer);
-}
-
-static struct watch **find_watches(const char *node, bool recurse,
- unsigned int *num)
-{
- struct watch *i;
- struct watch **ret = NULL;
-
- *num = 0;
-
- /* We include children too if this is an rm. */
- list_for_each_entry(i, &watches, list) {
- if (is_child(node, i->node) ||
- (recurse && is_child(i->node, node))) {
- (*num)++;
- ret = talloc_realloc(node, ret, struct watch *, *num);
- ret[*num - 1] = i;
+ list_for_each_entry(watch, &conn->watches, list) {
+ event = list_top(&watch->events, struct watch_event, list);
+ if (event) {
+ conn->waiting_for_ack = watch;
+ send_reply(conn,XS_WATCH_EVENT,event->data,event->len);
+ break;
}
}
- return ret;
}
-/* FIXME: we fail to fire on out of memory. Should drop connections. */
-void fire_watches(struct transaction *trans, const char *node, bool recurse)
+static int destroy_watch_event(void *_event)
{
- struct watch **watches;
- struct watch_event *event;
- unsigned int num_watches;
+ struct watch_event *event = _event;
- /* During transactions, don't fire watches. */
- if (trans)
- return;
-
- watches = find_watches(node, recurse, &num_watches);
- if (!watches)
- return;
-
- /* Create and fill in info about event. */
- event = talloc(talloc_autofree_context(), struct watch_event);
- event->node = talloc_strdup(event, node);
-
- /* Tie event to this watch. */
- event->watches = watches;
- talloc_steal(event, watches);
- event->num_watches = num_watches;
- event->recurse = recurse;
- list_add_tail(&event->list, &watches[0]->events);
-
- /* Warn if not finished after thirty seconds. */
- gettimeofday(&event->timeout, NULL);
- event->timeout.tv_sec += 30;
-
- /* If connection not doing anything, queue this. */
- if (!watches[0]->conn->out)
- queue_next_event(watches[0]->conn);
+ trace_destroy(event, "watch_event");
+ assert(event->cause->watches_unacked != 0);
+ /* If it hits zero, will unblock in unblock_connections. */
+ event->cause->watches_unacked--;
+ return 0;
}
-/* We're done with this event: see if anyone else wants it. */
-static void move_event_onwards(struct watch_event *event)
+static void add_event(struct connection *cause, struct watch *watch,
+ const char *node)
{
- list_del(&event->list);
+ struct watch_event *event;
- event->num_watches--;
- event->watches++;
- if (!event->num_watches) {
- talloc_free(event);
- return;
+ if (watch->relative_path) {
+ node += strlen(watch->relative_path);
+ if (*node == '/') /* Could be "" */
+ node++;
}
- list_add_tail(&event->list, &event->watches[0]->events);
-
- /* If connection not doing anything, queue this. */
- if (!event->watches[0]->conn->out)
- queue_next_event(event->watches[0]->conn);
+ event = talloc(watch, struct watch_event);
+ event->len = strlen(node) + 1 + strlen(watch->token) + 1;
+ event->data = talloc_array(event, char, event->len);
+ strcpy(event->data, node);
+ strcpy(event->data + strlen(node) + 1, watch->token);
+ event->cause = cause;
+ cause->watches_unacked++;
+ talloc_set_destructor(event, destroy_watch_event);
+ list_add_tail(&event->list, &watch->events);
+ trace_create(event, "watch_event");
}
-static void remove_watch_from_events(struct watch *dying_watch)
+/* FIXME: we fail to fire on out of memory. Should drop connections. */
+bool fire_watches(struct connection *conn, const char *node, bool recurse)
{
+ struct connection *i;
struct watch *watch;
- struct watch_event *event;
- unsigned int i;
- list_for_each_entry(watch, &watches, list) {
- list_for_each_entry(event, &watch->events, list) {
- for (i = 0; i < event->num_watches; i++) {
- if (event->watches[i] != dying_watch)
- continue;
-
- assert(i != 0);
- memmove(event->watches+i,
- event->watches+i+1,
- (event->num_watches - (i+1))
- * sizeof(struct watch *));
- event->num_watches--;
- }
+ /* During transactions, don't fire watches. */
+ if (conn->transaction)
+ return false;
+
+ assert(conn->state == OK);
+
+ /* Create an event for each watch. Don't send to self. */
+ list_for_each_entry(i, &connections, list) {
+ if (i == conn)
+ continue;
+
+ list_for_each_entry(watch, &i->watches, list) {
+ if (is_child(node, watch->node))
+ add_event(conn, watch, node);
+ else if (recurse && is_child(watch->node, node))
+ add_event(conn, watch, watch->node);
+ else
+ continue;
+ conn->state = WATCHED;
+ /* If connection not doing anything, queue this. */
+ if (!i->out)
+ queue_next_event(i);
}
}
+ return conn->state == WATCHED;
}
static int destroy_watch(void *_watch)
{
- struct watch *watch = _watch;
- struct watch_event *event;
-
- /* If we have pending events, pass them on to others. */
- while ((event = list_top(&watch->events, struct watch_event, list)))
- move_event_onwards(event);
-
- /* Remove from global list. */
- list_del(&watch->list);
-
- /* Other events which match this watch must be cleared. */
- remove_watch_from_events(watch);
-
- trace_destroy(watch, "watch");
+ trace_destroy(_watch, "watch");
return 0;
}
-/* We keep watches in priority order. */
-static void insert_watch(struct watch *watch)
-{
- struct watch *i;
-
- list_for_each_entry(i, &watches, list) {
- if (i->priority <= watch->priority) {
- list_add_tail(&watch->list, &i->list);
- return;
- }
- }
-
- list_add_tail(&watch->list, &watches);
-}
-
void shortest_watch_ack_timeout(struct timeval *tv)
{
+ (void)tv;
+#if 0 /* FIXME */
struct watch *watch;
list_for_each_entry(watch, &watches, list) {
@@ -285,10 +178,12 @@ void shortest_watch_ack_timeout(struct timeval *tv)
*tv = i->timeout;
}
}
+#endif
}
void check_watch_ack_timeout(void)
{
+#if 0
struct watch *watch;
struct timeval now;
@@ -308,12 +203,13 @@ void check_watch_ack_timeout(void)
}
}
}
+#endif
}
void do_watch(struct connection *conn, struct buffered_data *in)
{
struct watch *watch;
- char *vec[3];
+ char *vec[2];
bool relative;
if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec)) {
@@ -331,14 +227,16 @@ void do_watch(struct connection *conn, struct buffered_data *in)
watch = talloc(conn, struct watch);
watch->node = talloc_strdup(watch, vec[0]);
watch->token = talloc_strdup(watch, vec[1]);
- watch->conn = conn;
- watch->priority = strtoul(vec[2], NULL, 0);
- watch->relative = relative;
+ if (relative)
+ watch->relative_path = get_implicit_path(conn);
+ else
+ watch->relative_path = NULL;
+
INIT_LIST_HEAD(&watch->events);
- insert_watch(watch);
- talloc_set_destructor(watch, destroy_watch);
+ list_add_tail(&watch->list, &conn->watches);
trace_create(watch, "watch");
+ talloc_set_destructor(watch, destroy_watch);
send_ack(conn, XS_WATCH);
}
@@ -356,9 +254,6 @@ void do_watch_ack(struct connection *conn, const char *token)
return;
}
- event = list_top(&conn->waiting_for_ack->events,
- struct watch_event, list);
- assert(event->watches[0] == conn->waiting_for_ack);
if (!streq(conn->waiting_for_ack->token, token)) {
/* They're confused: this will cause us to send event again */
conn->waiting_for_ack = NULL;
@@ -366,7 +261,12 @@ void do_watch_ack(struct connection *conn, const char *token)
return;
}
- move_event_onwards(event);
+ /* Remove event: after ack sent, core will call queue_next_event */
+ event = list_top(&conn->waiting_for_ack->events, struct watch_event,
+ list);
+ list_del(&event->list);
+ talloc_free(event);
+
conn->waiting_for_ack = NULL;
send_ack(conn, XS_WATCH_ACK);
}
@@ -385,11 +285,9 @@ void do_unwatch(struct connection *conn, struct buffered_data *in)
* watch we're deleting: conn->waiting_for_ack was reset by
* this command in consider_message anyway. */
node = canonicalize(conn, vec[0]);
- list_for_each_entry(watch, &watches, list) {
- if (watch->conn != conn)
- continue;
-
+ list_for_each_entry(watch, &conn->watches, list) {
if (streq(watch->node, node) && streq(watch->token, vec[1])) {
+ list_del(&watch->list);
talloc_free(watch);
send_ack(conn, XS_UNWATCH);
return;
@@ -404,15 +302,16 @@ void dump_watches(struct connection *conn)
struct watch *watch;
struct watch_event *event;
- /* Find first watch with an event. */
- list_for_each_entry(watch, &watches, list) {
- if (watch->conn != conn)
- continue;
+ if (conn->waiting_for_ack)
+ printf(" waiting_for_ack for watch on %s token %s\n",
+ conn->waiting_for_ack->node,
+ conn->waiting_for_ack->token);
- printf(" watch on %s token %s prio %i\n",
- watch->node, watch->token, watch->priority);
+ list_for_each_entry(watch, &conn->watches, list) {
+ printf(" watch on %s token %s\n",
+ watch->node, watch->token);
list_for_each_entry(event, &watch->events, list)
- printf(" event: %s\n", event->node);
+ printf(" event: %s\n", event->data);
}
}
#endif