Files
Sneedchat-Matrix-Bridge-Go/sneed/client.go
2025-11-18 02:07:08 -05:00

489 lines
10 KiB
Go

package sneed
import (
"encoding/json"
"fmt"
"html"
"log"
"net/http"
"regexp"
"sync"
"time"
"github.com/gorilla/websocket"
"local/sneedchatbridge/cookie"
"local/sneedchatbridge/utils"
)
// Tunables for the Sneedchat client.
const (
// ProcessedCacheSize caps how many message IDs addToProcessed keeps in
// Client.processedMessageIDs before the oldest entries are dropped.
ProcessedCacheSize = 1000
// ReconnectInterval is the wait before the first reconnect attempt in
// handleDisconnect; the wait doubles after every failed attempt, capped at
// two minutes.
ReconnectInterval = 7 * time.Second
// MappingCacheSize is passed as the first argument to utils.NewBoundedMap
// when NewClient builds the edit-date map (presumably its capacity — confirm
// against utils.BoundedMap).
MappingCacheSize = 1000
// MappingCleanupInterval is the ticker period at which cleanupLoop calls
// CleanupOldEntries on the edit-date map.
MappingCleanupInterval = 5 * time.Minute
// MappingMaxAge is passed as the second argument to utils.NewBoundedMap
// (presumably the maximum entry age — confirm against utils.BoundedMap).
MappingMaxAge = 1 * time.Hour
// OutboundMatchWindow is not referenced by any code visible in this file.
OutboundMatchWindow = 60 * time.Second
)
// ------------------------------------------------------------
// Client structure
// ------------------------------------------------------------
// Client is a websocket client for a single Sneedchat room. It dials the
// chat endpoint, joins the room, parses incoming frames and reports new,
// edited and deleted messages through the exported callback fields.
type Client struct {
// wsURL is the websocket endpoint dialed by Connect.
wsURL string
// roomID is the room joined with "/join <roomID>".
roomID int
// cookies supplies the Cookie header value used when dialing.
cookies *cookie.CookieRefreshService
// conn is the current websocket connection; handleDisconnect resets it to nil.
conn *websocket.Conn
// connected is true between a successful Connect and the next disconnect.
connected bool
// mu guards conn and connected.
mu sync.RWMutex
// lastMessage is the time the last frame was received (or the connection
// was established); heartbeatLoop pings once it is more than 60s old.
lastMessage time.Time
// stopCh is closed by Disconnect to stop all background loops.
stopCh chan struct{}
// wg tracks the read, heartbeat and cleanup goroutines so Disconnect can wait for them.
wg sync.WaitGroup
// processedMu guards processedMessageIDs.
processedMu sync.Mutex
// processedMessageIDs holds IDs of messages already seen, oldest first,
// bounded by ProcessedCacheSize.
processedMessageIDs []int
// messageEditDates stores, per message ID, the edit date last seen for it;
// processMessage compares against it to detect edits.
messageEditDates *utils.BoundedMap
// APP-SERVICE CALLBACKS (Discord removed)
// Each callback is optional; nil callbacks are skipped.
// OnMessage is invoked for a message ID that has not been seen before.
OnMessage func(msgID int, userID int, username string, content string)
// OnEdit is invoked when an already-seen message arrives with a larger edit date.
OnEdit func(msgID int, userID int, newText string)
// OnDelete is invoked for deletions; userID is 0 when the deletion arrives
// through the payload's delete list rather than on a message.
OnDelete func(msgID int, userID int)
// OnConnect is invoked at the end of every successful Connect.
OnConnect func()
// OnDisconnect is invoked by handleDisconnect before reconnect attempts start.
OnDisconnect func()
// recentOutboundIter is stored by SetOutboundIter; it is not called by any
// code visible in this file.
recentOutboundIter func() []map[string]interface{}
// bridgeUserID and bridgeUsername identify the bridge's own chat account;
// processMessage records but does not forward messages from that identity.
bridgeUserID int
bridgeUsername string
// baseLoopsStarted records that the heartbeat and cleanup loops were
// launched, so reconnects do not start them a second time.
baseLoopsStarted bool
}
// ------------------------------------------------------------
// Constructor
// ------------------------------------------------------------
// NewClient builds a Client for the given room. The client is not connected
// until Connect is called. cookieSvc provides the Cookie header used when
// dialing.
func NewClient(roomID int, cookieSvc *cookie.CookieRefreshService) *Client {
	c := new(Client)
	c.wsURL = "wss://kiwifarms.st:9443/chat.ws"
	c.roomID = roomID
	c.cookies = cookieSvc
	c.stopCh = make(chan struct{})
	c.processedMessageIDs = make([]int, 0, ProcessedCacheSize)
	c.messageEditDates = utils.NewBoundedMap(MappingCacheSize, MappingMaxAge)
	c.lastMessage = time.Now()
	return c
}
// SetBridgeIdentity records the chat account the bridge posts as. Incoming
// messages whose author ID or username matches are tracked by processMessage
// but never forwarded to OnMessage or OnEdit. A userID <= 0 or an empty
// username disables the respective check.
func (c *Client) SetBridgeIdentity(userID int, username string) {
	c.bridgeUserID, c.bridgeUsername = userID, username
}
// SetOutboundIter stores f as the client's recent-outbound iterator. The
// stored function is not invoked anywhere in this file.
func (c *Client) SetOutboundIter(f func() []map[string]interface{}) {
	c.recentOutboundIter = f
}
// ------------------------------------------------------------
// Connect + Reconnect
// ------------------------------------------------------------
// Connect dials the chat websocket, starts the background loops and joins
// the configured room. It is a no-op returning nil when the client is
// already connected. The heartbeat and cleanup loops are started only on the
// first successful call; a read loop is started for every new connection.
// OnConnect, if set, is invoked before returning.
//
// It returns a wrapped error when the websocket dial fails.
func (c *Client) Connect() error {
	c.mu.Lock()
	alreadyUp := c.connected
	c.mu.Unlock()
	if alreadyUp {
		return nil
	}

	// Only attach a Cookie header when the refresh service has one.
	hdr := make(http.Header)
	cookieValue := c.cookies.GetCurrentCookie()
	if cookieValue != "" {
		hdr.Add("Cookie", cookieValue)
	}

	log.Printf("Connecting to Sneedchat room %d", c.roomID)
	ws, _, dialErr := websocket.DefaultDialer.Dial(c.wsURL, hdr)
	if dialErr != nil {
		return fmt.Errorf("websocket connection failed: %w", dialErr)
	}

	c.mu.Lock()
	c.conn, c.connected, c.lastMessage = ws, true, time.Now()
	c.mu.Unlock()

	// Heartbeat and cleanup outlive individual connections, so they are
	// launched once and reused across reconnects.
	if !c.baseLoopsStarted {
		c.baseLoopsStarted = true
		c.wg.Add(2)
		go c.heartbeatLoop()
		go c.cleanupLoop()
	}

	c.wg.Add(1)
	go c.readLoop()

	c.joinRoom()
	log.Printf("✅ Successfully connected to Sneedchat room %d", c.roomID)

	if onConnect := c.OnConnect; onConnect != nil {
		onConnect()
	}
	return nil
}
// joinRoom sends the "/join <roomID>" command for the configured room. The
// result of the send is not checked.
func (c *Client) joinRoom() {
	joinCmd := fmt.Sprintf("/join %d", c.roomID)
	c.Send(joinCmd)
}
// ------------------------------------------------------------
// Read loop
// ------------------------------------------------------------
// readLoop reads frames from the current connection until the client is
// stopped, the connection is gone, or a read fails. Each frame refreshes
// lastMessage and is handed to handleIncoming. On a read error the loop
// delegates to handleDisconnect (which performs reconnection) and exits.
func (c *Client) readLoop() {
	defer c.wg.Done()
	for {
		// Non-blocking stop check before every read.
		select {
		case <-c.stopCh:
			return
		default:
		}

		c.mu.RLock()
		conn := c.conn
		c.mu.RUnlock()
		if conn == nil {
			return
		}

		_, raw, err := conn.ReadMessage()
		if err != nil {
			log.Printf("Sneedchat read error: %v", err)
			c.handleDisconnect()
			return
		}

		// lastMessage is shared with other goroutines (Connect writes it
		// under mu and the heartbeat reads it), so the update must hold the
		// lock to avoid a data race on the time.Time value.
		c.mu.Lock()
		c.lastMessage = time.Now()
		c.mu.Unlock()

		c.handleIncoming(string(raw))
	}
}
// ------------------------------------------------------------
// Heartbeat
// ------------------------------------------------------------
// heartbeatLoop wakes every 30 seconds and sends "/ping" when nothing has
// been received for more than 60 seconds. It exits when stopCh is closed.
func (c *Client) heartbeatLoop() {
	defer c.wg.Done()
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			// Read lastMessage under the lock: it is written from other
			// goroutines and an unsynchronised read is a data race.
			c.mu.RLock()
			idle := time.Since(c.lastMessage)
			c.mu.RUnlock()
			if idle > 60*time.Second {
				// Send performs the connected/nil-connection check and logs
				// write failures, so the ping error is no longer discarded.
				c.Send("/ping")
			}
		case <-c.stopCh:
			return
		}
	}
}
// ------------------------------------------------------------
// Cleanup loop
// ------------------------------------------------------------
// cleanupLoop periodically asks the edit-date map to drop old entries,
// logging how many were removed when that number is non-zero. It runs every
// MappingCleanupInterval until stopCh is closed.
func (c *Client) cleanupLoop() {
	defer c.wg.Done()

	ticker := time.NewTicker(MappingCleanupInterval)
	defer ticker.Stop()

	for {
		select {
		case <-c.stopCh:
			return
		case <-ticker.C:
			if n := c.messageEditDates.CleanupOldEntries(); n > 0 {
				log.Printf("🧹 Cleaned %d old edit tracking entries", n)
			}
		}
	}
}
// ------------------------------------------------------------
// Send
// ------------------------------------------------------------
// Send writes s to the chat as a websocket text message. It returns false
// without writing when the client is not connected, and false after logging
// when the write fails; true means the write call succeeded.
func (c *Client) Send(s string) bool {
	// Snapshot the connection state; the write itself happens outside the lock.
	c.mu.RLock()
	ws, live := c.conn, c.connected
	c.mu.RUnlock()

	if !live || ws == nil {
		return false
	}

	err := ws.WriteMessage(websocket.TextMessage, []byte(s))
	if err == nil {
		return true
	}
	log.Printf("Sneedchat write error: %v", err)
	return false
}
// ------------------------------------------------------------
// Disconnect + Reconnect
// ------------------------------------------------------------
// handleDisconnect tears down the current connection, notifies OnDisconnect
// and then blocks, retrying Connect until it succeeds or the client is
// stopped. The first retry waits ReconnectInterval; each failure doubles the
// wait up to two minutes. After a successful reconnect it pauses two
// seconds, re-sends the join command and a "/ping". It does nothing when the
// client has already been stopped.
func (c *Client) handleDisconnect() {
	// A closed stopCh means shutdown was requested; there is nothing to recover.
	select {
	case <-c.stopCh:
		return
	default:
	}

	c.mu.Lock()
	c.connected = false
	if stale := c.conn; stale != nil {
		stale.Close()
		c.conn = nil
	}
	c.mu.Unlock()

	log.Println("🔴 Sneedchat disconnected")
	if notify := c.OnDisconnect; notify != nil {
		notify()
	}

	const backoffCeiling = 2 * time.Minute
	wait := ReconnectInterval
	for attempt := 1; ; attempt++ {
		// Sleep for the current backoff, but wake immediately on shutdown.
		select {
		case <-c.stopCh:
			log.Println("Reconnection cancelled - bridge stopping")
			return
		case <-time.After(wait):
		}

		log.Printf("🔄 Reconnection attempt #%d...", attempt)
		err := c.Connect()
		if err == nil {
			log.Println("🟢 Reconnected successfully")
			time.Sleep(2 * time.Second)
			c.joinRoom()
			c.Send("/ping")
			log.Printf("📍 Rejoined Sneedchat room %d after reconnect", c.roomID)
			return
		}

		log.Printf("⚠️ Reconnect attempt #%d failed: %v", attempt, err)
		if wait *= 2; wait > backoffCeiling {
			wait = backoffCeiling
		}
	}
}
// Disconnect stops the client: it signals all background loops through
// stopCh, closes the current connection and waits for the loops to exit.
// It is safe to call more than once; later calls only wait again.
func (c *Client) Disconnect() {
	c.mu.Lock()
	// Closing an already-closed channel panics, so only close stopCh the
	// first time. The check runs under mu so concurrent callers cannot both
	// reach close.
	select {
	case <-c.stopCh:
	default:
		close(c.stopCh)
	}
	if c.conn != nil {
		c.conn.Close()
		// Drop the reference so a repeated call does not close it again.
		c.conn = nil
	}
	c.connected = false
	c.mu.Unlock()

	c.wg.Wait()
}
// ------------------------------------------------------------
// Incoming Message Parsing (Siropu Format)
// ------------------------------------------------------------
// handleIncoming decodes one raw frame and dispatches its contents. Frames
// that are not valid JSON for SneedPayload are ignored. Deletions listed in
// the payload are applied first, then every contained message is passed to
// processMessage; a non-empty Messages list takes precedence over the
// single Message field.
func (c *Client) handleIncoming(raw string) {
	var frame SneedPayload
	if json.Unmarshal([]byte(raw), &frame) != nil {
		return
	}

	if frame.Delete != nil {
		// The delete field is either a single JSON number or an array of
		// them; anything else yields no IDs.
		var doomed []int
		if single, ok := frame.Delete.(float64); ok {
			doomed = append(doomed, int(single))
		} else if list, ok := frame.Delete.([]interface{}); ok {
			for _, item := range list {
				if num, isNum := item.(float64); isNum {
					doomed = append(doomed, int(num))
				}
			}
		}

		for _, msgID := range doomed {
			c.messageEditDates.Delete(msgID)
			c.removeFromProcessed(msgID)
			if c.OnDelete != nil {
				c.OnDelete(msgID, 0) // author is not known for these deletions
			}
		}
	}

	switch {
	case len(frame.Messages) > 0:
		for _, m := range frame.Messages {
			c.processMessage(m)
		}
	case frame.Message != nil:
		c.processMessage(*frame.Message)
	}
}
// ------------------------------------------------------------
// Process a Sneedchat message
// ------------------------------------------------------------
// processMessage classifies a single chat message and fires the matching
// callback. In order of precedence:
//   - deleted messages are forgotten and reported through OnDelete;
//   - messages authored by the bridge identity are recorded but not forwarded;
//   - already-seen messages trigger OnEdit only when their edit date grew;
//   - anything else is recorded and reported through OnMessage.
//
// The text is taken from MessageRaw, falling back to Message, and is
// HTML-unescaped. A missing author name becomes "Unknown" and a missing
// author ID becomes 0.
func (c *Client) processMessage(m SneedMessage) {
	author, authorID := "Unknown", 0
	if name, ok := m.Author["username"].(string); ok {
		author = name
	}
	if rawID, ok := m.Author["id"].(float64); ok {
		authorID = int(rawID)
	}

	body := m.MessageRaw
	if body == "" {
		body = m.Message
	}
	body = html.UnescapeString(body)

	id := m.MessageID
	edited := m.MessageEditDate

	fromBridge := (c.bridgeUserID > 0 && authorID == c.bridgeUserID) ||
		(c.bridgeUsername != "" && author == c.bridgeUsername)

	switch {
	case m.Deleted || m.IsDeleted:
		c.messageEditDates.Delete(id)
		c.removeFromProcessed(id)
		if c.OnDelete != nil {
			c.OnDelete(id, authorID)
		}

	case fromBridge:
		// Echo of something the bridge itself posted: remember it so later
		// frames for the same ID are recognised, but do not forward it.
		c.addToProcessed(id)
		c.messageEditDates.Set(id, edited)

	case c.isProcessed(id):
		// Seen before: only a strictly newer edit date counts as an edit.
		prev, known := c.messageEditDates.Get(id)
		if known && edited > prev.(int) {
			c.messageEditDates.Set(id, edited)
			if c.OnEdit != nil {
				c.OnEdit(id, authorID, body)
			}
		}

	default:
		c.addToProcessed(id)
		c.messageEditDates.Set(id, edited)
		if c.OnMessage != nil {
			c.OnMessage(id, authorID, author, body)
		}
	}
}
// ------------------------------------------------------------
// Message Processed Cache
// ------------------------------------------------------------
// isProcessed reports whether id is currently in the processed-message
// cache. It performs a linear scan under processedMu.
func (c *Client) isProcessed(id int) bool {
	c.processedMu.Lock()
	defer c.processedMu.Unlock()

	seen := c.processedMessageIDs
	for i := 0; i < len(seen); i++ {
		if seen[i] == id {
			return true
		}
	}
	return false
}
// addToProcessed appends id to the processed-message cache and, when the
// cache exceeds ProcessedCacheSize, drops the oldest entries so that only
// the most recent ProcessedCacheSize IDs remain.
func (c *Client) addToProcessed(id int) {
	c.processedMu.Lock()
	defer c.processedMu.Unlock()

	c.processedMessageIDs = append(c.processedMessageIDs, id)

	// IDs are added one at a time and trimmed on every call, so the overflow
	// is never more than a single entry. The former "evicted more than 50"
	// warning could not be reached and has been removed.
	if over := len(c.processedMessageIDs) - ProcessedCacheSize; over > 0 {
		c.processedMessageIDs = c.processedMessageIDs[over:]
	}
}
// removeFromProcessed deletes the first occurrence of id from the
// processed-message cache, preserving the order of the remaining entries.
// It does nothing when id is not present.
func (c *Client) removeFromProcessed(id int) {
	c.processedMu.Lock()
	defer c.processedMu.Unlock()

	ids := c.processedMessageIDs
	for pos := 0; pos < len(ids); pos++ {
		if ids[pos] != id {
			continue
		}
		// Shift the tail one slot left over the removed entry, then shrink.
		copy(ids[pos:], ids[pos+1:])
		c.processedMessageIDs = ids[:len(ids)-1]
		return
	}
}
// ------------------------------------------------------------
// Utility
// ------------------------------------------------------------
// ReplaceBridgeMention rewrites every "@<bridgeUsername>" mention in content
// to "<@pingID>". Matching is case-insensitive and requires the mention to
// be followed by a non-word character or the end of the string, so
// "@bot" does not match inside "@botty". The character that terminates the
// mention (space, comma, ...) is kept in the output, and pingID is inserted
// literally even if it contains '$'.
//
// content is returned unchanged when bridgeUsername or pingID is empty.
func ReplaceBridgeMention(content, bridgeUsername, pingID string) string {
	if bridgeUsername == "" || pingID == "" {
		return content
	}

	// The trailing delimiter is captured so it can be restored after the
	// replacement instead of being swallowed by the match.
	pat := regexp.MustCompile(fmt.Sprintf(`(?i)@%s(\W|$)`, regexp.QuoteMeta(bridgeUsername)))
	mention := fmt.Sprintf("<@%s>", pingID)

	// ReplaceAllStringFunc is used rather than a replacement template so that
	// '$' sequences in pingID are not expanded as group references.
	return pat.ReplaceAllStringFunc(content, func(match string) string {
		delim := ""
		if groups := pat.FindStringSubmatch(match); len(groups) == 2 {
			delim = groups[1]
		}
		return mention + delim
	})
}