mirror of
https://github.com/barelyprofessional/KfChatDotNet.git
synced 2026-05-02 12:32:03 -04:00
Added Twitch PubSub support. Heavily ripped off code from GambaSesh though rewrote it to use Websocket.Client and will refactor the JSON parsing once I know exactly what the payload looks like. Presently untested. Also added a new config option to suppress messages, so one can run a local copy of the bot without disturbing anyone and made the Proxy a single config rather than having one for KF and one for Pusher.
This commit is contained in:
145
KfChatDotNetKickBot/Services/TwitchWs.cs
Normal file
145
KfChatDotNetKickBot/Services/TwitchWs.cs
Normal file
@@ -0,0 +1,145 @@
|
||||
using System.Net;
|
||||
using System.Net.WebSockets;
|
||||
using System.Text.Json;
|
||||
using NLog;
|
||||
using Websocket.Client;
|
||||
|
||||
namespace KfChatDotNetKickBot.Services;
|
||||
|
||||
public class TwitchWs
|
||||
{
|
||||
private Logger _logger = LogManager.GetCurrentClassLogger();
|
||||
private WebsocketClient _wsClient;
|
||||
private Uri _wsUri = new("wss://pubsub-edge.twitch.tv/v1");
|
||||
private int _reconnectTimeout = 300;
|
||||
private string? _proxy;
|
||||
private List<int> _channels;
|
||||
public delegate void OnStreamStateUpdateEventHandler(object sender, int channelId, bool isLive);
|
||||
public event OnStreamStateUpdateEventHandler OnStreamStateUpdated;
|
||||
private CancellationToken _cancellationToken = CancellationToken.None;
|
||||
|
||||
public TwitchWs(List<int> channels, string? proxy = null, CancellationToken? cancellationToken = null)
|
||||
{
|
||||
_proxy = proxy;
|
||||
_channels = channels;
|
||||
if (cancellationToken != null) _cancellationToken = cancellationToken.Value;
|
||||
}
|
||||
|
||||
public async Task StartWsClient()
|
||||
{
|
||||
_logger.Debug("StartWsClient() called, creating client");
|
||||
await CreateWsClient();
|
||||
}
|
||||
|
||||
private async Task CreateWsClient()
|
||||
{
|
||||
var factory = new Func<ClientWebSocket>(() =>
|
||||
{
|
||||
var clientWs = new ClientWebSocket();
|
||||
if (_proxy == null) return clientWs;
|
||||
_logger.Debug($"Using proxy address {_proxy}");
|
||||
clientWs.Options.Proxy = new WebProxy(_proxy);
|
||||
return clientWs;
|
||||
});
|
||||
|
||||
var client = new WebsocketClient(_wsUri, factory)
|
||||
{
|
||||
ReconnectTimeout = TimeSpan.FromSeconds(_reconnectTimeout)
|
||||
};
|
||||
|
||||
client.ReconnectionHappened.Subscribe(WsReconnection);
|
||||
client.MessageReceived.Subscribe(WsMessageReceived);
|
||||
client.DisconnectionHappened.Subscribe(WsDisconnection);
|
||||
|
||||
_wsClient = client;
|
||||
|
||||
_logger.Debug("Websocket client has been built, about to start");
|
||||
await client.Start();
|
||||
_logger.Debug("Websocket client started!");
|
||||
SendPing();
|
||||
_ = PeriodicPing();
|
||||
}
|
||||
|
||||
public bool IsConnected()
|
||||
{
|
||||
return _wsClient is { IsRunning: true };
|
||||
}
|
||||
|
||||
private void SendPing()
|
||||
{
|
||||
_logger.Debug("Sending ping to Twitch");
|
||||
_wsClient.Send("{\"type\":\"PING\"}");
|
||||
}
|
||||
|
||||
private async Task PeriodicPing()
|
||||
{
|
||||
using var timer = new PeriodicTimer(TimeSpan.FromMinutes(1));
|
||||
while (await timer.WaitForNextTickAsync(_cancellationToken))
|
||||
{
|
||||
SendPing();
|
||||
}
|
||||
}
|
||||
|
||||
private void WsDisconnection(DisconnectionInfo disconnectionInfo)
|
||||
{
|
||||
_logger.Error($"Client disconnected from the chat (or never successfully connected). Type is {disconnectionInfo.Type}");
|
||||
_logger.Error(disconnectionInfo.Exception);
|
||||
}
|
||||
|
||||
private void WsReconnection(ReconnectionInfo reconnectionInfo)
|
||||
{
|
||||
_logger.Error($"Websocket connection dropped and reconnected. Reconnection type is {reconnectionInfo.Type}");
|
||||
_logger.Info("Sending subscription requests");
|
||||
foreach (var channel in _channels)
|
||||
{
|
||||
_logger.Info($"Subscribing to {channel}");
|
||||
_wsClient.Send("{\"data\":{\"topics\":[\"video-playback-by-id." + _channels + "\"]},\"nonce\":\"" + Guid.NewGuid() + "\",\"type\":\"LISTEN\"}");
|
||||
}
|
||||
}
|
||||
|
||||
private void WsMessageReceived(ResponseMessage message)
|
||||
{
|
||||
if (message.Text == null)
|
||||
{
|
||||
_logger.Info("Twitch sent a null message");
|
||||
return;
|
||||
}
|
||||
_logger.Debug($"Received event from Twitch: {message.Text}");
|
||||
|
||||
try
|
||||
{
|
||||
var packet = JsonSerializer.Deserialize<JsonElement>(message.Text);
|
||||
if (packet.GetProperty("type").GetString() != "MESSAGE")
|
||||
return;
|
||||
var data = packet.GetProperty("data")!;
|
||||
var topicString = data.GetProperty("topic")!.GetString()!;
|
||||
if (!topicString.StartsWith("video-playback-by-id."))
|
||||
return;
|
||||
var topicParts = topicString.Split('.');
|
||||
var channelId = int.Parse(topicParts[^1]);
|
||||
var twitchMessage = data.GetProperty("message")!.GetString()!;
|
||||
|
||||
if (twitchMessage.Contains("\"type\":\"stream-up\""))
|
||||
{
|
||||
OnStreamStateUpdated?.Invoke(this, channelId, true);
|
||||
return;
|
||||
}
|
||||
|
||||
if (twitchMessage.Contains("\"type\":\"stream-down\""))
|
||||
{
|
||||
OnStreamStateUpdated?.Invoke(this, channelId, false);
|
||||
return;
|
||||
}
|
||||
_logger.Info("Message from Twitch was unhandled");
|
||||
_logger.Info(message.Text);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.Error("Failed to handle message from Twitch");
|
||||
_logger.Error(e);
|
||||
_logger.Error("--- JSON Payload ---");
|
||||
_logger.Error(message.Text);
|
||||
_logger.Error("--- End of JSON Payload ---");
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user