Diffstat (limited to 'tools/remus/kmod/sch_queue.c')
-rw-r--r--  tools/remus/kmod/sch_queue.c  208
1 file changed, 208 insertions, 0 deletions
diff --git a/tools/remus/kmod/sch_queue.c b/tools/remus/kmod/sch_queue.c
new file mode 100644
index 0000000000..3a6a50c39a
--- /dev/null
+++ b/tools/remus/kmod/sch_queue.c
@@ -0,0 +1,208 @@
+/*
+ * sch_queue.c Queue traffic until an explicit release command
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version
+ * 2 of the License, or (at your option) any later version.
+ *
+ * The operation of the buffer is as follows:
+ * When a checkpoint begins, a barrier is inserted into the
+ * network queue by a netlink request (it operates by storing
+ * a pointer to the next packet which arrives and blocking dequeue
+ * when that packet is at the head of the queue).
+ * When a checkpoint completes (the backup acknowledges receipt),
+ * currently-queued packets are released.
+ * So it supports two operations, barrier and release.
+ */
+
+#include <linux/config.h>
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/netdevice.h>
+#include <linux/skbuff.h>
+#include <net/pkt_sched.h>
+
+/* upper bound, in bytes, on queued traffic; queue_enqueue refuses packets beyond it */
+#define FIFO_BUF (10*1024*1024)
+
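+/* control actions carried in struct tc_queue_qopt and handled by queue_change() */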
+#define TCQ_CHECKPOINT 0
+#define TCQ_DEQUEUE 1
+
+struct queue_sched_data {
+ /* this packet is the first packet which should not be delivered.
+ * If it is NULL, queue_enqueue will set it to the next packet it sees. */
+ struct sk_buff *stop;
+};
+
+struct tc_queue_qopt {
+ /* 0: reset stop packet pointer
+ * 1: dequeue to stop pointer */
+ int action;
+};
+
+/* borrowed from drivers/xen/netback/loopback.c */
+static int is_foreign(unsigned long pfn)
+{
+ /* NB. Play it safe for auto-translation mode. */
+ return (xen_feature(XENFEAT_auto_translated_physmap) ||
+ (phys_to_machine_mapping[pfn] & FOREIGN_FRAME_BIT));
+}
+
+static int skb_remove_foreign_references(struct sk_buff *skb)
+{
+ struct page *page;
+ unsigned long pfn;
+ int i, off;
+ char *vaddr;
+
+ BUG_ON(skb_shinfo(skb)->frag_list);
+
+ for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) {
+ pfn = page_to_pfn(skb_shinfo(skb)->frags[i].page);
+ if (!is_foreign(pfn))
+ continue;
+ /*
+ printk("foreign ref found\n");
+ */
+ page = alloc_page(GFP_ATOMIC | __GFP_NOWARN);
+ if (unlikely(!page))
+ return 0;
+
+ vaddr = kmap_skb_frag(&skb_shinfo(skb)->frags[i]);
+ off = skb_shinfo(skb)->frags[i].page_offset;
+ memcpy(page_address(page) + off, vaddr + off,
+ skb_shinfo(skb)->frags[i].size);
+ kunmap_skb_frag(vaddr);
+
+ put_page(skb_shinfo(skb)->frags[i].page);
+ skb_shinfo(skb)->frags[i].page = page;
+ }
+
+ return 1;
+}
+
+static int queue_enqueue(struct sk_buff *skb, struct Qdisc* sch)
+{
+ struct queue_sched_data *q = qdisc_priv(sch);
+
+ if (likely(sch->qstats.backlog + skb->len <= FIFO_BUF))
+ {
+ if (!skb_remove_foreign_references(skb)) {
+ printk(KERN_ERR "error removing foreign ref\n");
+ return qdisc_reshape_fail(skb, sch);
+ }
+
+ /* latch the barrier only on a packet that is actually queued, so
+ * q->stop can never point at an skb that was dropped above */
+ if (!q->stop)
+ q->stop = skb;
+
+ return qdisc_enqueue_tail(skb, sch);
+ }
+ printk(KERN_WARNING "queue reported full: %d,%d\n", sch->qstats.backlog, skb->len);
+
+ return qdisc_reshape_fail(skb, sch);
+}
+
+/* dequeue returns nothing while the queue is blocked; after a release it
+ * hands packets up until it reaches the barrier packet, then blocks again. */
+static inline struct sk_buff *queue_dequeue(struct Qdisc* sch)
+{
+ struct queue_sched_data *q = qdisc_priv(sch);
+ struct sk_buff* peek;
+ /*
+ struct timeval tv;
+
+ if (!q->stop) {
+ do_gettimeofday(&tv);
+ printk("packet dequeued at %lu.%06lu\n", tv.tv_sec, tv.tv_usec);
+ }
+ */
+
+ if (sch->flags & TCQ_F_THROTTLED)
+ return NULL;
+
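+ /* sch->q is the qdisc's sk_buff_head; q.next is the packet at the head of the queue */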
+ peek = (struct sk_buff *)((sch->q).next);
+
+ /* this pointer comparison may be shady */
+ if (peek == q->stop) {
+ /*
+ do_gettimeofday(&tv);
+ printk("stop packet at %lu.%06lu\n", tv.tv_sec, tv.tv_usec);
+ */
+
+ /* everything before the barrier packet has been released;
+ * block the queue again until the next release request */
+ sch->flags |= TCQ_F_THROTTLED;
+ return NULL;
+ }
+
+ return qdisc_dequeue_head(sch);
+}
+
+static int queue_init(struct Qdisc *sch, struct rtattr *opt)
+{
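+ /* start blocked: nothing is delivered until the first release request arrives */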
+ sch->flags |= TCQ_F_THROTTLED;
+
+ return 0;
+}
+
+/* receives two messages:
+ * 0: checkpoint queue (set stop to next packet)
+ * 1: dequeue until stop */
+static int queue_change(struct Qdisc* sch, struct rtattr* opt)
+{
+ struct queue_sched_data *q = qdisc_priv(sch);
+ struct tc_queue_qopt* msg;
+ /*
+ struct timeval tv;
+ */
+
+ if (!opt || RTA_PAYLOAD(opt) < sizeof(*msg))
+ return -EINVAL;
+
+ msg = RTA_DATA(opt);
+
+ if (msg->action == TCQ_CHECKPOINT) {
+ /* barrier: forget the old stop pointer; enqueue will latch the next packet it sees */
+ q->stop = NULL;
+ } else if (msg->action == TCQ_DEQUEUE) {
+ /* release: unblock the queue and kick the device so transmission resumes */
+ sch->flags &= ~TCQ_F_THROTTLED;
+ netif_schedule(sch->dev);
+ /*
+ do_gettimeofday(&tv);
+ printk("queue release at %lu.%06lu (%d bytes)\n", tv.tv_sec, tv.tv_usec,
+ sch->qstats.backlog);
+ */
+ } else {
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+struct Qdisc_ops queue_qdisc_ops = {
+ .id = "queue",
+ .priv_size = sizeof(struct queue_sched_data),
+ .enqueue = queue_enqueue,
+ .dequeue = queue_dequeue,
+ .init = queue_init,
+ .change = queue_change,
+ .owner = THIS_MODULE,
+};
+
+static int __init queue_module_init(void)
+{
+ printk("loading queue\n");
+ return register_qdisc(&queue_qdisc_ops);
+}
+
+static void __exit queue_module_exit(void)
+{
+ printk("queue unloaded\n");
+ unregister_qdisc(&queue_qdisc_ops);
+}
+module_init(queue_module_init)
+module_exit(queue_module_exit)
+MODULE_LICENSE("GPL");
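
The control flow described in the file header (barrier when a checkpoint starts, release once the backup acknowledges it) is driven from userspace by sending the qdisc an RTM_NEWQDISC change request whose TCA_OPTIONS attribute carries a struct tc_queue_qopt. The sketch below is a hypothetical illustration only, not part of this patch or of the Remus tools: the helper name queue_qdisc_ctl is invented, it assumes the "queue" qdisc has already been installed as the root qdisc of the named device, and it does not wait for the netlink acknowledgement.

#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>
#include <linux/pkt_sched.h>
#include <net/if.h>

/* mirror of the module's private definitions */
#define TCQ_CHECKPOINT 0
#define TCQ_DEQUEUE    1
struct tc_queue_qopt { int action; };

/* send one barrier/release request to the "queue" root qdisc on dev */
static int queue_qdisc_ctl(const char *dev, int action)
{
    struct {
        struct nlmsghdr n;
        struct tcmsg    t;
        char            buf[256];
    } req;
    struct tc_queue_qopt qopt = { .action = action };
    struct rtattr *rta;
    int fd, ret;

    memset(&req, 0, sizeof(req));
    req.n.nlmsg_len   = NLMSG_LENGTH(sizeof(struct tcmsg));
    req.n.nlmsg_type  = RTM_NEWQDISC;          /* change, not create */
    req.n.nlmsg_flags = NLM_F_REQUEST;
    req.t.tcm_family  = AF_UNSPEC;
    req.t.tcm_parent  = TC_H_ROOT;             /* address the root qdisc */
    req.t.tcm_ifindex = if_nametoindex(dev);
    if (!req.t.tcm_ifindex)
        return -1;

    /* TCA_KIND = "queue" must match queue_qdisc_ops.id */
    rta = (struct rtattr *)((char *)&req + NLMSG_ALIGN(req.n.nlmsg_len));
    rta->rta_type = TCA_KIND;
    rta->rta_len  = RTA_LENGTH(strlen("queue") + 1);
    strcpy(RTA_DATA(rta), "queue");
    req.n.nlmsg_len = NLMSG_ALIGN(req.n.nlmsg_len) + RTA_ALIGN(rta->rta_len);

    /* TCA_OPTIONS carries the struct parsed by queue_change() */
    rta = (struct rtattr *)((char *)&req + NLMSG_ALIGN(req.n.nlmsg_len));
    rta->rta_type = TCA_OPTIONS;
    rta->rta_len  = RTA_LENGTH(sizeof(qopt));
    memcpy(RTA_DATA(rta), &qopt, sizeof(qopt));
    req.n.nlmsg_len = NLMSG_ALIGN(req.n.nlmsg_len) + RTA_ALIGN(rta->rta_len);

    fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE);
    if (fd < 0)
        return -1;
    ret = send(fd, &req, req.n.nlmsg_len, 0) < 0 ? -1 : 0;
    close(fd);
    return ret;
}

A checkpointing loop would then call queue_qdisc_ctl(dev, TCQ_CHECKPOINT) as each epoch begins and queue_qdisc_ctl(dev, TCQ_DEQUEUE) once the backup has acknowledged the checkpoint, releasing the buffered packets in one burst.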