mirror of
https://github.com/streamwall/streamwall.git
synced 2026-01-31 09:22:49 -05:00
Send diffs of state objects to clients
This commit is contained in:
14
package-lock.json
generated
14
package-lock.json
generated
@@ -4431,6 +4431,11 @@
|
|||||||
"integrity": "sha512-ZIzRpLJrOj7jjP2miAtgqIfmzbxa4ZOr5jJc601zklsfEx9oTzmmj2nVpIPRpNlRTIh8lc1kyViIY7BWSGNmKw==",
|
"integrity": "sha512-ZIzRpLJrOj7jjP2miAtgqIfmzbxa4ZOr5jJc601zklsfEx9oTzmmj2nVpIPRpNlRTIh8lc1kyViIY7BWSGNmKw==",
|
||||||
"optional": true
|
"optional": true
|
||||||
},
|
},
|
||||||
|
"diff-match-patch": {
|
||||||
|
"version": "1.0.5",
|
||||||
|
"resolved": "https://registry.npmjs.org/diff-match-patch/-/diff-match-patch-1.0.5.tgz",
|
||||||
|
"integrity": "sha512-IayShXAgj/QMXgB0IWmKx+rOPuGMhqm5w6jvFxmVenXKIzRqTAAsbBPT3kWQeGANj3jGgvcvv4yK6SxqYmikgw=="
|
||||||
|
},
|
||||||
"diff-sequences": {
|
"diff-sequences": {
|
||||||
"version": "26.0.0",
|
"version": "26.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/diff-sequences/-/diff-sequences-26.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/diff-sequences/-/diff-sequences-26.0.0.tgz",
|
||||||
@@ -8168,6 +8173,15 @@
|
|||||||
"minimist": "^1.2.0"
|
"minimist": "^1.2.0"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"jsondiffpatch": {
|
||||||
|
"version": "0.4.1",
|
||||||
|
"resolved": "https://registry.npmjs.org/jsondiffpatch/-/jsondiffpatch-0.4.1.tgz",
|
||||||
|
"integrity": "sha512-t0etAxTUk1w5MYdNOkZBZ8rvYYN5iL+2dHCCx/DpkFm/bW28M6y5nUS83D4XdZiHy35Fpaw6LBb+F88fHZnVCw==",
|
||||||
|
"requires": {
|
||||||
|
"chalk": "^2.3.0",
|
||||||
|
"diff-match-patch": "^1.0.0"
|
||||||
|
}
|
||||||
|
},
|
||||||
"jsonfile": {
|
"jsonfile": {
|
||||||
"version": "4.0.0",
|
"version": "4.0.0",
|
||||||
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz",
|
"resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-4.0.0.tgz",
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
"dank-twitch-irc": "^3.3.0",
|
"dank-twitch-irc": "^3.3.0",
|
||||||
"ejs": "^3.1.3",
|
"ejs": "^3.1.3",
|
||||||
"electron": "^9.0.4",
|
"electron": "^9.0.4",
|
||||||
|
"jsondiffpatch": "^0.4.1",
|
||||||
"koa": "^2.12.1",
|
"koa": "^2.12.1",
|
||||||
"koa-basic-auth": "^4.0.0",
|
"koa-basic-auth": "^4.0.0",
|
||||||
"koa-easy-ws": "^1.1.3",
|
"koa-easy-ws": "^1.1.3",
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import fs from 'fs'
|
|||||||
import yargs from 'yargs'
|
import yargs from 'yargs'
|
||||||
import TOML from '@iarna/toml'
|
import TOML from '@iarna/toml'
|
||||||
import * as Y from 'yjs'
|
import * as Y from 'yjs'
|
||||||
|
import { create as createJSONDiffPatch } from 'jsondiffpatch'
|
||||||
import { Repeater } from '@repeaterjs/repeater'
|
import { Repeater } from '@repeaterjs/repeater'
|
||||||
import { app, shell, session, BrowserWindow } from 'electron'
|
import { app, shell, session, BrowserWindow } from 'electron'
|
||||||
|
|
||||||
@@ -211,7 +212,7 @@ async function main() {
|
|||||||
let twitchBot = null
|
let twitchBot = null
|
||||||
let streamdelayClient = null
|
let streamdelayClient = null
|
||||||
|
|
||||||
const clientState = {
|
let clientState = {
|
||||||
config: {
|
config: {
|
||||||
width: argv.window.width,
|
width: argv.window.width,
|
||||||
height: argv.window.height,
|
height: argv.window.height,
|
||||||
@@ -250,7 +251,7 @@ async function main() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
const getInitialState = () => clientState
|
const getInitialState = () => clientState
|
||||||
let broadcastState = () => {}
|
let broadcast = () => {}
|
||||||
const onMessage = (msg) => {
|
const onMessage = (msg) => {
|
||||||
if (msg.type === 'set-listening-view') {
|
if (msg.type === 'set-listening-view') {
|
||||||
streamWindow.setListeningView(msg.viewIdx)
|
streamWindow.setListeningView(msg.viewIdx)
|
||||||
@@ -291,8 +292,28 @@ async function main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const stateDiff = createJSONDiffPatch({
|
||||||
|
objectHash: (obj, idx) => obj._id || `$$index:${idx}`,
|
||||||
|
})
|
||||||
|
function updateState(newState) {
|
||||||
|
const lastClientState = clientState
|
||||||
|
clientState = { ...clientState, ...newState }
|
||||||
|
const delta = stateDiff.diff(lastClientState, clientState)
|
||||||
|
if (!delta) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
broadcast({
|
||||||
|
type: 'state-delta',
|
||||||
|
delta,
|
||||||
|
})
|
||||||
|
streamWindow.send('state', clientState)
|
||||||
|
if (twitchBot) {
|
||||||
|
twitchBot.onState(clientState)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (argv.control.address) {
|
if (argv.control.address) {
|
||||||
;({ broadcastState } = await initWebServer({
|
;({ broadcast } = await initWebServer({
|
||||||
certDir: argv.cert.dir,
|
certDir: argv.cert.dir,
|
||||||
certProduction: argv.cert.production,
|
certProduction: argv.cert.production,
|
||||||
email: argv.cert.email,
|
email: argv.cert.email,
|
||||||
@@ -316,8 +337,7 @@ async function main() {
|
|||||||
key: argv.streamdelay.key,
|
key: argv.streamdelay.key,
|
||||||
})
|
})
|
||||||
streamdelayClient.on('state', (state) => {
|
streamdelayClient.on('state', (state) => {
|
||||||
clientState.streamdelay = state
|
updateState({ streamdelay: state })
|
||||||
broadcastState(clientState)
|
|
||||||
})
|
})
|
||||||
streamdelayClient.connect()
|
streamdelayClient.connect()
|
||||||
}
|
}
|
||||||
@@ -328,12 +348,7 @@ async function main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
streamWindow.on('state', (viewStates) => {
|
streamWindow.on('state', (viewStates) => {
|
||||||
clientState.views = viewStates
|
updateState({ views: viewStates })
|
||||||
streamWindow.send('state', clientState)
|
|
||||||
broadcastState(clientState)
|
|
||||||
if (twitchBot) {
|
|
||||||
twitchBot.onState(clientState)
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const dataSources = [
|
const dataSources = [
|
||||||
@@ -348,9 +363,7 @@ async function main() {
|
|||||||
|
|
||||||
for await (const rawStreams of combineDataSources(dataSources)) {
|
for await (const rawStreams of combineDataSources(dataSources)) {
|
||||||
const streams = idGen.process(rawStreams)
|
const streams = idGen.process(rawStreams)
|
||||||
clientState.streams = streams
|
updateState({ streams })
|
||||||
streamWindow.send('state', clientState)
|
|
||||||
broadcastState(clientState)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -91,9 +91,9 @@ function initApp({
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
const broadcastState = (state) => {
|
const broadcast = (data) => {
|
||||||
for (const ws of sockets) {
|
for (const ws of sockets) {
|
||||||
ws.send(JSON.stringify({ type: 'state', state }))
|
ws.send(JSON.stringify(data))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -103,7 +103,7 @@ function initApp({
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
return { app, broadcastState }
|
return { app, broadcast }
|
||||||
}
|
}
|
||||||
|
|
||||||
export default async function initWebServer({
|
export default async function initWebServer({
|
||||||
@@ -127,7 +127,7 @@ export default async function initWebServer({
|
|||||||
port = overridePort
|
port = overridePort
|
||||||
}
|
}
|
||||||
|
|
||||||
const { app, broadcastState } = initApp({
|
const { app, broadcast } = initApp({
|
||||||
username,
|
username,
|
||||||
password,
|
password,
|
||||||
baseURL,
|
baseURL,
|
||||||
@@ -153,5 +153,5 @@ export default async function initWebServer({
|
|||||||
const listen = promisify(server.listen).bind(server)
|
const listen = promisify(server.listen).bind(server)
|
||||||
await listen(port, overrideHostname || hostname)
|
await listen(port, overrideHostname || hostname)
|
||||||
|
|
||||||
return { broadcastState }
|
return { broadcast }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import sortBy from 'lodash/sortBy'
|
|||||||
import truncate from 'lodash/truncate'
|
import truncate from 'lodash/truncate'
|
||||||
import ReconnectingWebSocket from 'reconnecting-websocket'
|
import ReconnectingWebSocket from 'reconnecting-websocket'
|
||||||
import * as Y from 'yjs'
|
import * as Y from 'yjs'
|
||||||
|
import { patch as patchJSON } from 'jsondiffpatch'
|
||||||
import { h, Fragment, render } from 'preact'
|
import { h, Fragment, render } from 'preact'
|
||||||
import { useEffect, useState, useCallback, useRef } from 'preact/hooks'
|
import { useEffect, useState, useCallback, useRef } from 'preact/hooks'
|
||||||
import { State } from 'xstate'
|
import { State } from 'xstate'
|
||||||
@@ -69,6 +70,7 @@ function useStreamwallConnection(wsEndpoint) {
|
|||||||
const [delayState, setDelayState] = useState()
|
const [delayState, setDelayState] = useState()
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
|
let lastStateData
|
||||||
const ws = new ReconnectingWebSocket(wsEndpoint, [], {
|
const ws = new ReconnectingWebSocket(wsEndpoint, [], {
|
||||||
maxReconnectionDelay: 5000,
|
maxReconnectionDelay: 5000,
|
||||||
minReconnectionDelay: 1000 + Math.random() * 500,
|
minReconnectionDelay: 1000 + Math.random() * 500,
|
||||||
@@ -85,13 +87,21 @@ function useStreamwallConnection(wsEndpoint) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
const msg = JSON.parse(ev.data)
|
const msg = JSON.parse(ev.data)
|
||||||
if (msg.type === 'state') {
|
if (msg.type === 'state' || msg.type === 'state-delta') {
|
||||||
|
let state
|
||||||
|
if (msg.type === 'state') {
|
||||||
|
state = msg.state
|
||||||
|
} else {
|
||||||
|
state = patchJSON(lastStateData, msg.delta)
|
||||||
|
}
|
||||||
|
lastStateData = state
|
||||||
|
|
||||||
const {
|
const {
|
||||||
config: newConfig,
|
config: newConfig,
|
||||||
streams: newStreams,
|
streams: newStreams,
|
||||||
views,
|
views,
|
||||||
streamdelay,
|
streamdelay,
|
||||||
} = msg.state
|
} = state
|
||||||
const newStateIdxMap = new Map()
|
const newStateIdxMap = new Map()
|
||||||
for (const viewState of views) {
|
for (const viewState of views) {
|
||||||
const { pos } = viewState.context
|
const { pos } = viewState.context
|
||||||
|
|||||||
Reference in New Issue
Block a user