The Final Solution to the Websocket question? I've gone through like 10 different iterations to try and get ByServer reconnections to work correctly. Now just disabled reconnection altogether and I'm manually disposing and recreating the instance whenever it dies using a watchdog task. So far working great after 12 hours!

This commit is contained in:
barelyprofessional
2024-07-20 10:42:38 +10:00
parent 2574d278a7
commit 8676241fbf
6 changed files with 158 additions and 100 deletions

View File

@@ -7,7 +7,7 @@ using Websocket.Client;
namespace KfChatDotNetKickBot.Services;
public class DiscordService
public class DiscordService : IDisposable
{
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private WebsocketClient _wsClient;
@@ -63,7 +63,8 @@ public class DiscordService
var client = new WebsocketClient(_wsUri, factory)
{
ReconnectTimeout = TimeSpan.FromSeconds(ReconnectTimeout)
ReconnectTimeout = TimeSpan.FromSeconds(ReconnectTimeout),
IsReconnectionEnabled = false
};
client.ReconnectionHappened.Subscribe(WsReconnection);
@@ -91,12 +92,6 @@ public class DiscordService
_logger.Debug("_wsClient doesn't exist yet, not going to try ping");
continue;
}
if (!IsConnected())
{
_logger.Info("Not connected not going to try send a ping actually");
continue;
}
var heartbeatPacket = JsonSerializer.Serialize(new DiscordPacketWriteModel
{
OpCode = 1,
@@ -114,11 +109,6 @@ public class DiscordService
_logger.Error($"Close Status => {disconnectionInfo.CloseStatus}; Close Status Description => {disconnectionInfo.CloseStatusDescription}");
_logger.Error(disconnectionInfo.Exception);
OnWsDisconnection?.Invoke(this, disconnectionInfo);
if (disconnectionInfo.Type == DisconnectionType.ByServer)
{
_logger.Info("Forcing reconnection as the type was ByServer");
_wsClient.Reconnect().Wait(_cancellationToken);
}
}
private void WsReconnection(ReconnectionInfo reconnectionInfo)
@@ -161,25 +151,16 @@ public class DiscordService
"\"since\":0,\"activities\":[],\"afk\":false},\"compress\":false,\"client_state\":{\"guild_versions\":{}}}}";
_logger.Debug(initPayload);
_wsClient.SendInstant(initPayload).Wait(_cancellationToken);
if (_heartbeatTask != null)
{
_pingCts.Cancel();
while (!_heartbeatTask.IsCompleted)
{
_logger.Debug("Waiting for heartbeat task to die");
Task.Delay(TimeSpan.FromMilliseconds(100), _cancellationToken).Wait(_cancellationToken);
}
_heartbeatTask.Dispose();
}
_heartbeatInterval =
TimeSpan.FromMilliseconds(packet.Data.GetProperty("heartbeat_interval").GetInt32());
if (_heartbeatTask != null) return;
_heartbeatTask = Task.Run(HeartbeatTimer, _cancellationToken);
return;
}
if (packet.OpCode == 11)
{
_logger.Debug("Received heartbeat ack from Discord");
_logger.Info("Received heartbeat ack from Discord");
return;
}
@@ -206,8 +187,8 @@ public class DiscordService
packet.Data.Deserialize<DiscordMessageModel>() ?? throw new InvalidOperationException());
return;
default:
_logger.Info($"{packet.DispatchEvent} was unhandled. JSON follows");
_logger.Info(message.Text);
_logger.Debug($"{packet.DispatchEvent} was unhandled. JSON follows");
_logger.Debug(message.Text);
break;
}
}
@@ -220,6 +201,16 @@ public class DiscordService
_logger.Error("--- End of JSON Payload ---");
}
}
public void Dispose()
{
_logger.Info("Disposing of the Discord service");
_wsClient.Dispose();
_pingCts.Cancel();
_pingCts.Dispose();
_heartbeatTask?.Dispose();
GC.SuppressFinalize(this);
}
}
public class DiscordPacketModel<T>

View File

@@ -1,5 +1,4 @@
using System.Net;
using System.Net.Http.Json;
using System.Net.WebSockets;
using System.Text.Json;
using KfChatDotNetKickBot.Models;
@@ -8,7 +7,7 @@ using Websocket.Client;
namespace KfChatDotNetKickBot.Services;
public class Howlgg
public class Howlgg : IDisposable
{
private Logger _logger = LogManager.GetCurrentClassLogger();
private WebsocketClient _wsClient;
@@ -54,7 +53,8 @@ public class Howlgg
var client = new WebsocketClient(_wsUri, factory)
{
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout)
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout),
IsReconnectionEnabled = false
};
client.ReconnectionHappened.Subscribe(WsReconnection);
@@ -90,12 +90,6 @@ public class Howlgg
_logger.Debug("_wsClient doesn't exist yet, not going to try ping");
continue;
}
if (!IsConnected())
{
_logger.Info("Not connected not going to try send a ping actually");
continue;
}
_logger.Debug("Sending Howl.gg ping packet");
await _wsClient.SendInstant("2");
}
@@ -107,11 +101,6 @@ public class Howlgg
_logger.Error($"Close Status => {disconnectionInfo.CloseStatus}; Close Status Description => {disconnectionInfo.CloseStatusDescription}");
_logger.Error(disconnectionInfo.Exception);
OnWsDisconnection?.Invoke(this, disconnectionInfo);
if (disconnectionInfo.Type == DisconnectionType.ByServer)
{
_logger.Info("Forcing reconnection as the type was ByServer");
_wsClient.Reconnect().Wait(_cancellationToken);
}
}
private void WsReconnection(ReconnectionInfo reconnectionInfo)
@@ -143,19 +132,9 @@ public class Howlgg
// Received on initial connection
var packetData = JsonSerializer.Deserialize<JsonElement>(message.Text.TrimStart('0'));
_heartbeatInterval = TimeSpan.FromMilliseconds(packetData.GetProperty("pingInterval").GetInt32());
if (_heartbeatTask != null)
{
_pingCts.Cancel();
while (!_heartbeatTask.IsCompleted)
{
_logger.Debug("Waiting for heartbeat task to die");
Task.Delay(TimeSpan.FromMilliseconds(100), _cancellationToken).Wait(_cancellationToken);
}
_heartbeatTask.Dispose();
}
_heartbeatTask = Task.Run(HeartbeatTimer, _cancellationToken);
_logger.Info("Received connection packet from Howl.gg. Setting up heartbeat timer");
if (_heartbeatTask != null) return;
_heartbeatTask = Task.Run(HeartbeatTimer, _cancellationToken);
return;
}
@@ -185,4 +164,13 @@ public class Howlgg
_logger.Error("--- End of Payload ---");
}
}
public void Dispose()
{
_wsClient.Dispose();
_pingCts.Cancel();
_pingCts.Dispose();
_heartbeatTask?.Dispose();
GC.SuppressFinalize(this);
}
}

View File

@@ -8,7 +8,7 @@ using Websocket.Client;
namespace KfChatDotNetKickBot.Services;
public class Shuffle
public class Shuffle : IDisposable
{
private Logger _logger = LogManager.GetCurrentClassLogger();
private WebsocketClient _wsClient;
@@ -21,13 +21,14 @@ public class Shuffle
public event OnWsDisconnectionEventHandler OnWsDisconnection;
private CancellationToken _cancellationToken = CancellationToken.None;
private CancellationTokenSource _pingCts = new();
private Task _pingTask;
public Shuffle(string? proxy = null, CancellationToken? cancellationToken = null)
{
_proxy = proxy;
if (cancellationToken != null) _cancellationToken = cancellationToken.Value;
// Moved it up here as I'm concerned about the possibility of reconnections creating multiple ping tasks
_ = PeriodicPing();
_pingTask = PeriodicPing();
}
public async Task StartWsClient()
@@ -52,7 +53,8 @@ public class Shuffle
var client = new WebsocketClient(_wsUri, factory)
{
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout)
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout),
IsReconnectionEnabled = false // Watchdog will self-destruct this instead
};
client.ReconnectionHappened.Subscribe(WsReconnection);
@@ -71,12 +73,6 @@ public class Shuffle
return _wsClient is { IsRunning: true };
}
private void SendPing()
{
_logger.Debug("Sending ping to Shuffle");
_wsClient.SendInstant("{\"type\":\"ping\"}").Wait(_cancellationToken);
}
private async Task PeriodicPing()
{
using var timer = new PeriodicTimer(TimeSpan.FromSeconds(10));
@@ -87,12 +83,8 @@ public class Shuffle
_logger.Debug("_wsClient doesn't exist yet, not going to try ping");
continue;
}
if (!IsConnected())
{
_logger.Info("Not connected not going to try send a ping actually");
continue;
}
SendPing();
_logger.Debug("Sending ping to Shuffle");
_wsClient.Send("{\"type\":\"ping\"}");
}
}
@@ -102,11 +94,6 @@ public class Shuffle
_logger.Error($"Close Status => {disconnectionInfo.CloseStatus}; Close Status Description => {disconnectionInfo.CloseStatusDescription}");
_logger.Error(disconnectionInfo.Exception);
OnWsDisconnection?.Invoke(this, disconnectionInfo);
if (disconnectionInfo.Type == DisconnectionType.ByServer)
{
_logger.Info("Forcing reconnection as the type was ByServer");
_wsClient.Reconnect().Wait(_cancellationToken);
}
}
private void WsReconnection(ReconnectionInfo reconnectionInfo)
@@ -116,7 +103,7 @@ public class Shuffle
var initPayload =
"{\"type\":\"connection_init\",\"payload\":{\"x-correlation-id\":\"pdvlnd9tej-di27abvq19-1.30.2-1i0nef1m7-g::anon\",\"authorization\":\"\"}}";
_logger.Debug(initPayload);
_wsClient.SendInstant(initPayload).Wait(_cancellationToken);
_wsClient.Send(initPayload);
}
private void WsMessageReceived(ResponseMessage message)
@@ -147,7 +134,7 @@ public class Shuffle
if (packetType == "pong")
{
_logger.Debug("Shuffle pong packet");
_logger.Info("Shuffle pong packet");
return;
}
@@ -216,4 +203,13 @@ public class Shuffle
}
public class ShuffleUserNotFoundException : Exception;
public void Dispose()
{
_wsClient.Dispose();
_pingCts.Cancel();
_pingCts.Dispose();
_pingTask.Dispose();
GC.SuppressFinalize(this);
}
}

View File

@@ -7,7 +7,7 @@ using Websocket.Client;
namespace KfChatDotNetKickBot.Services;
public class Twitch
public class Twitch : IDisposable
{
private Logger _logger = LogManager.GetCurrentClassLogger();
private WebsocketClient _wsClient;
@@ -18,6 +18,8 @@ public class Twitch
public delegate void OnStreamStateUpdateEventHandler(object sender, int channelId, bool isLive);
public event OnStreamStateUpdateEventHandler OnStreamStateUpdated;
private CancellationToken _cancellationToken = CancellationToken.None;
private Task? _pingTask = null;
private CancellationTokenSource _pingCts = new();
public Twitch(List<int> channels, string? proxy = null, CancellationToken? cancellationToken = null)
{
@@ -45,7 +47,8 @@ public class Twitch
var client = new WebsocketClient(_wsUri, factory)
{
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout)
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout),
IsReconnectionEnabled = false
};
client.ReconnectionHappened.Subscribe(WsReconnection);
@@ -58,7 +61,7 @@ public class Twitch
await client.Start();
_logger.Debug("Websocket client started!");
SendPing();
_ = PeriodicPing();
_pingTask = PeriodicPing();
}
public bool IsConnected()
@@ -68,14 +71,14 @@ public class Twitch
private void SendPing()
{
_logger.Debug("Sending ping to Twitch");
_logger.Info("Sending ping to Twitch");
_wsClient.Send("{\"type\":\"PING\"}");
}
private async Task PeriodicPing()
{
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(1));
while (await timer.WaitForNextTickAsync(_cancellationToken))
while (await timer.WaitForNextTickAsync(_pingCts.Token))
{
SendPing();
}
@@ -86,11 +89,6 @@ public class Twitch
_logger.Error($"Client disconnected from the chat (or never successfully connected). Type is {disconnectionInfo.Type}");
_logger.Error($"Close Status => {disconnectionInfo.CloseStatus}; Close Status Description => {disconnectionInfo.CloseStatusDescription}");
_logger.Error(disconnectionInfo.Exception);
if (disconnectionInfo.Type == DisconnectionType.ByServer)
{
_logger.Info("Forcing reconnection as the type was ByServer");
_wsClient.Reconnect().Wait(_cancellationToken);
}
}
private void WsReconnection(ReconnectionInfo reconnectionInfo)
@@ -205,4 +203,13 @@ public class Twitch
}
public class TwitchUserNotFoundException : Exception;
public void Dispose()
{
_wsClient.Dispose();
_pingCts.Cancel();
_pingCts.Dispose();
_pingTask?.Dispose();
GC.SuppressFinalize(this);
}
}

View File

@@ -7,7 +7,7 @@ using Websocket.Client;
namespace KfChatDotNetKickBot.Services;
public class TwitchChat
public class TwitchChat : IDisposable
{
private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private WebsocketClient _wsClient;
@@ -54,7 +54,8 @@ public class TwitchChat
var client = new WebsocketClient(_wsUri, factory)
{
ReconnectTimeout = TimeSpan.FromSeconds(ReconnectTimeout)
ReconnectTimeout = TimeSpan.FromSeconds(ReconnectTimeout),
IsReconnectionEnabled = false
};
client.ReconnectionHappened.Subscribe(WsReconnection);
@@ -79,11 +80,6 @@ public class TwitchChat
_logger.Error($"Close Status => {disconnectionInfo.CloseStatus}; Close Status Description => {disconnectionInfo.CloseStatusDescription}");
_logger.Error(disconnectionInfo.Exception);
OnWsDisconnection?.Invoke(this, disconnectionInfo);
if (disconnectionInfo.Type == DisconnectionType.ByServer)
{
_logger.Info("Forcing reconnection as the type was ByServer");
_wsClient.Reconnect().Wait(_cancellationToken);
}
}
private void WsReconnection(ReconnectionInfo reconnectionInfo)
@@ -150,7 +146,7 @@ public class TwitchChat
switch (command)
{
case "PING":
_logger.Debug("Received PING, sending PONG");
_logger.Info("Received PING, sending PONG");
_wsClient.Send("PONG");
return;
case "JOIN":
@@ -175,4 +171,10 @@ public class TwitchChat
_logger.Error("--- End of IRC Message ---");
}
}
public void Dispose()
{
_wsClient.Dispose();
GC.SuppressFinalize(this);
}
}