package sneed import ( "encoding/json" "fmt" "html" "log" "net/http" "regexp" "sync" "time" "github.com/gorilla/websocket" "local/sneedchatbridge/cookie" "local/sneedchatbridge/utils" ) const ( ProcessedCacheSize = 1000 // Increased from 250 ReconnectInterval = 7 * time.Second MappingCacheSize = 1000 MappingCleanupInterval = 5 * time.Minute MappingMaxAge = 1 * time.Hour OutboundMatchWindow = 60 * time.Second PingIdleThreshold = 60 * time.Second StaleRejoinThreshold = 90 * time.Second StaleReconnectThreshold = 3 * time.Minute RejoinCooldown = 30 * time.Second ) type Client struct { wsURL string roomID int cookies *cookie.CookieRefreshService conn *websocket.Conn connected bool mu sync.RWMutex lastMessage time.Time lastJoinAttempt time.Time stopCh chan struct{} wg sync.WaitGroup processedMu sync.Mutex processedMessageIDs []int messageEditDates *utils.BoundedMap OnMessage func(map[string]interface{}) OnEdit func(int, string) OnDelete func(int) OnConnect func() OnDisconnect func() recentOutboundIter func() []map[string]interface{} mapDiscordSneed func(int, int, string) bridgeUserID int bridgeUsername string baseLoopsStarted bool } func NewClient(roomID int, cookieSvc *cookie.CookieRefreshService) *Client { return &Client{ wsURL: "wss://kiwifarms.st:9443/chat.ws", roomID: roomID, cookies: cookieSvc, stopCh: make(chan struct{}), processedMessageIDs: make([]int, 0, ProcessedCacheSize), messageEditDates: utils.NewBoundedMap(MappingCacheSize, MappingMaxAge), lastMessage: time.Now(), } } func (c *Client) SetBridgeIdentity(userID int, username string) { c.bridgeUserID = userID c.bridgeUsername = username } func (c *Client) Connect() error { c.mu.Lock() if c.connected { c.mu.Unlock() return nil } c.mu.Unlock() headers := http.Header{} if ck := c.cookies.GetCurrentCookie(); ck != "" { headers.Add("Cookie", ck) } log.Printf("Connecting to Sneedchat room %d", c.roomID) conn, _, err := websocket.DefaultDialer.Dial(c.wsURL, headers) if err != nil { return fmt.Errorf("websocket connection failed: %w", err) } c.mu.Lock() c.conn = conn c.connected = true c.lastMessage = time.Now() c.mu.Unlock() if !c.baseLoopsStarted { c.baseLoopsStarted = true c.wg.Add(2) go c.heartbeatLoop() go c.cleanupLoop() } c.wg.Add(1) go c.readLoop() c.joinRoom() log.Printf("✅ Successfully connected to Sneedchat room %d", c.roomID) if c.OnConnect != nil { c.OnConnect() } return nil } func (c *Client) joinRoom() bool { sent := c.Send(fmt.Sprintf("/join %d", c.roomID)) if sent { c.mu.Lock() c.lastJoinAttempt = time.Now() c.mu.Unlock() } return sent } func (c *Client) readLoop() { defer c.wg.Done() for { select { case <-c.stopCh: return default: } c.mu.RLock() conn := c.conn c.mu.RUnlock() if conn == nil { return } _, message, err := conn.ReadMessage() if err != nil { log.Printf("Sneedchat read error: %v", err) c.handleDisconnect() return } c.lastMessage = time.Now() c.handleIncoming(string(message)) } } func (c *Client) heartbeatLoop() { defer c.wg.Done() t := time.NewTicker(15 * time.Second) defer t.Stop() for { select { case <-t.C: c.mu.RLock() connected := c.connected conn := c.conn c.mu.RUnlock() if !connected || conn == nil { continue } silence := time.Since(c.lastMessage) if silence > PingIdleThreshold { _ = conn.WriteMessage(websocket.TextMessage, []byte("/ping")) } c.handleStaleState(silence) case <-c.stopCh: return } } } func (c *Client) handleStaleState(silence time.Duration) { if silence < StaleRejoinThreshold { return } if silence >= StaleReconnectThreshold { log.Printf("⚠️ No Sneedchat messages for %s; recycling websocket", silence.Round(time.Second)) c.handleDisconnect() return } c.mu.RLock() lastJoin := c.lastJoinAttempt c.mu.RUnlock() if time.Since(lastJoin) < RejoinCooldown { return } if c.joinRoom() { log.Printf("⚠️ Sneedchat feed silent for %s, reasserted /join %d", silence.Round(time.Second), c.roomID) } else { log.Printf("⚠️ Sneedchat feed silent for %s but websocket not writable; waiting", silence.Round(time.Second)) } } func (c *Client) cleanupLoop() { defer c.wg.Done() t := time.NewTicker(MappingCleanupInterval) defer t.Stop() for { select { case <-t.C: removed := c.messageEditDates.CleanupOldEntries() if removed > 0 { log.Printf("🧹 Cleaned up %d old message tracking entries", removed) } case <-c.stopCh: return } } } func (c *Client) Send(s string) bool { c.mu.RLock() conn := c.conn ok := c.connected && conn != nil c.mu.RUnlock() if !ok { return false } if err := conn.WriteMessage(websocket.TextMessage, []byte(s)); err != nil { log.Printf("Sneedchat write error: %v", err) return false } return true } func (c *Client) handleDisconnect() { c.mu.RLock() alreadyDisconnected := !c.connected c.mu.RUnlock() if alreadyDisconnected { return } select { case <-c.stopCh: return default: } c.mu.Lock() c.connected = false if c.conn != nil { c.conn.Close() c.conn = nil } c.mu.Unlock() log.Println("🔴 Sneedchat disconnected") if c.OnDisconnect != nil { c.OnDisconnect() } // Reconnection loop with exponential backoff delay := ReconnectInterval maxDelay := 2 * time.Minute attempt := 0 for { select { case <-c.stopCh: log.Println("Reconnection cancelled - bridge stopping") return case <-time.After(delay): attempt++ log.Printf("🔄 Reconnection attempt #%d...", attempt) if err := c.Connect(); err != nil { log.Printf("⚠️ Reconnect attempt #%d failed: %v", attempt, err) // Exponential backoff delay *= 2 if delay > maxDelay { delay = maxDelay } continue } log.Println("🟢 Reconnected successfully") // Allow websocket to stabilize time.Sleep(2 * time.Second) // Re-join room c.joinRoom() c.Send("/ping") log.Printf("📍 Rejoined Sneedchat room %d after reconnect", c.roomID) return } } } func (c *Client) Disconnect() { close(c.stopCh) c.mu.Lock() if c.conn != nil { c.conn.Close() } c.connected = false c.mu.Unlock() c.wg.Wait() } func (c *Client) handleIncoming(raw string) { var payload SneedPayload if err := json.Unmarshal([]byte(raw), &payload); err != nil { return } if payload.Delete != nil { var ids []int switch v := payload.Delete.(type) { case float64: ids = []int{int(v)} case []interface{}: for _, x := range v { if fid, ok := x.(float64); ok { ids = append(ids, int(fid)) } } } for _, id := range ids { c.messageEditDates.Delete(id) c.removeFromProcessed(id) if c.OnDelete != nil { c.OnDelete(id) } } } var messages []SneedMessage if len(payload.Messages) > 0 { messages = payload.Messages } else if payload.Message != nil { messages = []SneedMessage{*payload.Message} } for _, m := range messages { c.processMessage(m) } } func (c *Client) processMessage(m SneedMessage) { username := "Unknown" var userID int if a, ok := m.Author["username"].(string); ok { username = a } if id, ok := m.Author["id"].(float64); ok { userID = int(id) } messageText := m.MessageRaw if messageText == "" { messageText = m.Message } messageText = html.UnescapeString(messageText) editDate := m.MessageEditDate deleted := m.Deleted || m.IsDeleted if deleted { c.messageEditDates.Delete(m.MessageID) c.removeFromProcessed(m.MessageID) if c.OnDelete != nil { c.OnDelete(m.MessageID) } return } if (c.bridgeUserID > 0 && userID == c.bridgeUserID) || (c.bridgeUsername != "" && username == c.bridgeUsername) { if m.MessageID > 0 && c.recentOutboundIter != nil && c.mapDiscordSneed != nil { now := time.Now() for _, entry := range c.recentOutboundIter() { if mapped, ok := entry["mapped"].(bool); ok && mapped { continue } content, _ := entry["content"].(string) if ts, ok := entry["ts"].(time.Time); ok { if content == messageText && now.Sub(ts) <= OutboundMatchWindow { if discordID, ok := entry["discord_id"].(int); ok { c.mapDiscordSneed(discordID, m.MessageID, username) entry["mapped"] = true break } } } } } c.addToProcessed(m.MessageID) c.messageEditDates.Set(m.MessageID, editDate) return } if c.isProcessed(m.MessageID) { if prev, exists := c.messageEditDates.Get(m.MessageID); exists { if editDate > prev.(int) { c.messageEditDates.Set(m.MessageID, editDate) if c.OnEdit != nil { c.OnEdit(m.MessageID, messageText) } } } return } c.addToProcessed(m.MessageID) c.messageEditDates.Set(m.MessageID, editDate) if c.OnMessage != nil { c.OnMessage(map[string]interface{}{ "username": username, "content": messageText, "message_id": m.MessageID, "author_id": userID, "raw": m, }) } } func (c *Client) isProcessed(id int) bool { c.processedMu.Lock() defer c.processedMu.Unlock() for _, x := range c.processedMessageIDs { if x == id { return true } } return false } func (c *Client) addToProcessed(id int) { c.processedMu.Lock() defer c.processedMu.Unlock() c.processedMessageIDs = append(c.processedMessageIDs, id) // Hard cap: keep only the most recent 1000 messages (FIFO) if len(c.processedMessageIDs) > ProcessedCacheSize { excess := len(c.processedMessageIDs) - ProcessedCacheSize c.processedMessageIDs = c.processedMessageIDs[excess:] // Log when significant eviction happens if excess > 50 { log.Printf("⚠️ Processed message cache full, evicted %d old entries", excess) } } } func (c *Client) removeFromProcessed(id int) { c.processedMu.Lock() defer c.processedMu.Unlock() for i, x := range c.processedMessageIDs { if x == id { c.processedMessageIDs = append(c.processedMessageIDs[:i], c.processedMessageIDs[i+1:]...) return } } } func (c *Client) SetOutboundIter(f func() []map[string]interface{}) { c.recentOutboundIter = f } func (c *Client) SetMapDiscordSneed(f func(int, int, string)) { c.mapDiscordSneed = f } func ReplaceBridgeMention(content, bridgeUsername, pingID string) string { if bridgeUsername == "" || pingID == "" { return content } pat := regexp.MustCompile(fmt.Sprintf(`(?i)@%s(?:\W|$)`, regexp.QuoteMeta(bridgeUsername))) return pat.ReplaceAllString(content, fmt.Sprintf("<@%s>", pingID)) }