From 8f6210da20e9bc55a30e42b008f7e9e94587bf3a Mon Sep 17 00:00:00 2001 From: Max Goodhart Date: Sun, 5 Jul 2020 17:25:57 -0700 Subject: [PATCH] Use Y.js CRDT for view id mapping This fixes glitches when typing in the grid boxes by allowing concurrent writes and removing the need to read state from the server to render the box values. --- package-lock.json | 21 ++++++++ package.json | 3 +- src/node/StreamWindow.js | 2 +- src/node/index.js | 33 ++++++++++-- src/node/server.js | 26 ++++++++-- src/web/control.js | 105 ++++++++++++++++++++++----------------- 6 files changed, 136 insertions(+), 54 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3c7e84d..7494ed6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6219,6 +6219,11 @@ "integrity": "sha1-TkMekrEalzFjaqH5yNHMvP2reN8=", "dev": true }, + "isomorphic.js": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.1.4.tgz", + "integrity": "sha512-t9zbgkjE7f9f2M6OSW49YEq0lUrSdAllBbWFUZoeck/rnnFae6UlhmDtXWs48VJY3ZpryCoZsRiAiKD44hPIGQ==" + }, "isstream": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/isstream/-/isstream-0.1.2.tgz", @@ -8419,6 +8424,14 @@ "type-check": "~0.3.2" } }, + "lib0": { + "version": "0.2.30", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.30.tgz", + "integrity": "sha512-Mp2SaW5uJwMD1atU/MJ7DGxGf5DBaokZGhByg9IYLCRfE+D+DeksjbnweaFvUoWrRpDCJLtg1jY8xENNICj5Mg==", + "requires": { + "isomorphic.js": "^0.1.3" + } + }, "lines-and-columns": { "version": "1.1.6", "resolved": "https://registry.npmjs.org/lines-and-columns/-/lines-and-columns-1.1.6.tgz", @@ -12475,6 +12488,14 @@ "fd-slicer": "~1.1.0" } }, + "yjs": { + "version": "13.2.0", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.2.0.tgz", + "integrity": "sha512-0augWOespX5KC8de62GCR8WloZhAyBfEF3ZPDpjZlRs6yho7iFKqarpzxxJgmP8zA/pNJiV1EIpMezSxEdNdDw==", + "requires": { + "lib0": "^0.2.27" + } + }, "ylru": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/ylru/-/ylru-1.2.1.tgz", diff --git a/package.json b/package.json index ee9e7dd..6c21abd 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,8 @@ "svg-loaders-react": "^2.2.1", "ws": "^7.3.0", "xstate": "^4.10.0", - "yargs": "^15.3.1" + "yargs": "^15.3.1", + "yjs": "^13.2.0" }, "devDependencies": { "@babel/core": "^7.10.2", diff --git a/src/node/StreamWindow.js b/src/node/StreamWindow.js index cb3ef10..57c08e1 100644 --- a/src/node/StreamWindow.js +++ b/src/node/StreamWindow.js @@ -145,7 +145,7 @@ export default class StreamWindow extends EventEmitter { ) } - setViews(viewContentMap) { + setViews(viewContentMap, streams) { const { gridCount, spaceWidth, spaceHeight } = this.config const { win, views } = this const boxes = boxesFromViewContentMap(gridCount, gridCount, viewContentMap) diff --git a/src/node/index.js b/src/node/index.js index 1b6bc1f..7174897 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -1,6 +1,7 @@ import fs from 'fs' import yargs from 'yargs' import TOML from '@iarna/toml' +import * as Y from 'yjs' import { Repeater } from '@repeaterjs/repeater' import { app, shell, session, BrowserWindow } from 'electron' @@ -221,12 +222,37 @@ async function main() { views: [], streamdelay: null, } + + const stateDoc = new Y.Doc() + const viewsState = stateDoc.getMap('views') + stateDoc.transact(() => { + for (let i = 0; i < argv.grid.count ** 2; i++) { + const data = new Y.Map() + data.set('streamId', '') + viewsState.set(i, data) + } + }) + viewsState.observeDeep(() => { + const viewContentMap = new Map() + for (const [key, viewData] of viewsState) { + const stream = clientState.streams.find( + (s) => s._id === viewData.get('streamId'), + ) + if (!stream) { + continue + } + viewContentMap.set(key, { + url: stream.link, + kind: stream.kind || 'video', + }) + } + streamWindow.setViews(viewContentMap) + }) + const getInitialState = () => clientState let broadcastState = () => {} const onMessage = (msg) => { - if (msg.type === 'set-views') { - streamWindow.setViews(new Map(msg.views)) - } else if (msg.type === 'set-listening-view') { + if (msg.type === 'set-listening-view') { streamWindow.setListeningView(msg.viewIdx) } else if (msg.type === 'set-view-blurred') { streamWindow.setViewBlurred(msg.viewIdx, msg.blurred) @@ -277,6 +303,7 @@ async function main() { password: argv.control.password, getInitialState, onMessage, + stateDoc, })) if (argv.control.open) { shell.openExternal(argv.control.address) diff --git a/src/node/server.js b/src/node/server.js index 4d6f7ae..39417e8 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -11,10 +11,18 @@ import route from 'koa-route' import serveStatic from 'koa-static' import views from 'koa-views' import websocket from 'koa-easy-ws' +import * as Y from 'yjs' const webDistPath = path.join(app.getAppPath(), 'web') -function initApp({ username, password, baseURL, getInitialState, onMessage }) { +function initApp({ + username, + password, + baseURL, + getInitialState, + onMessage, + stateDoc, +}) { const expectedOrigin = new URL(baseURL).origin const sockets = new Set() @@ -47,16 +55,23 @@ function initApp({ username, password, baseURL, getInitialState, onMessage }) { const ws = await ctx.ws() sockets.add(ws) + ws.binaryType = 'arraybuffer' + ws.on('close', () => { sockets.delete(ws) }) - ws.on('message', (dataText) => { + ws.on('message', (rawData) => { + if (rawData instanceof ArrayBuffer) { + Y.applyUpdate(stateDoc, new Uint8Array(rawData)) + return + } + let data try { - data = JSON.parse(dataText) + data = JSON.parse(rawData) } catch (err) { - console.warn('received unexpected ws data:', dataText) + console.warn('received unexpected ws data:', rawData) return } @@ -69,6 +84,7 @@ function initApp({ username, password, baseURL, getInitialState, onMessage }) { const state = getInitialState() ws.send(JSON.stringify({ type: 'state', state })) + ws.send(Y.encodeStateAsUpdate(stateDoc)) return } ctx.status = 404 @@ -95,6 +111,7 @@ export default async function initWebServer({ password, getInitialState, onMessage, + stateDoc, }) { let { protocol, hostname, port } = new URL(baseURL) if (!port) { @@ -110,6 +127,7 @@ export default async function initWebServer({ baseURL, getInitialState, onMessage, + stateDoc, }) let server diff --git a/src/web/control.js b/src/web/control.js index 501637b..5351887 100644 --- a/src/web/control.js +++ b/src/web/control.js @@ -2,6 +2,7 @@ import range from 'lodash/range' import sortBy from 'lodash/sortBy' import truncate from 'lodash/truncate' import ReconnectingWebSocket from 'reconnecting-websocket' +import * as Y from 'yjs' import { h, Fragment, render } from 'preact' import { useEffect, useState, useCallback, useRef } from 'preact/hooks' import { State } from 'xstate' @@ -38,9 +39,29 @@ const hotkeyTriggers = [ 'p', ] +function useYDoc(existingDoc, keys) { + const { current: doc } = useRef(existingDoc || new Y.Doc()) + const [docValue, setDocValue] = useState() + useEffect(() => { + function updateDocValue() { + const valueCopy = Object.fromEntries( + keys.map((k) => [k, doc.getMap(k).toJSON()]), + ) + setDocValue(valueCopy) + } + updateDocValue() + doc.on('update', updateDocValue) + return () => { + doc.off('update', updateDocValue) + } + }, []) + return [docValue, doc] +} + function App({ wsEndpoint }) { const wsRef = useRef() const [isConnected, setIsConnected] = useState(false) + const [sharedState, stateDoc] = useYDoc(null, ['views']) const [config, setConfig] = useState({}) const [streams, setStreams] = useState([]) const [customStreams, setCustomStreams] = useState([]) @@ -55,9 +76,15 @@ function App({ wsEndpoint }) { minReconnectionDelay: 1000 + Math.random() * 500, reconnectionDelayGrowFactor: 1.1, }) + ws.binaryType = 'arraybuffer' ws.addEventListener('open', () => setIsConnected(true)) ws.addEventListener('close', () => setIsConnected(false)) ws.addEventListener('message', (ev) => { + if (ev.data instanceof ArrayBuffer) { + Y.applyUpdate(stateDoc, new Uint8Array(ev.data)) + return + } + const msg = JSON.parse(ev.data) if (msg.type === 'state') { const { @@ -68,9 +95,7 @@ function App({ wsEndpoint }) { } = msg.state const newStateIdxMap = new Map() for (const viewState of views) { - const { pos, content } = viewState.context - const stream = newStreams.find((d) => d.link === content.url) - const streamId = stream?._id + const { pos } = viewState.context const state = State.from(viewState.state) const isListening = state.matches( 'displaying.running.audio.listening', @@ -81,8 +106,6 @@ function App({ wsEndpoint }) { newStateIdxMap.set(space, {}) } Object.assign(newStateIdxMap.get(space), { - streamId, - content, state, isListening, isBlurred, @@ -103,33 +126,21 @@ function App({ wsEndpoint }) { console.warn('unexpected ws message', msg) } }) + stateDoc.on('update', (update) => { + ws.send(update) + }) wsRef.current = ws }, []) const handleSetView = useCallback( (idx, streamId) => { - const newSpaceIdxMap = new Map(stateIdxMap) const stream = streams.find((d) => d._id === streamId) - if (stream) { - const content = { - url: stream?.link, - kind: stream?.kind || 'video', - } - newSpaceIdxMap.set(idx, { - ...newSpaceIdxMap.get(idx), - streamId, - content, - }) - } else { - newSpaceIdxMap.delete(idx) - } - const views = Array.from(newSpaceIdxMap, ([space, { content }]) => [ - space, - content, - ]) - wsRef.current.send(JSON.stringify({ type: 'set-views', views })) + stateDoc + .getMap('views') + .get(String(idx)) + .set('streamId', stream ? streamId : '') }, - [streams, stateIdxMap], + [stateDoc, streams], ) const handleSetListening = useCallback((idx, listening) => { @@ -160,14 +171,21 @@ function App({ wsEndpoint }) { ) }, []) - const handleBrowse = useCallback((url) => { - wsRef.current.send( - JSON.stringify({ - type: 'browse', - url, - }), - ) - }, []) + const handleBrowse = useCallback( + (streamId) => { + const stream = streams.find((d) => d._id === streamId) + if (!stream) { + return + } + wsRef.current.send( + JSON.stringify({ + type: 'browse', + url: stream.link, + }), + ) + }, + [streams], + ) const handleDevTools = useCallback((idx) => { wsRef.current.send( @@ -181,14 +199,14 @@ function App({ wsEndpoint }) { const handleClickId = useCallback( (streamId) => { const availableIdx = range(gridCount * gridCount).find( - (i) => !stateIdxMap.has(i), + (i) => !sharedState.views[i].streamId, ) if (availableIdx === undefined) { return } handleSetView(availableIdx, streamId) }, - [gridCount, stateIdxMap], + [gridCount, sharedState], ) const handleChangeCustomStream = useCallback((idx, customStream) => { @@ -266,17 +284,12 @@ function App({ wsEndpoint }) { {range(0, gridCount).map((x) => { const idx = gridCount * y + x - const { - streamId = '', - isListening = false, - isBlurred = false, - content = { url: '' }, - state, - } = stateIdxMap.get(idx) || {} + const { isListening = false, isBlurred = false, state } = + stateIdxMap.get(idx) || {} + const { streamId } = sharedState.views?.[idx] || '' return ( onBrowse(url), [url, onBrowse]) + const handleBrowseClick = useCallback(() => onBrowse(spaceValue), [ + spaceValue, + onBrowse, + ]) const handleDevToolsClick = useCallback(() => onDevTools(idx), [ idx, onDevTools,