aboutsummaryrefslogtreecommitdiffstats
path: root/tools/xenstore/xenstored_watch.c
diff options
context:
space:
mode:
Diffstat (limited to 'tools/xenstore/xenstored_watch.c')
-rw-r--r--tools/xenstore/xenstored_watch.c279
1 files changed, 279 insertions, 0 deletions
diff --git a/tools/xenstore/xenstored_watch.c b/tools/xenstore/xenstored_watch.c
new file mode 100644
index 0000000000..2df83e1a54
--- /dev/null
+++ b/tools/xenstore/xenstored_watch.c
@@ -0,0 +1,279 @@
+/*
+ Watch code for Xen Store Daemon.
+ Copyright (C) 2005 Rusty Russell IBM Corporation
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+*/
+
+#include <stdio.h>
+#include <sys/types.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include "talloc.h"
+#include "list.h"
+#include "xenstored_watch.h"
+#include "xs_lib.h"
+#include "utils.h"
+#include "xenstored_test.h"
+
+/* We create this if anyone is interested "node", then we pass it from
+ * watch to watch as each connection acks it.
+ */
+struct watch_event
+{
+ /* The watch we are firing for (watch->events) */
+ struct list_head list;
+
+ /* Watch we are currently attached to. */
+ struct watch *watch;
+
+ struct buffered_data *data;
+};
+
+struct watch
+{
+ struct list_head list;
+ unsigned int priority;
+
+ /* Current outstanding events applying to this watch. */
+ struct list_head events;
+
+ char *node;
+ struct connection *conn;
+};
+static LIST_HEAD(watches);
+
+static void reset_event(struct watch_event *event)
+{
+ event->data->inhdr = true;
+ event->data->used = 0;
+}
+
+/* We received a non-ACK response: re-queue any watch we just sent. */
+void reset_watch_event(struct connection *conn)
+{
+ if (waiting_for_ack(conn))
+ reset_event(conn->event);
+}
+
+/* We're waiting if we have an event and we sent it all. */
+bool waiting_for_ack(struct connection *conn)
+{
+ if (!conn->event)
+ return false;
+
+ if (conn->event->data->inhdr)
+ return false;
+ return conn->event->data->used == conn->event->data->hdr.msg.len;
+}
+
+bool is_watch_event(struct connection *conn, struct buffered_data *out)
+{
+ return (conn->event && out == conn->event->data);
+}
+
+/* Look through our watches: if any of them have an event, queue it. */
+void queue_next_event(struct connection *conn)
+{
+ struct watch *watch;
+
+ /* We had a reply queued already? Send it. */
+ if (conn->waiting_reply) {
+ conn->out = conn->waiting_reply;
+ conn->waiting_reply = NULL;
+ return;
+ }
+
+ /* If we're waiting for ack, don't queue more. */
+ if (waiting_for_ack(conn))
+ return;
+
+ /* Find a good event to send. */
+ if (!conn->event) {
+ list_for_each_entry(watch, &watches, list) {
+ if (watch->conn != conn)
+ continue;
+
+ conn->event = list_top(&watch->events,
+ struct watch_event, list);
+ if (conn->event)
+ break;
+ }
+ if (!conn->event)
+ return;
+ }
+
+ conn->out = conn->event->data;
+}
+
+/* Watch on DIR applies to DIR, DIR/FILE, but not DIRLONG. */
+static bool watch_applies(const struct watch *watch, const char *node)
+{
+ return is_child(node, watch->node);
+}
+
+static struct watch *find_watch(const char *node)
+{
+ struct watch *watch;
+
+ list_for_each_entry(watch, &watches, list) {
+ if (watch_applies(watch, node))
+ return watch;
+ }
+ return NULL;
+}
+
+static struct watch *find_next_watch(struct watch *watch, const char *node)
+{
+ list_for_each_entry_continue(watch, &watches, list) {
+ if (watch_applies(watch, node))
+ return watch;
+ }
+ return NULL;
+}
+
+/* FIXME: we fail to fire on out of memory. Should drop connections. */
+void fire_watches(struct transaction *trans, const char *node)
+{
+ struct watch *watch;
+ struct watch_event *event;
+
+ /* During transactions, don't fire watches. */
+ if (trans)
+ return;
+
+ watch = find_watch(node);
+ if (!watch)
+ return;
+
+ /* Create and fill in info about event. */
+ event = talloc(talloc_autofree_context(), struct watch_event);
+ event->data = new_buffer(event);
+ event->data->hdr.msg.type = XS_WATCH_EVENT;
+ event->data->hdr.msg.len = strlen(node) + 1;
+ event->data->buffer = talloc_strdup(event->data, node);
+
+ /* Tie event to this watch. */
+ event->watch = watch;
+ list_add(&event->list, &watch->events);
+
+ /* If connection not doing anything, queue this. */
+ if (!watch->conn->out)
+ queue_next_event(watch->conn);
+}
+
+/* We're done with this event: see if anyone else wants it. */
+static void move_event_onwards(struct watch_event *event)
+{
+ list_del(&event->list);
+ reset_event(event);
+
+ /* Remove from this watch, and find next watch to put this on. */
+ event->watch = find_next_watch(event->watch, event->data->buffer);
+ if (!event->watch) {
+ talloc_free(event);
+ return;
+ }
+
+ list_add(&event->list, &event->watch->events);
+
+ /* If connection not doing anything, queue this. */
+ if (!event->watch->conn->out)
+ queue_next_event(event->watch->conn);
+}
+
+static int destroy_watch(void *_watch)
+{
+ struct watch *watch = _watch;
+ struct watch_event *event;
+
+ /* Forget about sending out or waiting for acks for this watch. */
+ if (watch->conn->event && watch->conn->event->watch == watch)
+ watch->conn->event = NULL;
+
+ /* If we have pending events, pass them on to others. */
+ while ((event = list_top(&watch->events, struct watch_event, list)))
+ move_event_onwards(event);
+
+ /* Remove from global list. */
+ list_del(&watch->list);
+ return 0;
+}
+
+/* We keep watches in priority order. */
+static void insert_watch(struct watch *watch)
+{
+ struct watch *i;
+
+ list_for_each_entry(i, &watches, list) {
+ if (i->priority <= watch->priority) {
+ list_add_tail(&watch->list, &i->list);
+ return;
+ }
+ }
+
+ list_add_tail(&watch->list, &watches);
+}
+
+bool do_watch(struct connection *conn, struct buffered_data *in)
+{
+ struct watch *watch;
+ char *vec[2];
+
+ if (get_strings(in, vec, ARRAY_SIZE(vec)) != ARRAY_SIZE(vec))
+ return send_error(conn, EINVAL);
+
+ if (!check_node_perms(conn, vec[0], XS_PERM_READ))
+ return send_error(conn, errno);
+
+ watch = talloc(conn, struct watch);
+ watch->node = talloc_strdup(watch, vec[0]);
+ watch->conn = conn;
+ watch->priority = strtoul(vec[1], NULL, 0);
+ INIT_LIST_HEAD(&watch->events);
+
+ insert_watch(watch);
+ talloc_set_destructor(watch, destroy_watch);
+ return send_ack(conn, XS_WATCH);
+}
+
+bool do_watch_ack(struct connection *conn)
+{
+ struct watch_event *event;
+
+ if (!waiting_for_ack(conn))
+ return send_error(conn, ENOENT);
+
+ /* Remove this watch event. */
+ event = conn->event;
+ conn->event = NULL;
+
+ move_event_onwards(event);
+ return send_ack(conn, XS_WATCH_ACK);
+}
+
+bool do_unwatch(struct connection *conn, const char *node)
+{
+ struct watch *watch;
+
+ list_for_each_entry(watch, &watches, list) {
+ if (watch->conn == conn
+ && streq(watch->node, node)) {
+ talloc_free(watch);
+ return send_ack(conn, XS_UNWATCH);
+ }
+ }
+ return send_error(conn, ENOENT);
+}