/******************************************************************************
 * common/softirq.c
 *
 * Modified from the Linux original. Softirqs in Xen are only executed in
 * an outermost activation (e.g., never within an interrupt activation).
 * This simplifies some things and generally seems a good thing.
 *
 * Copyright (c) 2003, K A Fraser
 *
 * Copyright (C) 1992 Linus Torvalds
 */

#include
#include
#include
#include
#include

irq_cpustat_t irq_stat[NR_CPUS];

static struct softirq_action softirq_vec[32] __cacheline_aligned;

asmlinkage void do_softirq()
{
    unsigned int pending, cpu = smp_processor_id();
    struct softirq_action *h;

    if ( unlikely(in_interrupt()) )
        BUG();

    /*
     * XEN: This isn't real mutual-exclusion: it just ensures that in_softirq()
     * and in_interrupt() are both TRUE, allowing checks for erroneous reentry.
     */
    cpu_bh_disable(cpu);

    while ( (pending = xchg(&softirq_pending(cpu), 0)) != 0 )
    {
        h = softirq_vec;
        while ( pending )
        {
            if ( pending & 1 )
                h->action(h);
            h++;
            pending >>= 1;
        }
    }

    cpu_bh_enable(cpu);
}

inline void cpu_raise_softirq(unsigned int cpu, unsigned int nr)
{
    __cpu_raise_softirq(cpu, nr);
#ifdef CONFIG_SMP
    if ( cpu != smp_processor_id() )
        smp_send_event_check_cpu(cpu);
#endif
}

void raise_softirq(unsigned int nr)
{
    __cpu_raise_softirq(smp_processor_id(), nr);
}

void open_softirq(int nr, void (*action)(struct softirq_action*), void *data)
{
    softirq_vec[nr].data = data;
    softirq_vec[nr].action = action;
}


/* Tasklets */

struct tasklet_head tasklet_vec[NR_CPUS] __cacheline_aligned;
struct tasklet_head tasklet_hi_vec[NR_CPUS] __cacheline_aligned;

void __tasklet_schedule(struct tasklet_struct *t)
{
    int cpu = smp_processor_id();
    unsigned long flags;

    local_irq_save(flags);
    t->next = tasklet_vec[cpu].list;
    tasklet_vec[cpu].list = t;
    cpu_raise_softirq(cpu, TASKLET_SOFTIRQ);
    local_irq_restore(flags);
}

void __tasklet_hi_schedule(struct tasklet_struct *t)
{
    int cpu = smp_processor_id();
    unsigned long flags;

    local_irq_save(flags);
    t->next = tasklet_hi_vec[cpu].list;
    tasklet_hi_vec[cpu].list = t;
    cpu_raise_softirq(cpu, HI_SOFTIRQ);
    local_irq_restore(flags);
}

static void tasklet_action(struct softirq_action *a)
{
    int cpu = smp_processor_id();
    struct tasklet_struct *list;

    local_irq_disable();
    list = tasklet_vec[cpu].list;
    tasklet_vec[cpu].list = NULL;
    local_irq_enable();

    while ( list != NULL )
    {
        struct tasklet_struct *t = list;

        list = list->next;

        if ( likely(tasklet_trylock(t)) )
        {
            if ( likely(!atomic_read(&t->count)) )
            {
                if ( unlikely(!test_and_clear_bit(TASKLET_STATE_SCHED,
                                                  &t->state)) )
                    BUG();
                t->func(t->data);
            }
            tasklet_unlock(t);
            continue;
        }

        local_irq_disable();
        t->next = tasklet_vec[cpu].list;
        tasklet_vec[cpu].list = t;
        __cpu_raise_softirq(cpu, TASKLET_SOFTIRQ);
        local_irq_enable();
    }
}

static void tasklet_hi_action(struct softirq_action *a)
{
    int cpu = smp_processor_id();
    struct tasklet_struct *list;

    local_irq_disable();
    list = tasklet_hi_vec[cpu].list;
    tasklet_hi_vec[cpu].list = NULL;
    local_irq_enable();

    while ( list != NULL )
    {
        struct tasklet_struct *t = list;

        list = list->next;

        if ( likely(tasklet_trylock(t)) )
        {
            if ( likely(!atomic_read(&t->count)) )
            {
                if ( unlikely(!test_and_clear_bit(TASKLET_STATE_SCHED,
                                                  &t->state)) )
                    BUG();
                t->func(t->data);
            }
            tasklet_unlock(t);
            continue;
        }

        local_irq_disable();
        t->next = tasklet_hi_vec[cpu].list;
        tasklet_hi_vec[cpu].list = t;
        __cpu_raise_softirq(cpu, HI_SOFTIRQ);
        local_irq_enable();
    }
}

void tasklet_init(struct tasklet_struct *t,
                  void (*func)(unsigned long), unsigned long data)
{
    t->next = NULL;
    t->state = 0;
    atomic_set(&t->count, 0);
    t->func = func;
    t->data = data;
}

void tasklet_kill(struct tasklet_struct *t)
{
    if ( in_interrupt() )
        BUG();

    while ( test_and_set_bit(TASKLET_STATE_SCHED, &t->state) )
        while ( test_bit(TASKLET_STATE_SCHED, &t->state) )
            do_softirq();

    tasklet_unlock_wait(t);
    clear_bit(TASKLET_STATE_SCHED, &t->state);
}

void __init softirq_init()
{
    open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);
    open_softirq(HI_SOFTIRQ, tasklet_hi_action, NULL);
}
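
/*
 * Usage sketch (illustrative, not part of the original file): how a
 * subsystem could hook into the softirq machinery above.  EXAMPLE_SOFTIRQ
 * and the functions below are hypothetical names; real softirq numbers are
 * assigned in Xen's interrupt headers, not chosen ad hoc like this.
 */
#define EXAMPLE_SOFTIRQ 31  /* arbitrary free slot in softirq_vec[], for illustration only */

static void example_action(struct softirq_action *h)
{
    /* Deferred work runs here, outside interrupt context, the next time
     * the outermost activation on this CPU calls do_softirq(). */
}

static void example_subsystem_init(void)
{
    /* Register the handler once during boot... */
    open_softirq(EXAMPLE_SOFTIRQ, example_action, NULL);

    /* ...then raising the softirq merely sets the pending bit on the
     * current CPU; the bit is consumed by the next do_softirq() pass. */
    raise_softirq(EXAMPLE_SOFTIRQ);
}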
/**
 *  The WebSocket backend is responsible for updating our knowledge of flows and events
 *  from the REST API and live updates delivered via a WebSocket connection.
 *  An alternative backend may use the REST API only to host static instances.
 */
import { fetchApi } from "../utils"
import * as connectionActions from "../ducks/connection"

const CMD_RESET = 'reset'

export default class WebsocketBackend {
    constructor(store) {
        this.activeFetches = {}
        this.store = store
        this.connect()
    }

    connect() {
        this.socket = new WebSocket(location.origin.replace('http', 'ws') + '/updates')
        this.socket.addEventListener('open', () => this.onOpen())
        this.socket.addEventListener('close', event => this.onClose(event))
        this.socket.addEventListener('message', msg => this.onMessage(JSON.parse(msg.data)))
        this.socket.addEventListener('error', error => this.onError(error))
    }

    onOpen() {
        this.fetchData("settings")
        this.fetchData("flows")
        this.fetchData("events")
        this.store.dispatch(connectionActions.startFetching())
    }

    fetchData(resource) {
        let queue = []
        this.activeFetches[resource] = queue
        fetchApi(`/${resource}`)
            .then(res => res.json())
            .then(json => {
                // Make sure this fetch has not been superseded in the meantime by the server sending a RESET.
                if (this.activeFetches[resource] === queue)
                    this.receive(resource, json)
            })
    }

    onMessage(msg) {

        if (msg.cmd === CMD_RESET) {
            return this.fetchData(msg.resource)
        }
        if (msg.resource in this.activeFetches) {
            this.activeFetches[msg.resource].push(msg)
        } else {
            let type = `${msg.resource}_${msg.cmd}`.toUpperCase()
            this.store.dispatch({ type, ...msg })
        }
    }

    receive(resource, data) {
        let type = `${resource}_RECEIVE`.toUpperCase()
        this.store.dispatch({ type, cmd: "receive", resource, data })
        let queue = this.activeFetches[resource]
        delete this.activeFetches[resource]
        queue.forEach(msg => this.onMessage(msg))

        if (Object.keys(this.activeFetches).length === 0) {
            // We have fetched the last resource
            this.store.dispatch(connectionActions.connectionEstablished())
        }
    }

    onClose(closeEvent) {
        this.store.dispatch(connectionActions.connectionError(
            `Connection closed at ${new Date().toUTCString()} with error code ${closeEvent.code}.`
        ))
        console.error("websocket connection closed", closeEvent)
    }

    onError(error) {
        // FIXME
        console.error("websocket connection errored", error)
    }
}
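
/*
 * Usage sketch (illustrative, not part of this module): wiring the backend up
 * to a Redux store.  The reducer below is a stand-in for the real ducks, and
 * the import paths are assumptions; only WebsocketBackend comes from this file.
 */
import { createStore } from "redux"
import WebsocketBackend from "./backends/websocket"

const exampleReducer = (state = { flows: [] }, action) => {
    switch (action.type) {
        case "FLOWS_RECEIVE":  // dispatched by receive() once the GET /flows fetch completes
            return { ...state, flows: action.data }
        case "FLOWS_ADD":      // dispatched for a live { resource: "flows", cmd: "add" } message
            return { ...state, flows: [...state.flows, action.data] }
        default:
            return state
    }
}

const store = createStore(exampleReducer)
new WebsocketBackend(store)  // opens /updates and fetches /settings, /flows, /events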