aboutsummaryrefslogtreecommitdiffstats
path: root/tools/xenstore/xenstored_watch.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/xenstore/xenstored_watch.c')
-rw-r--r--tools/xenstore/xenstored_watch.c189
1 files changed, 124 insertions, 65 deletions
diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c
index 2df83e1a54..d0e00f53c2 100644
--- a/tools/xenstore/xenstored_watch.c
+++ b/tools/xenstore/xenstored_watch.c
@@ -21,6 +21,8 @@
#include <sys/types.h>
#include <stdarg.h>
#include <stdlib.h>
+#include <sys/time.h>
+#include <time.h>
#include "talloc.h"
#include "list.h"
#include "xenstored_watch.h"
@@ -28,6 +30,8 @@
#include "utils.h"
#include "xenstored_test.h"
+/* FIXME: time out unacked watches. */
+
/* We create this if anyone is interested "node", then we pass it from
* watch to watch as each connection acks it.
*/
@@ -39,7 +43,10 @@ struct watch_event
/* Watch we are currently attached to. */
struct watch *watch;
- struct buffered_data *data;
+ struct timeval timeout;
+
+ /* Name of node which changed. */
+ char *node;
};
struct watch
@@ -50,72 +57,63 @@ struct watch
/* Current outstanding events applying to this watch. */
struct list_head events;
+ char *token;
char *node;
struct connection *conn;
};
static LIST_HEAD(watches);
-static void reset_event(struct watch_event *event)
-{
- event->data->inhdr = true;
- event->data->used = 0;
-}
-
-/* We received a non-ACK response: re-queue any watch we just sent. */
-void reset_watch_event(struct connection *conn)
-{
- if (waiting_for_ack(conn))
- reset_event(conn->event);
-}
-
-/* We're waiting if we have an event and we sent it all. */
-bool waiting_for_ack(struct connection *conn)
+static struct watch_event *get_first_event(struct connection *conn)
{
- if (!conn->event)
- return false;
+ struct watch *watch;
+ struct watch_event *event;
- if (conn->event->data->inhdr)
- return false;
- return conn->event->data->used == conn->event->data->hdr.msg.len;
-}
+ /* Find first watch with an event. */
+ list_for_each_entry(watch, &watches, list) {
+ if (watch->conn != conn)
+ continue;
-bool is_watch_event(struct connection *conn, struct buffered_data *out)
-{
- return (conn->event && out == conn->event->data);
+ event = list_top(&watch->events, struct watch_event, list);
+ if (event)
+ return event;
+ }
+ return NULL;
}
/* Look through our watches: if any of them have an event, queue it. */
void queue_next_event(struct connection *conn)
{
- struct watch *watch;
+ struct watch_event *event;
+ char *buffer;
+ unsigned int len;
- /* We had a reply queued already? Send it. */
+ /* We had a reply queued already? Send it: other end will
+ * discard watch. */
if (conn->waiting_reply) {
conn->out = conn->waiting_reply;
conn->waiting_reply = NULL;
+ conn->waiting_for_ack = false;
return;
}
- /* If we're waiting for ack, don't queue more. */
- if (waiting_for_ack(conn))
+ /* If we're already waiting for ack, don't queue more. */
+ if (conn->waiting_for_ack)
return;
- /* Find a good event to send. */
- if (!conn->event) {
- list_for_each_entry(watch, &watches, list) {
- if (watch->conn != conn)
- continue;
+ event = get_first_event(conn);
+ if (!event)
+ return;
- conn->event = list_top(&watch->events,
- struct watch_event, list);
- if (conn->event)
- break;
- }
- if (!conn->event)
- return;
- }
+ /* If we decide to cancel, we will reset this. */
+ conn->waiting_for_ack = true;
- conn->out = conn->event->data;
+ /* Create reply from path and token */
+ len = strlen(event->node) + 1 + strlen(event->watch->token) + 1;
+ buffer = talloc_array(conn, char, len);
+ strcpy(buffer, event->node);
+ strcpy(buffer+strlen(event->node)+1, event->watch->token);
+ send_reply(conn, XS_WATCH_EVENT, buffer, len);
+ talloc_free(buffer);
}
/* Watch on DIR applies to DIR, DIR/FILE, but not DIRLONG. */
@@ -160,14 +158,15 @@ void fire_watches(struct transaction *trans, const char *node)
/* Create and fill in info about event. */
event = talloc(talloc_autofree_context(), struct watch_event);
- event->data = new_buffer(event);
- event->data->hdr.msg.type = XS_WATCH_EVENT;
- event->data->hdr.msg.len = strlen(node) + 1;
- event->data->buffer = talloc_strdup(event->data, node);
+ event->node = talloc_strdup(event, node);
/* Tie event to this watch. */
event->watch = watch;
- list_add(&event->list, &watch->events);
+ list_add_tail(&event->list, &watch->events);
+
+ /* Warn if not finished after thirty seconds. */
+ gettimeofday(&event->timeout, NULL);
+ event->timeout.tv_sec += 30;
/* If connection not doing anything, queue this. */
if (!watch->conn->out)
@@ -178,16 +177,15 @@ void fire_watches(struct transaction *trans, const char *node)
static void move_event_onwards(struct watch_event *event)
{
list_del(&event->list);
- reset_event(event);
/* Remove from this watch, and find next watch to put this on. */
- event->watch = find_next_watch(event->watch, event->data->buffer);
+ event->watch = find_next_watch(event->watch, event->node);
if (!event->watch) {
talloc_free(event);
return;
}
- list_add(&event->list, &event->watch->events);
+ list_add_tail(&event->list, &event->watch->events);
/* If connection not doing anything, queue this. */
if (!event->watch->conn->out)
@@ -199,10 +197,6 @@ static int destroy_watch(void *_watch)
struct watch *watch = _watch;
struct watch_event *event;
- /* Forget about sending out or waiting for acks for this watch. */
- if (watch->conn->event && watch->conn->event->watch == watch)
- watch->conn->event = NULL;
-
/* If we have pending events, pass them on to others. */
while ((event = list_top(&watch->events, struct watch_event, list)))
move_event_onwards(event);
@@ -227,21 +221,59 @@ static void insert_watch(struct watch *watch)
list_add_tail(&watch->list, &watches);
}
+void shortest_watch_ack_timeout(struct timeval *tv)
+{
+ struct watch *watch;
+
+ list_for_each_entry(watch, &watches, list) {
+ struct watch_event *i;
+ list_for_each_entry(i, &watch->events, list) {
+ if (!timerisset(&i->timeout))
+ continue;
+ if (!timerisset(tv) || timercmp(&i->timeout, tv, <))
+ *tv = i->timeout;
+ }
+ }
+}
+
+void check_watch_ack_timeout(void)
+{
+ struct watch *watch;
+ struct timeval now;
+
+ gettimeofday(&now, NULL);
+ list_for_each_entry(watch, &watches, list) {
+ struct watch_event *i, *tmp;
+ list_for_each_entry_safe(i, tmp, &watch->events, list) {
+ if (!timerisset(&i->timeout))
+ continue;
+ if (timercmp(&i->timeout, &now, <)) {
+ xprintf("Warning: timeout on watch event %s"
+ " token %s\n",
+ i->node, watch->token);
+ timerclear(&i->timeout);
+ }
+ }
+ }
+}
+
bool do_watch(struct connection *conn, struct buffered_data *in)
{
struct watch *watch;
- char *vec[2];
+ char *vec[3];
if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec))
return send_error(conn, EINVAL);
+ vec[0] = canonicalize(conn, vec[0]);
if (!check_node_perms(conn, vec[0], XS_PERM_READ))
return send_error(conn, errno);
watch = talloc(conn, struct watch);
watch->node = talloc_strdup(watch, vec[0]);
+ watch->token = talloc_strdup(watch, vec[1]);
watch->conn = conn;
- watch->priority = strtoul(vec[1], NULL, 0);
+ watch->priority = strtoul(vec[2], NULL, 0);
INIT_LIST_HEAD(&watch->events);
insert_watch(watch);
@@ -249,31 +281,58 @@ bool do_watch(struct connection *conn, struct buffered_data *in)
return send_ack(conn, XS_WATCH);
}
-bool do_watch_ack(struct connection *conn)
+bool do_watch_ack(struct connection *conn, const char *token)
{
struct watch_event *event;
- if (!waiting_for_ack(conn))
+ if (!conn->waiting_for_ack)
return send_error(conn, ENOENT);
- /* Remove this watch event. */
- event = conn->event;
- conn->event = NULL;
+ event = get_first_event(conn);
+ if (!streq(event->watch->token, token))
+ return send_error(conn, EINVAL);
move_event_onwards(event);
+ conn->waiting_for_ack = false;
return send_ack(conn, XS_WATCH_ACK);
}
-bool do_unwatch(struct connection *conn, const char *node)
+bool do_unwatch(struct connection *conn, struct buffered_data *in)
{
struct watch *watch;
+ char *node, *vec[2];
+
+ if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec))
+ return send_error(conn, EINVAL);
+ node = canonicalize(conn, vec[0]);
list_for_each_entry(watch, &watches, list) {
- if (watch->conn == conn
- && streq(watch->node, node)) {
+ if (watch->conn != conn)
+ continue;
+
+ if (streq(watch->node, node) && streq(watch->token, vec[1])) {
talloc_free(watch);
return send_ack(conn, XS_UNWATCH);
}
}
return send_error(conn, ENOENT);
}
+
+#ifdef TESTING
+void dump_watches(struct connection *conn)
+{
+ struct watch *watch;
+ struct watch_event *event;
+
+ /* Find first watch with an event. */
+ list_for_each_entry(watch, &watches, list) {
+ if (watch->conn != conn)
+ continue;
+
+ printf(" watch on %s token %s prio %i\n",
+ watch->node, watch->token, watch->priority);
+ list_for_each_entry(event, &watch->events, list)
+ printf(" event: %s\n", event->node);
+ }
+}
+#endif