Persist local stream data

This commit is contained in:
Max Goodhart
2025-06-17 01:21:22 +00:00
parent fc42a69b62
commit a36a07d177
5 changed files with 75 additions and 27 deletions

View File

@@ -1297,7 +1297,7 @@ function CustomStreamInput({
return (
<div>
<LazyChangeInput
value={props.label}
value={props.label ?? ''}
onChange={handleChangeLabel}
placeholder="Label (optional)"
/>{' '}

View File

@@ -23,16 +23,21 @@ export interface ContentViewInfo {
export type ContentKind = 'video' | 'audio' | 'web' | 'background' | 'overlay'
export interface StreamData extends ContentDisplayOptions {
export interface StreamDataContent extends ContentDisplayOptions {
kind: ContentKind
link: string
label: string
label?: string
labelPosition?: 'top-left' | 'top-right' | 'bottom-right' | 'bottom-left'
source?: string
notes?: string
status?: string
city?: string
state?: string
_id?: string
_dataSource?: string
}
export interface StreamData extends StreamDataContent {
_id: string
_dataSource: string
}

View File

@@ -6,20 +6,24 @@ import { promises as fsPromises } from 'fs'
import { isArray } from 'lodash-es'
import fetch from 'node-fetch'
import { promisify } from 'util'
import { StreamData, StreamList } from '../../../streamwall-shared/src/types'
import {
StreamData,
StreamDataContent,
StreamList,
} from '../../../streamwall-shared/src/types'
const sleep = promisify(setTimeout)
type DataSource = AsyncGenerator<StreamData[]>
type DataSource = AsyncGenerator<StreamDataContent[]>
export async function* pollDataURL(url: string, intervalSecs: number) {
const refreshInterval = intervalSecs * 1000
let lastData = []
while (true) {
let data: StreamData[] = []
let data: StreamDataContent[] = []
try {
const resp = await fetch(url)
data = (await resp.json()) as StreamData[]
data = (await resp.json()) as StreamDataContent[]
} catch (err) {
console.warn('error loading stream data', err)
}
@@ -65,33 +69,50 @@ export async function* markDataSource(dataSource: DataSource, name: string) {
}
}
export async function* combineDataSources(dataSources: DataSource[]) {
export async function* combineDataSources(
dataSources: DataSource[],
idGen: StreamIDGenerator,
) {
for await (const streamLists of Repeater.latest(dataSources)) {
const dataByURL = new Map<string, StreamData>()
for (const list of streamLists) {
for (const data of list) {
const existing = dataByURL.get(data.link)
dataByURL.set(data.link, { ...existing, ...data })
dataByURL.set(data.link, { ...existing, ...data } as StreamData)
}
}
const streams: StreamList = [...dataByURL.values()]
const streams = idGen.process([...dataByURL.values()]) as StreamList
// Retain the index to speed up local lookups
streams.byURL = dataByURL
yield streams
}
}
export class LocalStreamData extends EventEmitter {
dataByURL: Map<string, Partial<StreamData>>
constructor() {
super()
this.dataByURL = new Map()
interface LocalStreamDataEvents {
update: [StreamDataContent[]]
}
update(url: string, data: Partial<StreamData>) {
export class LocalStreamData extends EventEmitter<LocalStreamDataEvents> {
dataByURL: Map<string, StreamDataContent>
constructor(entries: StreamDataContent[] = []) {
super()
this.dataByURL = new Map()
for (const entry of entries) {
if (!entry.link) {
continue
}
this.dataByURL.set(entry.link, entry)
}
}
update(url: string, data: Partial<StreamDataContent>) {
const existing = this.dataByURL.get(url)
this.dataByURL.set(data.link ?? url, { ...existing, ...data, link: url })
const kind = data.kind ?? existing?.kind ?? 'video'
const updated: StreamDataContent = { ...existing, ...data, kind, link: url }
this.dataByURL.set(data.link ?? url, updated)
if (data.link != null && url !== data.link) {
this.dataByURL.delete(url)
}
@@ -107,9 +128,9 @@ export class LocalStreamData extends EventEmitter {
this.emit('update', [...this.dataByURL.values()])
}
gen(): AsyncGenerator<StreamData[]> {
gen(): AsyncGenerator<StreamDataContent[]> {
return new Repeater(async (push, stop) => {
await push([])
await push([...this.dataByURL.values()])
this.on('update', push)
await stop
this.off('update', push)
@@ -126,7 +147,7 @@ export class StreamIDGenerator {
this.idSet = new Set()
}
process(streams: StreamData[]) {
process(streams: StreamDataContent[]) {
const { idMap, idSet } = this
for (const stream of streams) {

View File

@@ -250,9 +250,20 @@ async function main(argv: ReturnType<typeof parseArgs>) {
callback(false)
})
const db = await loadStorage(
join(app.getPath('userData'), 'streamwall-storage.json'),
)
console.debug('Creating StreamWindow...')
const idGen = new StreamIDGenerator()
const localStreamData = new LocalStreamData()
const localStreamData = new LocalStreamData(db.data.localStreamData)
localStreamData.on('update', (entries) => {
db.update((data) => {
data.localStreamData = entries
})
})
const overlayStreamData = new LocalStreamData()
const streamWindowConfig = {
@@ -306,9 +317,6 @@ async function main(argv: ReturnType<typeof parseArgs>) {
const stateDoc = new Y.Doc()
const viewsState = stateDoc.getMap<Y.Map<string | undefined>>('views')
const db = await loadStorage(
join(app.getPath('userData'), 'streamwall-storage.json'),
)
if (db.data.stateDoc) {
console.log('Loading stateDoc from storage...')
try {
@@ -531,11 +539,10 @@ async function main(argv: ReturnType<typeof parseArgs>) {
return markDataSource(watchDataFile(path), 'toml-file')
}),
markDataSource(localStreamData.gen(), 'custom'),
overlayStreamData.gen(),
markDataSource(overlayStreamData.gen(), 'overlay'),
]
for await (const rawStreams of combineDataSources(dataSources)) {
const streams = idGen.process(rawStreams)
for await (const streams of combineDataSources(dataSources, idGen)) {
updateState({ streams })
updateViewsFromStateDoc()
}

View File

@@ -1,17 +1,32 @@
import type { Low } from 'lowdb'
import { Low, Memory } from 'lowdb'
import { JSONFilePreset } from 'lowdb/node'
import { StreamDataContent } from 'streamwall-shared'
export interface StreamwallStoredData {
stateDoc: string
localStreamData: StreamDataContent[]
}
const defaultData: StreamwallStoredData = {
stateDoc: '',
localStreamData: [],
}
export type StorageDB = Low<StreamwallStoredData>
export async function loadStorage(dbPath: string) {
const db = await JSONFilePreset<StreamwallStoredData>(dbPath, defaultData)
let db: StorageDB | undefined = undefined
try {
db = await JSONFilePreset<StreamwallStoredData>(dbPath, defaultData)
} catch (err) {
console.warn(
'Failed to load storage at',
dbPath,
' -- changes will not be persisted',
)
db = new Low<StreamwallStoredData>(new Memory(), defaultData)
}
return db
}