Added the ability to disable reconnection in the Kick library, fixed a null reference error if IsConnected() is called without ever being connected and incorporated the ping thread into the library rather than doing it separately

This commit is contained in:
barelyprofessional
2024-09-01 00:47:16 +08:00
parent 99b6afcec5
commit 26a327caa0

View File

@@ -9,7 +9,7 @@ using NLog;
namespace KickWsClient; namespace KickWsClient;
public class KickWsClient public class KickWsClient : IDisposable
{ {
public event EventHandlers.OnPusherConnectionEstablishedEventHandler OnPusherConnectionEstablished; public event EventHandlers.OnPusherConnectionEstablishedEventHandler OnPusherConnectionEstablished;
public event EventHandlers.OnPusherSubscriptionSucceededEventHandler OnPusherSubscriptionSucceeded; public event EventHandlers.OnPusherSubscriptionSucceededEventHandler OnPusherSubscriptionSucceeded;
@@ -31,20 +31,25 @@ public class KickWsClient
public event EventHandlers.OnWsMessageReceivedEventHandler OnWsMessageReceived; public event EventHandlers.OnWsMessageReceivedEventHandler OnWsMessageReceived;
public event EventHandlers.OnPollUpdateEventHandler OnPollUpdate; public event EventHandlers.OnPollUpdateEventHandler OnPollUpdate;
private WebsocketClient _wsClient; private WebsocketClient? _wsClient;
private readonly Logger _logger = LogManager.GetCurrentClassLogger(); private readonly Logger _logger = LogManager.GetCurrentClassLogger();
private Uri _kickPusherUri; private Uri _kickPusherUri;
private int _reconnectTimeout; private int _reconnectTimeout;
private string? _proxy; private string? _proxy;
private bool _reconnectionEnabled;
private Task? _pingTask;
private TimeSpan _pingInterval = TimeSpan.FromSeconds(15);
private readonly CancellationTokenSource _pingCts = new();
public KickWsClient( public KickWsClient(
string kickPusherUri = string kickPusherUri =
"wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=7.6.0&flash=false", "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=7.6.0&flash=false",
string? proxy = null, int reconnectTimeout = 30) string? proxy = null, int reconnectTimeout = 30, bool reconnectionEnabled = true)
{ {
_kickPusherUri = new Uri(kickPusherUri); _kickPusherUri = new Uri(kickPusherUri);
_proxy = proxy; _proxy = proxy;
_reconnectTimeout = reconnectTimeout; _reconnectTimeout = reconnectTimeout;
_reconnectionEnabled = reconnectionEnabled;
} }
public async Task StartWsClient() public async Task StartWsClient()
@@ -56,7 +61,7 @@ public class KickWsClient
public void Disconnect() public void Disconnect()
{ {
_logger.Debug("Disconnect() called, closing Websocket"); _logger.Debug("Disconnect() called, closing Websocket");
_wsClient.Stop(WebSocketCloseStatus.NormalClosure, "Closing websocket").Wait(); _wsClient?.Stop(WebSocketCloseStatus.NormalClosure, "Closing websocket").Wait();
} }
private async Task<WebsocketClient> CreateWsClient() private async Task<WebsocketClient> CreateWsClient()
@@ -71,7 +76,8 @@ public class KickWsClient
var client = new WebsocketClient(_kickPusherUri, factory) var client = new WebsocketClient(_kickPusherUri, factory)
{ {
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout) ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout),
IsReconnectionEnabled = _reconnectionEnabled
}; };
client.ReconnectionHappened.Subscribe(WsReconnection); client.ReconnectionHappened.Subscribe(WsReconnection);
@@ -119,7 +125,7 @@ public class KickWsClient
var json = JsonSerializer.Serialize(pkt); var json = JsonSerializer.Serialize(pkt);
_logger.Debug("Sending message to Pusher"); _logger.Debug("Sending message to Pusher");
_logger.Debug(json); _logger.Debug(json);
_wsClient.Send(json); _wsClient?.Send(json);
} }
/// <summary> /// <summary>
@@ -130,6 +136,21 @@ public class KickWsClient
SendPusherPacket("pusher:ping", new object()); SendPusherPacket("pusher:ping", new object());
} }
private async Task PingTimer()
{
using var timer = new PeriodicTimer(_pingInterval);
while (await timer.WaitForNextTickAsync(_pingCts.Token))
{
if (_wsClient == null)
{
_logger.Debug("_wsClient doesn't exist yet, not going to try ping");
continue;
}
_logger.Debug("Sent ping to Pusher");
SendPusherPing();
}
}
/// <summary> /// <summary>
/// Send a pusher subscribe packet to subscribe to a channel or chatroom. You should receive a subscription succeeded packet /// Send a pusher subscribe packet to subscribe to a channel or chatroom. You should receive a subscription succeeded packet
/// </summary> /// </summary>
@@ -190,6 +211,9 @@ public class KickWsClient
var data = var data =
JsonSerializer.Deserialize<PusherModels.PusherConnectionEstablishedEventModel>(pusherMsg.Data, options); JsonSerializer.Deserialize<PusherModels.PusherConnectionEstablishedEventModel>(pusherMsg.Data, options);
OnPusherConnectionEstablished?.Invoke(this, data); OnPusherConnectionEstablished?.Invoke(this, data);
if (_pingTask != null) return;
_pingTask = Task.Run(PingTimer);
_logger.Debug("Started ping timer");
return; return;
} }
case "pusher_internal:subscription_succeeded": case "pusher_internal:subscription_succeeded":
@@ -270,4 +294,14 @@ public class KickWsClient
break; break;
} }
} }
public void Dispose()
{
_logger.Debug("Disposing Kick WS Client");
_pingCts.Cancel();
_pingCts.Dispose();
_pingTask?.Dispose();
_wsClient?.Dispose();
GC.SuppressFinalize(this);
}
} }