From 571c817f2176008c460c736580c0b35385808a2e Mon Sep 17 00:00:00 2001 From: Jason Date: Sat, 25 Jun 2016 00:36:39 +0800 Subject: [web] add msgQueue --- web/src/js/ducks/eventLog.js | 39 +++----------- web/src/js/ducks/flows.js | 46 ++++------------- web/src/js/ducks/index.js | 10 ++-- web/src/js/ducks/msgQueue.js | 113 +++++++++++++++++++++++++++++++++++++++++ web/src/js/ducks/settings.js | 86 +++++++++++++++---------------- web/src/js/ducks/utils/list.js | 85 ++++++++++++++++++------------- web/src/js/ducks/utils/view.js | 6 +-- web/src/js/ducks/websocket.js | 39 +++----------- 8 files changed, 233 insertions(+), 191 deletions(-) create mode 100644 web/src/js/ducks/msgQueue.js (limited to 'web/src') diff --git a/web/src/js/ducks/eventLog.js b/web/src/js/ducks/eventLog.js index 0c875689..50b24cd6 100644 --- a/web/src/js/ducks/eventLog.js +++ b/web/src/js/ducks/eventLog.js @@ -1,12 +1,12 @@ -import { fetchApi } from '../utils' import reduceList, * as listActions from './utils/list' import reduceView, * as viewActions from './utils/view' import * as websocketActions from './websocket' +import * as msgQueueActions from './msgQueue' -export const WS_MSG_TYPE = 'UPDATE_EVENTLOG' +export const MSG_TYPE = 'UPDATE_EVENTLOG' +export const DATA_URL = '/events' export const ADD = 'EVENTLOG_ADD' -export const REQUEST = 'EVENTLOG_REQUEST' export const RECEIVE = 'EVENTLOG_RECEIVE' export const TOGGLE_VISIBILITY = 'EVENTLOG_TOGGLE_VISIBILITY' export const TOGGLE_FILTER = 'EVENTLOG_TOGGLE_FILTER' @@ -51,12 +51,6 @@ export default function reduce(state = defaultState, action) { view: reduceView(state.view, viewActions.add(item, log => state.filters[log.level])), } - case REQUEST: - return { - ...state, - list: reduceList(state.list, listActions.request()), - } - case RECEIVE: const list = reduceList(state.list, listActions.receive(action.list)) return { @@ -120,33 +114,12 @@ export function handleWsMsg(msg) { * @public websocket */ export function fetchData() { - return dispatch => { - dispatch(request()) - - return fetchApi('/events') - .then(res => res.json()) - .then(json => dispatch(receive(json.data))) - .catch(error => dispatch(fetchError(error))) - } -} - -/** - * @private - */ -export function request() { - return { type: REQUEST } + return msgQueueActions.fetchData(MSG_TYPE) } /** - * @private + * @public msgQueue */ -export function receive(list) { +export function receiveData(list) { return { type: RECEIVE, list } } - -/** - * @private - */ -export function fetchError(error) { - return { type: FETCH_ERROR, error } -} diff --git a/web/src/js/ducks/flows.js b/web/src/js/ducks/flows.js index f732f536..e9ac38f6 100644 --- a/web/src/js/ducks/flows.js +++ b/web/src/js/ducks/flows.js @@ -1,14 +1,15 @@ import { fetchApi } from '../utils' import reduceList, * as listActions from './utils/list' import reduceViews, * as viewsActions from './views' +import * as msgQueueActions from './msgQueue' import * as websocketActions from './websocket' -export const WS_MSG_TYPE = 'UPDATE_FLOWS' +export const MSG_TYPE = 'UPDATE_FLOWS' +export const DATA_URL = '/flows' export const ADD = 'FLOWS_ADD' export const UPDATE = 'FLOWS_UPDATE' export const REMOVE = 'FLOWS_REMOVE' -export const REQUEST = 'FLOWS_REQUEST' export const RECEIVE = 'FLOWS_RECEIVE' export const REQUEST_ACTION = 'FLOWS_REQUEST_ACTION' export const UNKNOWN_CMD = 'FLOWS_UNKNOWN_CMD' @@ -43,12 +44,6 @@ export default function reduce(state = defaultState, action) { views: reduceViews(state.views, viewsActions.remove(action.item.id)), } - case REQUEST: - return { - ...state, - list: reduceList(state.list, listActions.request()), - } - case RECEIVE: const list = reduceList(state.list, listActions.receive(action.list)) return { @@ -177,14 +172,14 @@ export function handleWsMsg(msg) { * @public websocket */ export function fetchData() { - return dispatch => { - dispatch(request()) + return msgQueueActions.fetchData(MSG_TYPE) +} - return fetchApi('/flows') - .then(res => res.json()) - .then(json => dispatch(receive(json.data))) - .catch(error => dispatch(fetchError(error))) - } +/** + * @public msgQueue + */ +export function receiveData(list) { + return { type: RECEIVE, list } } /** @@ -207,24 +202,3 @@ export function update(id, item) { export function remove(id) { return { type: REMOVE, id } } - -/** - * @private - */ -export function request() { - return { type: REQUEST } -} - -/** - * @private - */ -export function receive(list) { - return { type: RECEIVE, list } -} - -/** - * @private - */ -export function fetchError(error) { - return { type: FETCH_ERROR, error } -} diff --git a/web/src/js/ducks/index.js b/web/src/js/ducks/index.js index ffde1a64..c2488d70 100644 --- a/web/src/js/ducks/index.js +++ b/web/src/js/ducks/index.js @@ -1,16 +1,16 @@ -import {combineReducers} from 'redux' +import { combineReducers } from 'redux' import eventLog from './eventLog' import websocket from './websocket' import flows from './flows' import settings from './settings' import ui from './ui' +import msgQueue from './msgQueue' -const rootReducer = combineReducers({ +export default combineReducers({ eventLog, websocket, flows, settings, - ui + ui, + msgQueue, }) - -export default rootReducer diff --git a/web/src/js/ducks/msgQueue.js b/web/src/js/ducks/msgQueue.js new file mode 100644 index 00000000..6d82f4c2 --- /dev/null +++ b/web/src/js/ducks/msgQueue.js @@ -0,0 +1,113 @@ +import { fetchApi } from '../utils' +import * as websocketActions from './websocket' +import * as eventLogActions from './eventLog' +import * as flowsActions from './flows' +import * as settingsActions from './settings' + +export const INIT = 'MSG_QUEUE_INIT' +export const ENQUEUE = 'MSG_QUEUE_ENQUEUE' +export const CLEAR = 'MSG_QUEUE_CLEAR' +export const FETCH_ERROR = 'MSG_QUEUE_FETCH_ERROR' + +const handlers = { + [eventLogActions.MSG_TYPE] : eventLogActions, + [flowsActions.MSG_TYPE] : flowsActions, + [settingsActions.MSG_TYPE] : settingsActions, +} + +const defaultState = {} + +export default function reduce(state = defaultState, action) { + switch (action.type) { + + case INIT: + return { + ...state, + [action.queue]: [], + } + + case ENQUEUE: + return { + ...state, + [action.queue]: [...state[action.queue], action.msg], + } + + case CLEAR: + return { + ...state, + [action.queue]: null, + } + + default: + return state + } +} + +/** + * @public websocket + */ +export function handleWsMsg(msg) { + return (dispatch, getState) => { + const handler = handlers[msg.type] + if (msg.cmd === websocketActions.CMD_RESET) { + return dispatch(fetchData(handler.MSG_TYPE)) + } + if (getState().msgQueue[handler.MSG_TYPE]) { + return dispatch({ type: ENQUEUE, queue: handler.MSG_TYPE, msg }) + } + return dispatch(handler.handleWsMsg(msg)) + } +} + +/** + * @public + */ +export function fetchData(type) { + return dispatch => { + const handler = handlers[type] + + dispatch(init(handler.MSG_TYPE)) + + fetchApi(handler.DATA_URL) + .then(res => res.json()) + .then(json => dispatch(receive(type, json))) + .catch(error => dispatch(fetchError(type, error))) + } +} + +/** + * @private + */ +export function receive(type, res) { + return (dispatch, getState) => { + const handler = handlers[type] + const queue = getState().msgQueue[handler.MSG_TYPE] || [] + + dispatch(clear(handler.MSG_TYPE)) + dispatch(handler.receiveData(res.data)) + for (const msg of queue) { + dispatch(handler.handleWsMsg(msg)) + } + } +} + +/** + * @private + */ +export function init(queue) { + return { type: INIT, queue } +} + +/** + * @private + */ +export function clear(queue) { + return { type: CLEAR, queue } +} + +/** + * @private + */ +export function fetchError(type, error) { + return { type: FETCH_ERROR, type, error } +} diff --git a/web/src/js/ducks/settings.js b/web/src/js/ducks/settings.js index bef7c0ff..c5f0a90b 100644 --- a/web/src/js/ducks/settings.js +++ b/web/src/js/ducks/settings.js @@ -1,45 +1,31 @@ -import {fetchApi} from '../utils'; +import { fetchApi } from '../utils' +import * as msgQueueActions from './msgQueue' -export const REQUEST_SETTINGS = 'REQUEST_SETTINGS' -export const RECEIVE_SETTINGS = 'RECEIVE_SETTINGS' -export const UPDATE_SETTINGS = 'UPDATE_SETTINGS' +export const MSG_TYPE = 'UPDATE_SETTINGS' +export const DATA_URL = '/settings' + +export const RECEIVE = 'RECEIVE' +export const UPDATE = 'UPDATE' +export const REQUEST_UPDATE = 'REQUEST_UPDATE' +export const UNKNOWN_CMD = 'SETTINGS_UNKNOWN_CMD' const defaultState = { settings: {}, - isFetching: false, - actionsDuringFetch: [], } export default function reducer(state = defaultState, action) { switch (action.type) { - case REQUEST_SETTINGS: + case RECEIVE: return { ...state, - isFetching: true - } - - case RECEIVE_SETTINGS: - let s = { settings: action.settings, - isFetching: false, - actionsDuringFetch: [], } - for (action of state.actionsDuringFetch) { - s = reducer(s, action) - } - return s - case UPDATE_SETTINGS: - if (state.isFetching) { - return { - ...state, - actionsDuringFetch: [...state.actionsDuringFetch, action] - } - } + case UPDATE: return { ...state, - settings: {...state.settings, ...action.settings} + settings: { ...state.settings, ...action.settings }, } default: @@ -47,31 +33,39 @@ export default function reducer(state = defaultState, action) { } } -export function handleWsMsg(event) { - /* This action creator takes all WebSocket events */ - if (event.cmd === 'update') { - return { - type: UPDATE_SETTINGS, - settings: event.data - } - } - console.error('unknown settings update', event) -} +/** + * @public msgQueue + */ +export function handleWsMsg(msg) { + switch (msg.cmd) { -export function fetchSettings() { - return dispatch => { - dispatch({type: REQUEST_SETTINGS}) + case websocketActions.CMD_UPDATE: + return { type: UPDATE, settings: msg.data } - return fetchApi('/settings') - .then(response => response.json()) - .then(json => - dispatch({type: RECEIVE_SETTINGS, settings: json.data}) - ) - // TODO: Error handling + default: + console.error('unknown settings update', msg) + return { type: UNKNOWN_CMD, msg } } } +/** + * @public + */ export function updateSettings(settings) { fetchApi.put('/settings', settings) - return { type: SET_INTERCEPT } + return { type: REQUEST_UPDATE } +} + +/** + * @public websocket + */ +export function fetchData() { + return msgQueueActions.fetchData(MSG_TYPE) +} + +/** + * @public msgQueue + */ +export function receiveData(settings) { + return { type: RECEIVE, settings } } diff --git a/web/src/js/ducks/utils/list.js b/web/src/js/ducks/utils/list.js index e66a8549..a93b2d45 100644 --- a/web/src/js/ducks/utils/list.js +++ b/web/src/js/ducks/utils/list.js @@ -1,54 +1,74 @@ import _ from 'lodash' -export const SET = 'LIST_SET' -export const CLEAR = 'LIST_CLEAR' -export const REQUEST = 'LIST_REQUEST' +export const ADD = 'LIST_ADD' +export const UPDATE = 'LIST_UPDATE' +export const REMOVE = 'LIST_REMOVE' export const RECEIVE = 'LIST_RECEIVE' const defaultState = { - data: {}, - pendingActions: null, + data: [], + byId: {}, + indexOf: {}, } export default function reduce(state = defaultState, action) { switch (action.type) { - case SET: - if (state.pendingActions) { - return { - ...state, - pendingActions: [...state.pendingActions, action] - } - } + case ADD: return { ...state, - data: { ...state.data, [action.id]: null, [action.item.id]: action.item } + data: [...state.data, action.item], + byId: { ...state.byId, [action.item.id]: action.item }, + indexOf: { ...state.indexOf, [action.item.id]: state.data.length }, } - case CLEAR: - if (state.pendingActions) { - return { - ...state, - pendingActions: [...state.pendingActions, action] - } + case UPDATE: { + const data = [...state.data] + const index = state.indexOf[action.id] + + if (index == null) { + throw new Error('Item not found') } + + data[index] = action.item + return { ...state, - data: { ...state.data, [action.id]: null } + data, + byId: { ...state.byId, [action.id]: null, [action.item.id]: action.item }, + indexOf: { ...state.indexOf, [action.id]: null, [action.item.id]: index }, + } + } + + case REMOVE: { + const data = [...state.data] + const indexOf = { ...state.indexOf } + const index = indexOf[action.id] + + if (index == null) { + throw new Error('Item not found') + } + + data.splice(index, 1) + for (let i = data.length - 1; i >= index; i--) { + indexOf[data[i].id] = i } - case REQUEST: return { ...state, - pendingActions: [] + data, + indexOf, + byId: { ...state.byId, [action.id]: null }, } + } case RECEIVE: - return state.pendingActions.reduce(reduce, { + return { ...state, - pendingActions: null, - data: _.fromPairs(action.list.map(item => [item.id, item])), - }) + data: action.list, + byId: _.fromPairs(action.list.map(item => [item.id, item])), + indexOf: _.fromPairs(action.list.map((item, index) => [item.id, index])), + } default: return state @@ -59,28 +79,21 @@ export default function reduce(state = defaultState, action) { * @public */ export function add(item) { - return { type: SET, id: item.id, item } + return { type: ADD, item } } /** * @public */ export function update(id, item) { - return { type: SET, id, item } + return { type: UPDATE, id, item } } /** * @public */ export function remove(id) { - return { type: CLEAR, id } -} - -/** - * @public - */ -export function request() { - return { type: REQUEST } + return { type: REMOVE, id } } /** diff --git a/web/src/js/ducks/utils/view.js b/web/src/js/ducks/utils/view.js index 3b552378..87f9bd2a 100755 --- a/web/src/js/ducks/utils/view.js +++ b/web/src/js/ducks/utils/view.js @@ -16,7 +16,7 @@ export default function reduce(state = defaultState, action) { switch (action.type) { case UPDATE_FILTER: { - const data = _.values(action.list.data).filter(action.filter).sort(action.sorter) + const data = action.list.data.filter(action.filter).sort(action.sorter) return { ...state, data, @@ -69,7 +69,7 @@ export default function reduce(state = defaultState, action) { } case RECEIVE: { - const data = _.values(action.list.data).filter(action.filter).sort(action.sorter) + const data = action.list.data.filter(action.filter).sort(action.sorter) return { ...state, data, @@ -138,7 +138,7 @@ function sortedIndex(list, item, sorter) { while (low < high) { const middle = (low + high) >>> 1 - if (sorter(item, list[middle]) > 0) { + if (sorter(item, list[middle]) >= 0) { low = middle + 1 } else { high = middle diff --git a/web/src/js/ducks/websocket.js b/web/src/js/ducks/websocket.js index c79d887a..aa0d7f7d 100644 --- a/web/src/js/ducks/websocket.js +++ b/web/src/js/ducks/websocket.js @@ -1,5 +1,7 @@ import { ConnectionActions } from '../actions.js' import { AppDispatcher } from '../dispatcher.js' + +import * as msgQueueActions from './msgQueue' import * as eventLogActions from './eventLog' import * as flowsActions from './flows' import * as settingsActions from './settings' @@ -45,17 +47,12 @@ export function connect() { return dispatch => { const socket = new WebSocket(location.origin.replace('http', 'ws') + '/updates') - // @todo remove this - window.ws = socket - socket.addEventListener('open', () => dispatch(onConnect())) socket.addEventListener('close', () => dispatch(onDisconnect())) - socket.addEventListener('message', msg => dispatch(onMessage(msg))) + socket.addEventListener('message', msg => dispatch(onMessage(JSON.parse(msg.data)))) socket.addEventListener('error', error => dispatch(onError(error))) dispatch({ type: CONNECT, socket }) - - return socket } } @@ -70,39 +67,18 @@ export function onConnect() { // workaround to make sure that our state is already available. return dispatch => { dispatch({ type: CONNECTED }) - dispatch(settingsActions.fetchSettings()) - dispatch(flowsActions.fetchFlows()).then(() => ConnectionActions.open()) + dispatch(settingsActions.fetchData()) + dispatch(flowsActions.fetchData()) + dispatch(eventLogActions.fetchData()) } } export function onMessage(msg) { - return dispatch => { - const data = JSON.parse(msg.data) - - AppDispatcher.dispatchServerAction(data) - - switch (data.type) { - - case eventLogActions.WS_MSG_TYPE: - return dispatch(eventLogActions.handleWsMsg(data)) - - case flowsActions.WS_MSG_TYPE: - return dispatch(flowsActions.handleWsMsg(data)) - - case settingsActions.UPDATE_SETTINGS: - return dispatch(settingsActions.handleWsMsg(data)) - - default: - console.warn('unknown message', data) - } - - dispatch({ type: MESSAGE, msg }) - } + return msgQueueActions.handleWsMsg(msg) } export function onDisconnect() { return dispatch => { - ConnectionActions.close() dispatch(eventLogActions.addLogEntry('WebSocket connection closed.')) dispatch({ type: DISCONNECTED }) } @@ -111,7 +87,6 @@ export function onDisconnect() { export function onError(error) { // @todo let event log subscribe WebSocketActions.ERROR return dispatch => { - ConnectionActions.error() dispatch(eventLogActions.addLogEntry('WebSocket connection error.')) dispatch({ type: ERROR, error }) } -- cgit v1.2.3