Use Y.js CRDT for view id mapping

This fixes glitches when typing in the grid boxes by allowing concurrent
writes and removing the need to read state from the server to render the
box values.
This commit is contained in:
Max Goodhart
2020-07-05 17:25:57 -07:00
parent 71572df9c5
commit 8f6210da20
6 changed files with 136 additions and 54 deletions

21
package-lock.json generated
View File

@@ -6219,6 +6219,11 @@
"integrity": "sha1-TkMekrEalzFjaqH5yNHMvP2reN8=",
"dev": true
},
"isomorphic.js": {
"version": "0.1.4",
"resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.1.4.tgz",
"integrity": "sha512-t9zbgkjE7f9f2M6OSW49YEq0lUrSdAllBbWFUZoeck/rnnFae6UlhmDtXWs48VJY3ZpryCoZsRiAiKD44hPIGQ=="
},
"isstream": {
"version": "0.1.2",
"resolved": "https://registry.npmjs.org/isstream/-/isstream-0.1.2.tgz",
@@ -8419,6 +8424,14 @@
"type-check": "~0.3.2"
}
},
"lib0": {
"version": "0.2.30",
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.30.tgz",
"integrity": "sha512-Mp2SaW5uJwMD1atU/MJ7DGxGf5DBaokZGhByg9IYLCRfE+D+DeksjbnweaFvUoWrRpDCJLtg1jY8xENNICj5Mg==",
"requires": {
"isomorphic.js": "^0.1.3"
}
},
"lines-and-columns": {
"version": "1.1.6",
"resolved": "https://registry.npmjs.org/lines-and-columns/-/lines-and-columns-1.1.6.tgz",
@@ -12475,6 +12488,14 @@
"fd-slicer": "~1.1.0"
}
},
"yjs": {
"version": "13.2.0",
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.2.0.tgz",
"integrity": "sha512-0augWOespX5KC8de62GCR8WloZhAyBfEF3ZPDpjZlRs6yho7iFKqarpzxxJgmP8zA/pNJiV1EIpMezSxEdNdDw==",
"requires": {
"lib0": "^0.2.27"
}
},
"ylru": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/ylru/-/ylru-1.2.1.tgz",

View File

@@ -35,7 +35,8 @@
"svg-loaders-react": "^2.2.1",
"ws": "^7.3.0",
"xstate": "^4.10.0",
"yargs": "^15.3.1"
"yargs": "^15.3.1",
"yjs": "^13.2.0"
},
"devDependencies": {
"@babel/core": "^7.10.2",

View File

@@ -145,7 +145,7 @@ export default class StreamWindow extends EventEmitter {
)
}
setViews(viewContentMap) {
setViews(viewContentMap, streams) {
const { gridCount, spaceWidth, spaceHeight } = this.config
const { win, views } = this
const boxes = boxesFromViewContentMap(gridCount, gridCount, viewContentMap)

View File

@@ -1,6 +1,7 @@
import fs from 'fs'
import yargs from 'yargs'
import TOML from '@iarna/toml'
import * as Y from 'yjs'
import { Repeater } from '@repeaterjs/repeater'
import { app, shell, session, BrowserWindow } from 'electron'
@@ -221,12 +222,37 @@ async function main() {
views: [],
streamdelay: null,
}
const stateDoc = new Y.Doc()
const viewsState = stateDoc.getMap('views')
stateDoc.transact(() => {
for (let i = 0; i < argv.grid.count ** 2; i++) {
const data = new Y.Map()
data.set('streamId', '')
viewsState.set(i, data)
}
})
viewsState.observeDeep(() => {
const viewContentMap = new Map()
for (const [key, viewData] of viewsState) {
const stream = clientState.streams.find(
(s) => s._id === viewData.get('streamId'),
)
if (!stream) {
continue
}
viewContentMap.set(key, {
url: stream.link,
kind: stream.kind || 'video',
})
}
streamWindow.setViews(viewContentMap)
})
const getInitialState = () => clientState
let broadcastState = () => {}
const onMessage = (msg) => {
if (msg.type === 'set-views') {
streamWindow.setViews(new Map(msg.views))
} else if (msg.type === 'set-listening-view') {
if (msg.type === 'set-listening-view') {
streamWindow.setListeningView(msg.viewIdx)
} else if (msg.type === 'set-view-blurred') {
streamWindow.setViewBlurred(msg.viewIdx, msg.blurred)
@@ -277,6 +303,7 @@ async function main() {
password: argv.control.password,
getInitialState,
onMessage,
stateDoc,
}))
if (argv.control.open) {
shell.openExternal(argv.control.address)

View File

@@ -11,10 +11,18 @@ import route from 'koa-route'
import serveStatic from 'koa-static'
import views from 'koa-views'
import websocket from 'koa-easy-ws'
import * as Y from 'yjs'
const webDistPath = path.join(app.getAppPath(), 'web')
function initApp({ username, password, baseURL, getInitialState, onMessage }) {
function initApp({
username,
password,
baseURL,
getInitialState,
onMessage,
stateDoc,
}) {
const expectedOrigin = new URL(baseURL).origin
const sockets = new Set()
@@ -47,16 +55,23 @@ function initApp({ username, password, baseURL, getInitialState, onMessage }) {
const ws = await ctx.ws()
sockets.add(ws)
ws.binaryType = 'arraybuffer'
ws.on('close', () => {
sockets.delete(ws)
})
ws.on('message', (dataText) => {
ws.on('message', (rawData) => {
if (rawData instanceof ArrayBuffer) {
Y.applyUpdate(stateDoc, new Uint8Array(rawData))
return
}
let data
try {
data = JSON.parse(dataText)
data = JSON.parse(rawData)
} catch (err) {
console.warn('received unexpected ws data:', dataText)
console.warn('received unexpected ws data:', rawData)
return
}
@@ -69,6 +84,7 @@ function initApp({ username, password, baseURL, getInitialState, onMessage }) {
const state = getInitialState()
ws.send(JSON.stringify({ type: 'state', state }))
ws.send(Y.encodeStateAsUpdate(stateDoc))
return
}
ctx.status = 404
@@ -95,6 +111,7 @@ export default async function initWebServer({
password,
getInitialState,
onMessage,
stateDoc,
}) {
let { protocol, hostname, port } = new URL(baseURL)
if (!port) {
@@ -110,6 +127,7 @@ export default async function initWebServer({
baseURL,
getInitialState,
onMessage,
stateDoc,
})
let server

View File

@@ -2,6 +2,7 @@ import range from 'lodash/range'
import sortBy from 'lodash/sortBy'
import truncate from 'lodash/truncate'
import ReconnectingWebSocket from 'reconnecting-websocket'
import * as Y from 'yjs'
import { h, Fragment, render } from 'preact'
import { useEffect, useState, useCallback, useRef } from 'preact/hooks'
import { State } from 'xstate'
@@ -38,9 +39,29 @@ const hotkeyTriggers = [
'p',
]
function useYDoc(existingDoc, keys) {
const { current: doc } = useRef(existingDoc || new Y.Doc())
const [docValue, setDocValue] = useState()
useEffect(() => {
function updateDocValue() {
const valueCopy = Object.fromEntries(
keys.map((k) => [k, doc.getMap(k).toJSON()]),
)
setDocValue(valueCopy)
}
updateDocValue()
doc.on('update', updateDocValue)
return () => {
doc.off('update', updateDocValue)
}
}, [])
return [docValue, doc]
}
function App({ wsEndpoint }) {
const wsRef = useRef()
const [isConnected, setIsConnected] = useState(false)
const [sharedState, stateDoc] = useYDoc(null, ['views'])
const [config, setConfig] = useState({})
const [streams, setStreams] = useState([])
const [customStreams, setCustomStreams] = useState([])
@@ -55,9 +76,15 @@ function App({ wsEndpoint }) {
minReconnectionDelay: 1000 + Math.random() * 500,
reconnectionDelayGrowFactor: 1.1,
})
ws.binaryType = 'arraybuffer'
ws.addEventListener('open', () => setIsConnected(true))
ws.addEventListener('close', () => setIsConnected(false))
ws.addEventListener('message', (ev) => {
if (ev.data instanceof ArrayBuffer) {
Y.applyUpdate(stateDoc, new Uint8Array(ev.data))
return
}
const msg = JSON.parse(ev.data)
if (msg.type === 'state') {
const {
@@ -68,9 +95,7 @@ function App({ wsEndpoint }) {
} = msg.state
const newStateIdxMap = new Map()
for (const viewState of views) {
const { pos, content } = viewState.context
const stream = newStreams.find((d) => d.link === content.url)
const streamId = stream?._id
const { pos } = viewState.context
const state = State.from(viewState.state)
const isListening = state.matches(
'displaying.running.audio.listening',
@@ -81,8 +106,6 @@ function App({ wsEndpoint }) {
newStateIdxMap.set(space, {})
}
Object.assign(newStateIdxMap.get(space), {
streamId,
content,
state,
isListening,
isBlurred,
@@ -103,33 +126,21 @@ function App({ wsEndpoint }) {
console.warn('unexpected ws message', msg)
}
})
stateDoc.on('update', (update) => {
ws.send(update)
})
wsRef.current = ws
}, [])
const handleSetView = useCallback(
(idx, streamId) => {
const newSpaceIdxMap = new Map(stateIdxMap)
const stream = streams.find((d) => d._id === streamId)
if (stream) {
const content = {
url: stream?.link,
kind: stream?.kind || 'video',
}
newSpaceIdxMap.set(idx, {
...newSpaceIdxMap.get(idx),
streamId,
content,
})
} else {
newSpaceIdxMap.delete(idx)
}
const views = Array.from(newSpaceIdxMap, ([space, { content }]) => [
space,
content,
])
wsRef.current.send(JSON.stringify({ type: 'set-views', views }))
stateDoc
.getMap('views')
.get(String(idx))
.set('streamId', stream ? streamId : '')
},
[streams, stateIdxMap],
[stateDoc, streams],
)
const handleSetListening = useCallback((idx, listening) => {
@@ -160,14 +171,21 @@ function App({ wsEndpoint }) {
)
}, [])
const handleBrowse = useCallback((url) => {
wsRef.current.send(
JSON.stringify({
type: 'browse',
url,
}),
)
}, [])
const handleBrowse = useCallback(
(streamId) => {
const stream = streams.find((d) => d._id === streamId)
if (!stream) {
return
}
wsRef.current.send(
JSON.stringify({
type: 'browse',
url: stream.link,
}),
)
},
[streams],
)
const handleDevTools = useCallback((idx) => {
wsRef.current.send(
@@ -181,14 +199,14 @@ function App({ wsEndpoint }) {
const handleClickId = useCallback(
(streamId) => {
const availableIdx = range(gridCount * gridCount).find(
(i) => !stateIdxMap.has(i),
(i) => !sharedState.views[i].streamId,
)
if (availableIdx === undefined) {
return
}
handleSetView(availableIdx, streamId)
},
[gridCount, stateIdxMap],
[gridCount, sharedState],
)
const handleChangeCustomStream = useCallback((idx, customStream) => {
@@ -266,17 +284,12 @@ function App({ wsEndpoint }) {
<StyledGridLine>
{range(0, gridCount).map((x) => {
const idx = gridCount * y + x
const {
streamId = '',
isListening = false,
isBlurred = false,
content = { url: '' },
state,
} = stateIdxMap.get(idx) || {}
const { isListening = false, isBlurred = false, state } =
stateIdxMap.get(idx) || {}
const { streamId } = sharedState.views?.[idx] || ''
return (
<GridInput
idx={idx}
url={content.url}
spaceValue={streamId}
isError={state && state.matches('displaying.error')}
isDisplaying={state && state.matches('displaying')}
@@ -396,7 +409,6 @@ function StreamLine({
function GridInput({
idx,
url,
onChangeSpace,
spaceValue,
isDisplaying,
@@ -437,7 +449,10 @@ function GridInput({
idx,
onReloadView,
])
const handleBrowseClick = useCallback(() => onBrowse(url), [url, onBrowse])
const handleBrowseClick = useCallback(() => onBrowse(spaceValue), [
spaceValue,
onBrowse,
])
const handleDevToolsClick = useCallback(() => onDevTools(idx), [
idx,
onDevTools,