From a36a07d17743e86fa5313a704b0373c448552e8e Mon Sep 17 00:00:00 2001 From: Max Goodhart Date: Tue, 17 Jun 2025 01:21:22 +0000 Subject: [PATCH] Persist local stream data --- packages/streamwall-control-ui/src/index.tsx | 2 +- packages/streamwall-shared/src/types.ts | 9 +++- packages/streamwall/src/main/data.ts | 51 ++++++++++++++------ packages/streamwall/src/main/index.ts | 21 +++++--- packages/streamwall/src/main/storage.ts | 19 +++++++- 5 files changed, 75 insertions(+), 27 deletions(-) diff --git a/packages/streamwall-control-ui/src/index.tsx b/packages/streamwall-control-ui/src/index.tsx index c7a4546..b7051fe 100644 --- a/packages/streamwall-control-ui/src/index.tsx +++ b/packages/streamwall-control-ui/src/index.tsx @@ -1297,7 +1297,7 @@ function CustomStreamInput({ return (
{' '} diff --git a/packages/streamwall-shared/src/types.ts b/packages/streamwall-shared/src/types.ts index 5b50e55..617afdb 100644 --- a/packages/streamwall-shared/src/types.ts +++ b/packages/streamwall-shared/src/types.ts @@ -23,16 +23,21 @@ export interface ContentViewInfo { export type ContentKind = 'video' | 'audio' | 'web' | 'background' | 'overlay' -export interface StreamData extends ContentDisplayOptions { +export interface StreamDataContent extends ContentDisplayOptions { kind: ContentKind link: string - label: string + label?: string labelPosition?: 'top-left' | 'top-right' | 'bottom-right' | 'bottom-left' source?: string notes?: string status?: string city?: string state?: string + _id?: string + _dataSource?: string +} + +export interface StreamData extends StreamDataContent { _id: string _dataSource: string } diff --git a/packages/streamwall/src/main/data.ts b/packages/streamwall/src/main/data.ts index 286d959..567a9cb 100644 --- a/packages/streamwall/src/main/data.ts +++ b/packages/streamwall/src/main/data.ts @@ -6,20 +6,24 @@ import { promises as fsPromises } from 'fs' import { isArray } from 'lodash-es' import fetch from 'node-fetch' import { promisify } from 'util' -import { StreamData, StreamList } from '../../../streamwall-shared/src/types' +import { + StreamData, + StreamDataContent, + StreamList, +} from '../../../streamwall-shared/src/types' const sleep = promisify(setTimeout) -type DataSource = AsyncGenerator +type DataSource = AsyncGenerator export async function* pollDataURL(url: string, intervalSecs: number) { const refreshInterval = intervalSecs * 1000 let lastData = [] while (true) { - let data: StreamData[] = [] + let data: StreamDataContent[] = [] try { const resp = await fetch(url) - data = (await resp.json()) as StreamData[] + data = (await resp.json()) as StreamDataContent[] } catch (err) { console.warn('error loading stream data', err) } @@ -65,33 +69,50 @@ export async function* markDataSource(dataSource: DataSource, name: string) { } } -export async function* combineDataSources(dataSources: DataSource[]) { +export async function* combineDataSources( + dataSources: DataSource[], + idGen: StreamIDGenerator, +) { for await (const streamLists of Repeater.latest(dataSources)) { const dataByURL = new Map() for (const list of streamLists) { for (const data of list) { const existing = dataByURL.get(data.link) - dataByURL.set(data.link, { ...existing, ...data }) + dataByURL.set(data.link, { ...existing, ...data } as StreamData) } } - const streams: StreamList = [...dataByURL.values()] + + const streams = idGen.process([...dataByURL.values()]) as StreamList + // Retain the index to speed up local lookups streams.byURL = dataByURL yield streams } } -export class LocalStreamData extends EventEmitter { - dataByURL: Map> +interface LocalStreamDataEvents { + update: [StreamDataContent[]] +} - constructor() { +export class LocalStreamData extends EventEmitter { + dataByURL: Map + + constructor(entries: StreamDataContent[] = []) { super() this.dataByURL = new Map() + for (const entry of entries) { + if (!entry.link) { + continue + } + this.dataByURL.set(entry.link, entry) + } } - update(url: string, data: Partial) { + update(url: string, data: Partial) { const existing = this.dataByURL.get(url) - this.dataByURL.set(data.link ?? url, { ...existing, ...data, link: url }) + const kind = data.kind ?? existing?.kind ?? 'video' + const updated: StreamDataContent = { ...existing, ...data, kind, link: url } + this.dataByURL.set(data.link ?? url, updated) if (data.link != null && url !== data.link) { this.dataByURL.delete(url) } @@ -107,9 +128,9 @@ export class LocalStreamData extends EventEmitter { this.emit('update', [...this.dataByURL.values()]) } - gen(): AsyncGenerator { + gen(): AsyncGenerator { return new Repeater(async (push, stop) => { - await push([]) + await push([...this.dataByURL.values()]) this.on('update', push) await stop this.off('update', push) @@ -126,7 +147,7 @@ export class StreamIDGenerator { this.idSet = new Set() } - process(streams: StreamData[]) { + process(streams: StreamDataContent[]) { const { idMap, idSet } = this for (const stream of streams) { diff --git a/packages/streamwall/src/main/index.ts b/packages/streamwall/src/main/index.ts index c4e37fe..49cd42d 100644 --- a/packages/streamwall/src/main/index.ts +++ b/packages/streamwall/src/main/index.ts @@ -250,9 +250,20 @@ async function main(argv: ReturnType) { callback(false) }) + const db = await loadStorage( + join(app.getPath('userData'), 'streamwall-storage.json'), + ) + console.debug('Creating StreamWindow...') const idGen = new StreamIDGenerator() - const localStreamData = new LocalStreamData() + + const localStreamData = new LocalStreamData(db.data.localStreamData) + localStreamData.on('update', (entries) => { + db.update((data) => { + data.localStreamData = entries + }) + }) + const overlayStreamData = new LocalStreamData() const streamWindowConfig = { @@ -306,9 +317,6 @@ async function main(argv: ReturnType) { const stateDoc = new Y.Doc() const viewsState = stateDoc.getMap>('views') - const db = await loadStorage( - join(app.getPath('userData'), 'streamwall-storage.json'), - ) if (db.data.stateDoc) { console.log('Loading stateDoc from storage...') try { @@ -531,11 +539,10 @@ async function main(argv: ReturnType) { return markDataSource(watchDataFile(path), 'toml-file') }), markDataSource(localStreamData.gen(), 'custom'), - overlayStreamData.gen(), + markDataSource(overlayStreamData.gen(), 'overlay'), ] - for await (const rawStreams of combineDataSources(dataSources)) { - const streams = idGen.process(rawStreams) + for await (const streams of combineDataSources(dataSources, idGen)) { updateState({ streams }) updateViewsFromStateDoc() } diff --git a/packages/streamwall/src/main/storage.ts b/packages/streamwall/src/main/storage.ts index 004baa0..46a4a0d 100644 --- a/packages/streamwall/src/main/storage.ts +++ b/packages/streamwall/src/main/storage.ts @@ -1,17 +1,32 @@ -import type { Low } from 'lowdb' +import { Low, Memory } from 'lowdb' import { JSONFilePreset } from 'lowdb/node' +import { StreamDataContent } from 'streamwall-shared' export interface StreamwallStoredData { stateDoc: string + localStreamData: StreamDataContent[] } const defaultData: StreamwallStoredData = { stateDoc: '', + localStreamData: [], } export type StorageDB = Low export async function loadStorage(dbPath: string) { - const db = await JSONFilePreset(dbPath, defaultData) + let db: StorageDB | undefined = undefined + + try { + db = await JSONFilePreset(dbPath, defaultData) + } catch (err) { + console.warn( + 'Failed to load storage at', + dbPath, + ' -- changes will not be persisted', + ) + db = new Low(new Memory(), defaultData) + } + return db }