Overflow mode when server exceeds Discord's ratelimiting. Will compact many messages into one embed with timestamps
This commit is contained in:
@@ -22,6 +22,9 @@ import (
|
||||
const (
|
||||
MaxAttachments = 4
|
||||
ProcessedCacheSize = 250
|
||||
OverflowThreshold = 10
|
||||
OverflowRecoveryThreshold = 5
|
||||
OverflowBatchSize = 15
|
||||
MappingCacheSize = 1000
|
||||
MappingMaxAge = 1 * time.Hour
|
||||
MappingCleanupInterval = 5 * time.Minute
|
||||
@@ -73,8 +76,10 @@ type Bridge struct {
|
||||
outageMessagesMu sync.Mutex
|
||||
outageStart time.Time
|
||||
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
msgQueue chan map[string]interface{}
|
||||
overflow bool
|
||||
}
|
||||
|
||||
func NewBridge(cfg *config.Config, sneedClient *sneed.Client) (*Bridge, error) {
|
||||
@@ -98,10 +103,11 @@ func NewBridge(cfg *config.Config, sneedClient *sneed.Client) (*Bridge, error) {
|
||||
queuedOutbound: make([]QueuedMessage, 0),
|
||||
outageNotices: make([]*discordgo.Message, 0),
|
||||
stopCh: make(chan struct{}),
|
||||
msgQueue: make(chan map[string]interface{}, 500),
|
||||
}
|
||||
|
||||
// hook Sneed client callbacks
|
||||
sneedClient.OnMessage = b.onSneedMessage
|
||||
sneedClient.OnMessage = b.enqueueSneedMessage
|
||||
sneedClient.OnEdit = b.handleSneedEdit
|
||||
sneedClient.OnDelete = b.handleSneedDelete
|
||||
sneedClient.OnConnect = b.onSneedConnect
|
||||
@@ -124,8 +130,10 @@ func (b *Bridge) Start() error {
|
||||
if err := b.session.Open(); err != nil {
|
||||
return err
|
||||
}
|
||||
b.wg.Add(1)
|
||||
b.wg.Add(3)
|
||||
go b.cleanupLoop()
|
||||
go b.messageWorker()
|
||||
go b.messageWorker()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -315,6 +323,91 @@ func (b *Bridge) onDiscordMessageDelete(s *discordgo.Session, m *discordgo.Messa
|
||||
b.sneed.Send(fmt.Sprintf("/delete %s", sneedUUID))
|
||||
}
|
||||
|
||||
func (b *Bridge) messageWorker() {
|
||||
defer b.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-b.stopCh:
|
||||
return
|
||||
case msg, ok := <-b.msgQueue:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
depth := len(b.msgQueue)
|
||||
if !b.overflow && depth >= OverflowThreshold {
|
||||
b.overflow = true
|
||||
log.Printf("⚠️ Message queue overflow (%d pending), switching to batch mode", depth)
|
||||
} else if b.overflow && depth <= OverflowRecoveryThreshold {
|
||||
b.overflow = false
|
||||
log.Printf("✅ Message queue recovered (%d pending), switching to normal mode", depth)
|
||||
}
|
||||
if b.overflow {
|
||||
batch := []map[string]interface{}{msg}
|
||||
for len(batch) < OverflowBatchSize {
|
||||
select {
|
||||
case next, ok := <-b.msgQueue:
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
batch = append(batch, next)
|
||||
default:
|
||||
goto flushBatch
|
||||
}
|
||||
}
|
||||
flushBatch:
|
||||
b.sendOverflowBatch(batch)
|
||||
} else {
|
||||
b.onSneedMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bridge) sendOverflowBatch(batch []map[string]interface{}) {
|
||||
if len(batch) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var description strings.Builder
|
||||
for _, msg := range batch {
|
||||
username, _ := msg["username"].(string)
|
||||
rawContent, _ := msg["content"].(string)
|
||||
content := utils.BBCodeToMarkdown(rawContent)
|
||||
content = sneed.ReplaceBridgeMention(content, b.cfg.BridgeUsername, b.cfg.DiscordPingUserID)
|
||||
|
||||
var ts string
|
||||
if raw, ok := msg["raw"].(sneed.SneedMessage); ok && raw.MessageDate > 0 {
|
||||
ts = time.Unix(int64(raw.MessageDate), 0).Format("3:04:05 PM")
|
||||
}
|
||||
|
||||
description.WriteString(fmt.Sprintf("**%s** %s\n%s\n\n", username, ts, content))
|
||||
}
|
||||
|
||||
webhookID, webhookToken := parseWebhookURL(b.cfg.DiscordWebhookURL)
|
||||
embed := &discordgo.MessageEmbed{
|
||||
Title: "overflow mode",
|
||||
Description: strings.TrimSpace(description.String()),
|
||||
Color: OutageEmbedColorActive,
|
||||
}
|
||||
params := &discordgo.WebhookParams{
|
||||
Embeds: []*discordgo.MessageEmbed{embed},
|
||||
}
|
||||
_, err := b.session.WebhookExecute(webhookID, webhookToken, false, params)
|
||||
if err != nil {
|
||||
log.Printf("❌ Failed to send overflow batch: %v", err)
|
||||
return
|
||||
}
|
||||
log.Printf("📦 Sent overflow batch of %d messages", len(batch))
|
||||
}
|
||||
|
||||
func (b *Bridge) enqueueSneedMessage(msg map[string]interface{}) {
|
||||
select {
|
||||
case b.msgQueue <- msg:
|
||||
default:
|
||||
log.Printf("⚠️ Message queue full, dropping message from %s", msg["username"])
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bridge) onSneedMessage(msg map[string]interface{}) {
|
||||
username, _ := msg["username"].(string)
|
||||
rawContent, _ := msg["content"].(string)
|
||||
|
||||
@@ -5,6 +5,7 @@ type SneedMessage struct {
|
||||
MessageUUID string `json:"message_uuid"`
|
||||
Message string `json:"message"`
|
||||
MessageRaw string `json:"message_raw"`
|
||||
MessageDate int `json:"message_date"`
|
||||
MessageEditDate int `json:"message_edit_date"`
|
||||
Author map[string]interface{} `json:"author"`
|
||||
Deleted bool `json:"deleted"`
|
||||
|
||||
Reference in New Issue
Block a user