using System.Net;
using System.Net.WebSockets;
using System.Text.Json;
using System.Text.Json.Serialization;
using KickWsClient.Models;
using Websocket.Client;
using NLog;

namespace KickWsClient;

/// <summary>
/// Websocket client for a Pusher endpoint. Incoming Pusher messages are parsed and re-raised as one
/// .NET event per message type; outgoing helpers send ping / subscribe / unsubscribe packets.
/// </summary>
public class KickWsClient : IDisposable
{
    public event EventHandlers.OnPusherConnectionEstablishedEventHandler? OnPusherConnectionEstablished;
    public event EventHandlers.OnPusherSubscriptionSucceededEventHandler? OnPusherSubscriptionSucceeded;
    public event EventHandlers.OnPusherPongEventHandler? OnPusherPong;
    public event EventHandlers.OnFollowersUpdatedEventHandler? OnFollowersUpdated;
    public event EventHandlers.OnChatMessageEventHandler? OnChatMessage;
    public event EventHandlers.OnChannelSubscriptionEventHandler? OnChannelSubscription;
    public event EventHandlers.OnSubscriptionEventHandler? OnSubscription;
    public event EventHandlers.OnMessageDeletedEventHandler? OnMessageDeleted;
    public event EventHandlers.OnUserBannedEventHandler? OnUserBanned;
    public event EventHandlers.OnUserUnbannedEventHandler? OnUserUnbanned;
    public event EventHandlers.OnUpdatedLiveStreamEventHandler? OnUpdatedLiveStream;
    public event EventHandlers.OnStopStreamBroadcastEventHandler? OnStopStreamBroadcast;
    public event EventHandlers.OnStreamerIsLiveEventHandler? OnStreamerIsLive;
    public event EventHandlers.OnWsDisconnectionEventHandler? OnWsDisconnection;
    public event EventHandlers.OnWsReconnectEventHandler? OnWsReconnect;

    // You really shouldn't use this unless you're extending the functionality of the library, e.g. adding support for
    // not yet implemented message types.
    public event EventHandlers.OnWsMessageReceivedEventHandler? OnWsMessageReceived;
    public event EventHandlers.OnPollUpdateEventHandler? OnPollUpdate;

    // One shared options instance: System.Text.Json caches type metadata per options object, so
    // allocating a new instance for every received message throws that cache away each time.
    private static readonly JsonSerializerOptions SerializerOptions = new()
    {
        NumberHandling = JsonNumberHandling.AllowReadingFromString
    };

    private WebsocketClient? _wsClient;
    private readonly Logger _logger = LogManager.GetCurrentClassLogger();
    private readonly Uri _kickPusherUri;
    private readonly int _reconnectTimeout;
    private readonly string? _proxy;
    private readonly bool _reconnectionEnabled;
    private Task? _pingTask;
    private readonly TimeSpan _pingInterval = TimeSpan.FromSeconds(15);
    private readonly CancellationTokenSource _pingCts = new();

    // Captured once in the constructor so the ping loop never touches _pingCts.Token after the
    // source has been disposed (reading Token on a disposed source throws).
    private readonly CancellationToken _pingToken;
    private bool _disposed;

    /// <summary>
    /// Creates the client. No connection is opened until <see cref="StartWsClient"/> is called.
    /// </summary>
    /// <param name="kickPusherUri">Websocket URI of the Pusher endpoint.</param>
    /// <param name="proxy">Optional proxy address handed to <see cref="WebProxy"/>; null means no proxy.</param>
    /// <param name="reconnectTimeout">Value, in seconds, assigned to the underlying client's ReconnectTimeout.</param>
    /// <param name="reconnectionEnabled">Value assigned to the underlying client's IsReconnectionEnabled.</param>
    public KickWsClient(
        string kickPusherUri = "wss://ws-us2.pusher.com/app/32cbd69e4b950bf97679?protocol=7&client=js&version=7.6.0&flash=false",
        string? proxy = null,
        int reconnectTimeout = 30,
        bool reconnectionEnabled = true)
    {
        _kickPusherUri = new Uri(kickPusherUri);
        _proxy = proxy;
        _reconnectTimeout = reconnectTimeout;
        _reconnectionEnabled = reconnectionEnabled;
        _pingToken = _pingCts.Token;
    }

    /// <summary>
    /// Builds the underlying websocket client, starts it and stores it for later sends.
    /// </summary>
    public async Task StartWsClient()
    {
        _logger.Debug("StartWsClient() called, creating client");
        _wsClient = await CreateWsClient().ConfigureAwait(false);
    }

    /// <summary>
    /// Stops the websocket with a normal-closure status. Blocks the calling thread until the stop completes.
    /// </summary>
    public void Disconnect()
    {
        _logger.Debug("Disconnect() called, closing Websocket");
        _wsClient?.Stop(WebSocketCloseStatus.NormalClosure, "Closing websocket").Wait();
    }

    /// <summary>
    /// Creates a websocket client for the configured URI (through the proxy, if one was given),
    /// wires up the reconnection / message / disconnection callbacks and starts it.
    /// </summary>
    /// <returns>The started client.</returns>
    private async Task<WebsocketClient> CreateWsClient()
    {
        var factory = new Func<ClientWebSocket>(() =>
        {
            var clientWs = new ClientWebSocket();
            if (_proxy is not null)
            {
                clientWs.Options.Proxy = new WebProxy(_proxy);
            }

            return clientWs;
        });

        var client = new WebsocketClient(_kickPusherUri, factory)
        {
            ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout),
            IsReconnectionEnabled = _reconnectionEnabled
        };

        client.ReconnectionHappened.Subscribe(WsReconnection);
        client.MessageReceived.Subscribe(WsMessageReceived);
        client.DisconnectionHappened.Subscribe(WsDisconnection);

        _logger.Debug("Websocket client has been built, about to start");
        await client.Start().ConfigureAwait(false);
        _logger.Debug("Websocket client started!");

        return client;
    }

    /// <summary>
    /// Returns true when a websocket client has been created and reports IsRunning.
    /// </summary>
    public bool IsConnected()
    {
        return _wsClient is { IsRunning: true };
    }

    /// <summary>
    /// Logs the disconnection details and raises <see cref="OnWsDisconnection"/>.
    /// </summary>
    private void WsDisconnection(DisconnectionInfo disconnectionInfo)
    {
        _logger.Error($"Client disconnected from the chat (or never successfully connected). Type is {disconnectionInfo.Type}");
        _logger.Error($"Close Status => {disconnectionInfo.CloseStatus}; Close Status Description => {disconnectionInfo.CloseStatusDescription}");
        _logger.Error(disconnectionInfo.Exception);
        OnWsDisconnection?.Invoke(this, disconnectionInfo);
    }

    /// <summary>
    /// Logs the reconnection and raises <see cref="OnWsReconnect"/>, except for the initial connection.
    /// </summary>
    private void WsReconnection(ReconnectionInfo reconnectionInfo)
    {
        _logger.Error($"Websocket connection dropped and reconnected. Reconnection type is {reconnectionInfo.Type}");
        if (reconnectionInfo.Type == ReconnectionType.Initial)
        {
            _logger.Error("Not firing the reconnection event as this is the initial event");
            return;
        }

        OnWsReconnect?.Invoke(this, reconnectionInfo);
    }

    /// <summary>
    /// Send a generic Pusher packet
    /// </summary>
    /// <param name="eventName">Event name</param>
    /// <param name="data">Event data</param>
    public void SendPusherPacket(string eventName, object data)
    {
        var pkt = new PusherModels.BasePusherRequestModel { Event = eventName, Data = data };
        var json = JsonSerializer.Serialize(pkt);
        _logger.Debug("Sending message to Pusher");
        _logger.Debug(json);
        // Silently does nothing when the client has not been started yet.
        _wsClient?.Send(json);
    }

    /// <summary>
    /// Send a ping packet. You should expect a pong response immediately after
    /// </summary>
    public void SendPusherPing()
    {
        SendPusherPacket("pusher:ping", new object());
    }

    /// <summary>
    /// Background loop that sends a Pusher ping every <see cref="_pingInterval"/> until the
    /// ping token is cancelled by <see cref="Dispose"/>.
    /// </summary>
    private async Task PingTimer()
    {
        try
        {
            using var timer = new PeriodicTimer(_pingInterval);
            while (await timer.WaitForNextTickAsync(_pingToken).ConfigureAwait(false))
            {
                if (_wsClient == null)
                {
                    _logger.Debug("_wsClient doesn't exist yet, not going to try ping");
                    continue;
                }

                _logger.Debug("Sent ping to Pusher");
                SendPusherPing();
            }
        }
        catch (OperationCanceledException)
        {
            // WaitForNextTickAsync throws when the token is cancelled; that is the normal shutdown path.
            _logger.Debug("Ping timer cancelled, stopping");
        }
    }

    /// <summary>
    /// Send a pusher subscribe packet to subscribe to a channel or chatroom. You should receive a subscription succeeded packet
    /// </summary>
    /// <param name="channel">Channel string e.g. channel.2515504</param>
    /// <param name="auth">Optional authentication string. Empty string means guest</param>
    public void SendPusherSubscribe(string channel, string auth = "")
    {
        var subPacket = new PusherModels.PusherSubscribeRequestModel
        {
            Auth = auth,
            Channel = channel
        };
        SendPusherPacket("pusher:subscribe", subPacket);
    }

    /// <summary>
    /// Send pusher unsubscribe packet to unsub from a channel or chatroom. Expect no response
    /// </summary>
    /// <param name="channel">Channel string e.g. channel.2515504</param>
    public void SendPusherUnsubscribe(string channel)
    {
        var unsubPacket = new PusherModels.PusherUnsubscribeRequestModel
        {
            Channel = channel
        };
        SendPusherPacket("pusher:unsubscribe", unsubPacket);
    }

    /// <summary>
    /// Callback for every websocket message: raises <see cref="OnWsMessageReceived"/> with the raw
    /// message, parses the Pusher envelope and dispatches it to the matching typed event.
    /// Parse failures are logged and the message is dropped; they never propagate to the caller.
    /// </summary>
    private void WsMessageReceived(ResponseMessage message)
    {
        OnWsMessageReceived?.Invoke(this, message);

        if (message.Text == null)
        {
            _logger.Info("Websocket message was null, ignoring packet");
            return;
        }

        PusherModels.BasePusherEventModel pusherMsg;
        try
        {
            pusherMsg = JsonSerializer.Deserialize<PusherModels.BasePusherEventModel>(message.Text, SerializerOptions)
                        ?? throw new InvalidOperationException();
        }
        catch (Exception e)
        {
            _logger.Error("Failed to parse Pusher message. Exception follows:");
            _logger.Error(e);
            _logger.Error("--- Message from Pusher follows ---");
            _logger.Error(message.Text);
            _logger.Error("--- /end of message ---");
            return;
        }

        _logger.Debug($"Pusher event receievd: {pusherMsg.Event}");

        try
        {
            DispatchPusherEvent(pusherMsg, message.Text);
        }
        catch (JsonException e)
        {
            // A malformed payload is handled like a malformed envelope: log and drop, so one bad
            // message cannot throw out of the websocket subscription callback.
            _logger.Error($"Failed to parse payload of Pusher event {pusherMsg.Event}. Exception follows:");
            _logger.Error(e);
            _logger.Error("--- Message from Pusher follows ---");
            _logger.Error(message.Text);
            _logger.Error("--- /end of message ---");
        }
    }

    /// <summary>
    /// Deserializes the payload of a parsed Pusher message and raises the event matching its name.
    /// Unknown event names are logged together with the raw JSON.
    /// </summary>
    /// <param name="pusherMsg">Parsed Pusher envelope.</param>
    /// <param name="rawText">Raw JSON text of the message, used only for logging unhandled events.</param>
    private void DispatchPusherEvent(PusherModels.BasePusherEventModel pusherMsg, string rawText)
    {
        var options = SerializerOptions;

        // NOTE(review): the generic type argument of each payload Deserialize call below is not visible
        // in the copy of the file this change was written against (angle-bracketed text appears to have
        // been stripped). Each call must carry the payload model type expected by the event it feeds,
        // i.e. Deserialize<TModel>(pusherMsg.Data, options) - confirm against the EventHandlers delegates.
        switch (pusherMsg.Event)
        {
            case "pusher:connection_established":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnPusherConnectionEstablished?.Invoke(this, data);

                // This event is handled again after a reconnect; only ever start one ping loop.
                if (_pingTask != null)
                {
                    return;
                }

                _pingTask = Task.Run(PingTimer);
                _logger.Debug("Started ping timer");
                return;
            }
            case "pusher_internal:subscription_succeeded":
                OnPusherSubscriptionSucceeded?.Invoke(this, pusherMsg);
                return;
            case "pusher:pong":
                OnPusherPong?.Invoke(this, pusherMsg);
                return;
            case @"App\Events\FollowersUpdated":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnFollowersUpdated?.Invoke(this, data);
                return;
            }
            case @"App\Events\ChatMessageEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnChatMessage?.Invoke(this, data);
                return;
            }
            case @"App\Events\ChannelSubscriptionEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnChannelSubscription?.Invoke(this, data);
                return;
            }
            case @"App\Events\SubscriptionEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnSubscription?.Invoke(this, data);
                return;
            }
            case @"App\Events\MessageDeletedEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnMessageDeleted?.Invoke(this, data);
                return;
            }
            case @"App\Events\UserBannedEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnUserBanned?.Invoke(this, data);
                return;
            }
            case @"App\Events\UserUnbannedEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnUserUnbanned?.Invoke(this, data);
                return;
            }
            case @"App\Events\LiveStream\UpdatedLiveStreamEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnUpdatedLiveStream?.Invoke(this, data);
                return;
            }
            case @"App\Events\StopStreamBroadcast":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnStopStreamBroadcast?.Invoke(this, data);
                return;
            }
            case @"App\Events\StreamerIsLive":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnStreamerIsLive?.Invoke(this, data);
                return;
            }
            case @"App\Events\PollUpdateEvent":
            {
                var data = JsonSerializer.Deserialize(pusherMsg.Data, options);
                OnPollUpdate?.Invoke(this, data);
                return;
            }
            default:
                _logger.Info("Event unhandled. JSON payload follows");
                _logger.Info(rawText);
                break;
        }
    }

    /// <summary>
    /// Cancels the ping loop and disposes the websocket client. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        _logger.Debug("Disposing Kick WS Client");
        _pingCts.Cancel();
        _pingCts.Dispose();
        // The ping task is deliberately not disposed: Task.Dispose throws InvalidOperationException
        // unless the task has already completed, which is not guaranteed right after cancelling.
        _wsClient?.Dispose();
        GC.SuppressFinalize(this);
    }
}