Improved reconnect logic to handle 1006 websocket errors
This commit is contained in:
@@ -37,25 +37,22 @@ type Client struct {
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
// processed
|
||||
processedMu sync.Mutex
|
||||
processedMessageIDs []int
|
||||
messageEditDates *utils.BoundedMap
|
||||
|
||||
messageEditDates *utils.BoundedMap
|
||||
|
||||
// event callbacks
|
||||
OnMessage func(map[string]interface{})
|
||||
OnEdit func(int, string)
|
||||
OnDelete func(int)
|
||||
OnConnect func()
|
||||
OnDisconnect func()
|
||||
|
||||
// outbound correlation for echo suppression / mapping
|
||||
recentOutboundIter func() []map[string]interface{}
|
||||
mapDiscordSneed func(int, int, string)
|
||||
|
||||
bridgeUserID int
|
||||
bridgeUsername string
|
||||
bridgeUserID int
|
||||
bridgeUsername string
|
||||
baseLoopsStarted bool
|
||||
}
|
||||
|
||||
func NewClient(roomID int, cookieSvc *cookie.CookieRefreshService) *Client {
|
||||
@@ -100,11 +97,17 @@ func (c *Client) Connect() error {
|
||||
c.lastMessage = time.Now()
|
||||
c.mu.Unlock()
|
||||
|
||||
c.wg.Add(3)
|
||||
go c.readLoop()
|
||||
go c.heartbeatLoop()
|
||||
go c.joinRoom()
|
||||
if !c.baseLoopsStarted {
|
||||
c.baseLoopsStarted = true
|
||||
c.wg.Add(2)
|
||||
go c.heartbeatLoop()
|
||||
go c.cleanupLoop()
|
||||
}
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.readLoop()
|
||||
|
||||
c.Send(fmt.Sprintf("/join %d", c.roomID))
|
||||
log.Printf("✅ Successfully connected to Sneedchat room %d", c.roomID)
|
||||
if c.OnConnect != nil {
|
||||
c.OnConnect()
|
||||
@@ -113,14 +116,11 @@ func (c *Client) Connect() error {
|
||||
}
|
||||
|
||||
func (c *Client) joinRoom() {
|
||||
defer c.wg.Done()
|
||||
c.Send(fmt.Sprintf("/join %d", c.roomID))
|
||||
}
|
||||
|
||||
func (c *Client) readLoop() {
|
||||
defer c.wg.Done()
|
||||
defer c.handleDisconnect()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
@@ -138,6 +138,7 @@ func (c *Client) readLoop() {
|
||||
_, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Printf("Sneedchat read error: %v", err)
|
||||
c.handleDisconnect()
|
||||
return
|
||||
}
|
||||
c.lastMessage = time.Now()
|
||||
@@ -165,6 +166,23 @@ func (c *Client) heartbeatLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) cleanupLoop() {
|
||||
defer c.wg.Done()
|
||||
t := time.NewTicker(MappingCleanupInterval)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
removed := c.messageEditDates.CleanupOldEntries()
|
||||
if removed > 0 {
|
||||
log.Printf("🧹 Cleaned up %d old message tracking entries", removed)
|
||||
}
|
||||
case <-c.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) Send(s string) bool {
|
||||
c.mu.RLock()
|
||||
conn := c.conn
|
||||
@@ -186,18 +204,29 @@ func (c *Client) handleDisconnect() {
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.connected = false
|
||||
if c.conn != nil {
|
||||
c.conn.Close()
|
||||
c.conn = nil
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
log.Println("🔴 Sneedchat disconnected")
|
||||
if c.OnDisconnect != nil {
|
||||
c.OnDisconnect()
|
||||
}
|
||||
|
||||
time.Sleep(ReconnectInterval)
|
||||
_ = c.Connect()
|
||||
|
||||
if err := c.Connect(); err != nil {
|
||||
log.Printf("Reconnect attempt failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Println("🟢 Reconnected successfully")
|
||||
c.joinRoom()
|
||||
}
|
||||
|
||||
func (c *Client) Disconnect() {
|
||||
@@ -217,7 +246,6 @@ func (c *Client) handleIncoming(raw string) {
|
||||
return
|
||||
}
|
||||
|
||||
// top-level deletes
|
||||
if payload.Delete != nil {
|
||||
var ids []int
|
||||
switch v := payload.Delete.(type) {
|
||||
@@ -239,7 +267,6 @@ func (c *Client) handleIncoming(raw string) {
|
||||
}
|
||||
}
|
||||
|
||||
// messages list or single
|
||||
var messages []SneedMessage
|
||||
if len(payload.Messages) > 0 {
|
||||
messages = payload.Messages
|
||||
@@ -278,10 +305,8 @@ func (c *Client) processMessage(m SneedMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
// suppress bridge echoes
|
||||
if (c.bridgeUserID > 0 && userID == c.bridgeUserID) ||
|
||||
(c.bridgeUsername != "" && username == c.bridgeUsername) {
|
||||
// correlate outbound -> map IDs
|
||||
if m.MessageID > 0 && c.recentOutboundIter != nil && c.mapDiscordSneed != nil {
|
||||
now := time.Now()
|
||||
for _, entry := range c.recentOutboundIter() {
|
||||
@@ -305,7 +330,6 @@ func (c *Client) processMessage(m SneedMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
// de-dup / edits
|
||||
if c.isProcessed(m.MessageID) {
|
||||
if prev, exists := c.messageEditDates.Get(m.MessageID); exists {
|
||||
if editDate > prev.(int) {
|
||||
@@ -318,7 +342,6 @@ func (c *Client) processMessage(m SneedMessage) {
|
||||
return
|
||||
}
|
||||
|
||||
// new message
|
||||
c.addToProcessed(m.MessageID)
|
||||
c.messageEditDates.Set(m.MessageID, editDate)
|
||||
|
||||
@@ -364,19 +387,18 @@ func (c *Client) removeFromProcessed(id int) {
|
||||
}
|
||||
}
|
||||
|
||||
// helpers for mapping in bridge
|
||||
func (c *Client) SetOutboundIter(f func() []map[string]interface{}) {
|
||||
c.recentOutboundIter = f
|
||||
}
|
||||
|
||||
func (c *Client) SetMapDiscordSneed(f func(int, int, string)) {
|
||||
c.mapDiscordSneed = f
|
||||
}
|
||||
|
||||
// expose helper for mention replacement
|
||||
func ReplaceBridgeMention(content, bridgeUsername, pingID string) string {
|
||||
if bridgeUsername == "" || pingID == "" {
|
||||
return content
|
||||
}
|
||||
pat := regexp.MustCompile(fmt.Sprintf(`(?i)@%s(?:\W|$)`, regexp.QuoteMeta(bridgeUsername)))
|
||||
pat := regexp.MustCompile(fmt.Sprintf(`(?i)@%s(?:\\W|$)`, regexp.QuoteMeta(bridgeUsername)))
|
||||
return pat.ReplaceAllString(content, fmt.Sprintf("<@%s>", pingID))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user