diff --git a/package-lock.json b/package-lock.json index 3c7e84d..7494ed6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6219,6 +6219,11 @@ "integrity": "sha1-TkMekrEalzFjaqH5yNHMvP2reN8=", "dev": true }, + "isomorphic.js": { + "version": "0.1.4", + "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.1.4.tgz", + "integrity": "sha512-t9zbgkjE7f9f2M6OSW49YEq0lUrSdAllBbWFUZoeck/rnnFae6UlhmDtXWs48VJY3ZpryCoZsRiAiKD44hPIGQ==" + }, "isstream": { "version": "0.1.2", "resolved": "https://registry.npmjs.org/isstream/-/isstream-0.1.2.tgz", @@ -8419,6 +8424,14 @@ "type-check": "~0.3.2" } }, + "lib0": { + "version": "0.2.30", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.30.tgz", + "integrity": "sha512-Mp2SaW5uJwMD1atU/MJ7DGxGf5DBaokZGhByg9IYLCRfE+D+DeksjbnweaFvUoWrRpDCJLtg1jY8xENNICj5Mg==", + "requires": { + "isomorphic.js": "^0.1.3" + } + }, "lines-and-columns": { "version": "1.1.6", "resolved": "https://registry.npmjs.org/lines-and-columns/-/lines-and-columns-1.1.6.tgz", @@ -12475,6 +12488,14 @@ "fd-slicer": "~1.1.0" } }, + "yjs": { + "version": "13.2.0", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.2.0.tgz", + "integrity": "sha512-0augWOespX5KC8de62GCR8WloZhAyBfEF3ZPDpjZlRs6yho7iFKqarpzxxJgmP8zA/pNJiV1EIpMezSxEdNdDw==", + "requires": { + "lib0": "^0.2.27" + } + }, "ylru": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/ylru/-/ylru-1.2.1.tgz", diff --git a/package.json b/package.json index ee9e7dd..6c21abd 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,8 @@ "svg-loaders-react": "^2.2.1", "ws": "^7.3.0", "xstate": "^4.10.0", - "yargs": "^15.3.1" + "yargs": "^15.3.1", + "yjs": "^13.2.0" }, "devDependencies": { "@babel/core": "^7.10.2", diff --git a/src/node/StreamWindow.js b/src/node/StreamWindow.js index cb3ef10..57c08e1 100644 --- a/src/node/StreamWindow.js +++ b/src/node/StreamWindow.js @@ -145,7 +145,7 @@ export default class StreamWindow extends EventEmitter { ) } - setViews(viewContentMap) { + setViews(viewContentMap, streams) { const { gridCount, spaceWidth, spaceHeight } = this.config const { win, views } = this const boxes = boxesFromViewContentMap(gridCount, gridCount, viewContentMap) diff --git a/src/node/index.js b/src/node/index.js index 1b6bc1f..7174897 100644 --- a/src/node/index.js +++ b/src/node/index.js @@ -1,6 +1,7 @@ import fs from 'fs' import yargs from 'yargs' import TOML from '@iarna/toml' +import * as Y from 'yjs' import { Repeater } from '@repeaterjs/repeater' import { app, shell, session, BrowserWindow } from 'electron' @@ -221,12 +222,37 @@ async function main() { views: [], streamdelay: null, } + + const stateDoc = new Y.Doc() + const viewsState = stateDoc.getMap('views') + stateDoc.transact(() => { + for (let i = 0; i < argv.grid.count ** 2; i++) { + const data = new Y.Map() + data.set('streamId', '') + viewsState.set(i, data) + } + }) + viewsState.observeDeep(() => { + const viewContentMap = new Map() + for (const [key, viewData] of viewsState) { + const stream = clientState.streams.find( + (s) => s._id === viewData.get('streamId'), + ) + if (!stream) { + continue + } + viewContentMap.set(key, { + url: stream.link, + kind: stream.kind || 'video', + }) + } + streamWindow.setViews(viewContentMap) + }) + const getInitialState = () => clientState let broadcastState = () => {} const onMessage = (msg) => { - if (msg.type === 'set-views') { - streamWindow.setViews(new Map(msg.views)) - } else if (msg.type === 'set-listening-view') { + if (msg.type === 'set-listening-view') { streamWindow.setListeningView(msg.viewIdx) } else if (msg.type === 'set-view-blurred') { streamWindow.setViewBlurred(msg.viewIdx, msg.blurred) @@ -277,6 +303,7 @@ async function main() { password: argv.control.password, getInitialState, onMessage, + stateDoc, })) if (argv.control.open) { shell.openExternal(argv.control.address) diff --git a/src/node/server.js b/src/node/server.js index 4d6f7ae..39417e8 100644 --- a/src/node/server.js +++ b/src/node/server.js @@ -11,10 +11,18 @@ import route from 'koa-route' import serveStatic from 'koa-static' import views from 'koa-views' import websocket from 'koa-easy-ws' +import * as Y from 'yjs' const webDistPath = path.join(app.getAppPath(), 'web') -function initApp({ username, password, baseURL, getInitialState, onMessage }) { +function initApp({ + username, + password, + baseURL, + getInitialState, + onMessage, + stateDoc, +}) { const expectedOrigin = new URL(baseURL).origin const sockets = new Set() @@ -47,16 +55,23 @@ function initApp({ username, password, baseURL, getInitialState, onMessage }) { const ws = await ctx.ws() sockets.add(ws) + ws.binaryType = 'arraybuffer' + ws.on('close', () => { sockets.delete(ws) }) - ws.on('message', (dataText) => { + ws.on('message', (rawData) => { + if (rawData instanceof ArrayBuffer) { + Y.applyUpdate(stateDoc, new Uint8Array(rawData)) + return + } + let data try { - data = JSON.parse(dataText) + data = JSON.parse(rawData) } catch (err) { - console.warn('received unexpected ws data:', dataText) + console.warn('received unexpected ws data:', rawData) return } @@ -69,6 +84,7 @@ function initApp({ username, password, baseURL, getInitialState, onMessage }) { const state = getInitialState() ws.send(JSON.stringify({ type: 'state', state })) + ws.send(Y.encodeStateAsUpdate(stateDoc)) return } ctx.status = 404 @@ -95,6 +111,7 @@ export default async function initWebServer({ password, getInitialState, onMessage, + stateDoc, }) { let { protocol, hostname, port } = new URL(baseURL) if (!port) { @@ -110,6 +127,7 @@ export default async function initWebServer({ baseURL, getInitialState, onMessage, + stateDoc, }) let server diff --git a/src/web/control.js b/src/web/control.js index 501637b..5351887 100644 --- a/src/web/control.js +++ b/src/web/control.js @@ -2,6 +2,7 @@ import range from 'lodash/range' import sortBy from 'lodash/sortBy' import truncate from 'lodash/truncate' import ReconnectingWebSocket from 'reconnecting-websocket' +import * as Y from 'yjs' import { h, Fragment, render } from 'preact' import { useEffect, useState, useCallback, useRef } from 'preact/hooks' import { State } from 'xstate' @@ -38,9 +39,29 @@ const hotkeyTriggers = [ 'p', ] +function useYDoc(existingDoc, keys) { + const { current: doc } = useRef(existingDoc || new Y.Doc()) + const [docValue, setDocValue] = useState() + useEffect(() => { + function updateDocValue() { + const valueCopy = Object.fromEntries( + keys.map((k) => [k, doc.getMap(k).toJSON()]), + ) + setDocValue(valueCopy) + } + updateDocValue() + doc.on('update', updateDocValue) + return () => { + doc.off('update', updateDocValue) + } + }, []) + return [docValue, doc] +} + function App({ wsEndpoint }) { const wsRef = useRef() const [isConnected, setIsConnected] = useState(false) + const [sharedState, stateDoc] = useYDoc(null, ['views']) const [config, setConfig] = useState({}) const [streams, setStreams] = useState([]) const [customStreams, setCustomStreams] = useState([]) @@ -55,9 +76,15 @@ function App({ wsEndpoint }) { minReconnectionDelay: 1000 + Math.random() * 500, reconnectionDelayGrowFactor: 1.1, }) + ws.binaryType = 'arraybuffer' ws.addEventListener('open', () => setIsConnected(true)) ws.addEventListener('close', () => setIsConnected(false)) ws.addEventListener('message', (ev) => { + if (ev.data instanceof ArrayBuffer) { + Y.applyUpdate(stateDoc, new Uint8Array(ev.data)) + return + } + const msg = JSON.parse(ev.data) if (msg.type === 'state') { const { @@ -68,9 +95,7 @@ function App({ wsEndpoint }) { } = msg.state const newStateIdxMap = new Map() for (const viewState of views) { - const { pos, content } = viewState.context - const stream = newStreams.find((d) => d.link === content.url) - const streamId = stream?._id + const { pos } = viewState.context const state = State.from(viewState.state) const isListening = state.matches( 'displaying.running.audio.listening', @@ -81,8 +106,6 @@ function App({ wsEndpoint }) { newStateIdxMap.set(space, {}) } Object.assign(newStateIdxMap.get(space), { - streamId, - content, state, isListening, isBlurred, @@ -103,33 +126,21 @@ function App({ wsEndpoint }) { console.warn('unexpected ws message', msg) } }) + stateDoc.on('update', (update) => { + ws.send(update) + }) wsRef.current = ws }, []) const handleSetView = useCallback( (idx, streamId) => { - const newSpaceIdxMap = new Map(stateIdxMap) const stream = streams.find((d) => d._id === streamId) - if (stream) { - const content = { - url: stream?.link, - kind: stream?.kind || 'video', - } - newSpaceIdxMap.set(idx, { - ...newSpaceIdxMap.get(idx), - streamId, - content, - }) - } else { - newSpaceIdxMap.delete(idx) - } - const views = Array.from(newSpaceIdxMap, ([space, { content }]) => [ - space, - content, - ]) - wsRef.current.send(JSON.stringify({ type: 'set-views', views })) + stateDoc + .getMap('views') + .get(String(idx)) + .set('streamId', stream ? streamId : '') }, - [streams, stateIdxMap], + [stateDoc, streams], ) const handleSetListening = useCallback((idx, listening) => { @@ -160,14 +171,21 @@ function App({ wsEndpoint }) { ) }, []) - const handleBrowse = useCallback((url) => { - wsRef.current.send( - JSON.stringify({ - type: 'browse', - url, - }), - ) - }, []) + const handleBrowse = useCallback( + (streamId) => { + const stream = streams.find((d) => d._id === streamId) + if (!stream) { + return + } + wsRef.current.send( + JSON.stringify({ + type: 'browse', + url: stream.link, + }), + ) + }, + [streams], + ) const handleDevTools = useCallback((idx) => { wsRef.current.send( @@ -181,14 +199,14 @@ function App({ wsEndpoint }) { const handleClickId = useCallback( (streamId) => { const availableIdx = range(gridCount * gridCount).find( - (i) => !stateIdxMap.has(i), + (i) => !sharedState.views[i].streamId, ) if (availableIdx === undefined) { return } handleSetView(availableIdx, streamId) }, - [gridCount, stateIdxMap], + [gridCount, sharedState], ) const handleChangeCustomStream = useCallback((idx, customStream) => { @@ -266,17 +284,12 @@ function App({ wsEndpoint }) { {range(0, gridCount).map((x) => { const idx = gridCount * y + x - const { - streamId = '', - isListening = false, - isBlurred = false, - content = { url: '' }, - state, - } = stateIdxMap.get(idx) || {} + const { isListening = false, isBlurred = false, state } = + stateIdxMap.get(idx) || {} + const { streamId } = sharedState.views?.[idx] || '' return ( onBrowse(url), [url, onBrowse]) + const handleBrowseClick = useCallback(() => onBrowse(spaceValue), [ + spaceValue, + onBrowse, + ]) const handleDevToolsClick = useCallback(() => onDevTools(idx), [ idx, onDevTools,