Added reconnection backoff for 7tv & hermes

This commit is contained in:
Tom 2024-08-11 21:41:22 +00:00
parent 0ad063cebd
commit 9f61295d2d
4 changed files with 37 additions and 37 deletions

View File

@ -2,6 +2,7 @@ using System.Net.WebSockets;
using System.Text.Json; using System.Text.Json;
using System.Timers; using System.Timers;
using CommonSocketLibrary.Abstract; using CommonSocketLibrary.Abstract;
using CommonSocketLibrary.Backoff;
using CommonSocketLibrary.Common; using CommonSocketLibrary.Common;
using HermesSocketLibrary.Requests.Callbacks; using HermesSocketLibrary.Requests.Callbacks;
using HermesSocketLibrary.Requests.Messages; using HermesSocketLibrary.Requests.Messages;
@ -24,8 +25,8 @@ namespace TwitchChatTTS.Hermes.Socket
public DateTime LastHeartbeatReceived { get; set; } public DateTime LastHeartbeatReceived { get; set; }
public DateTime LastHeartbeatSent { get; set; } public DateTime LastHeartbeatSent { get; set; }
public string? UserId { get; set; } public string? UserId { get; set; }
private System.Timers.Timer _heartbeatTimer; private readonly System.Timers.Timer _heartbeatTimer;
private System.Timers.Timer _reconnectTimer; private readonly IBackoff _backoff;
public bool Connected { get; set; } public bool Connected { get; set; }
public bool LoggedIn { get; set; } public bool LoggedIn { get; set; }
@ -36,6 +37,7 @@ namespace TwitchChatTTS.Hermes.Socket
User user, User user,
Configuration configuration, Configuration configuration,
ICallbackManager<HermesRequestData> callbackManager, ICallbackManager<HermesRequestData> callbackManager,
[FromKeyedServices("hermes")] IBackoff backoff,
[FromKeyedServices("hermes")] IEnumerable<IWebSocketHandler> handlers, [FromKeyedServices("hermes")] IEnumerable<IWebSocketHandler> handlers,
[FromKeyedServices("hermes")] MessageTypeManager<IWebSocketHandler> typeManager, [FromKeyedServices("hermes")] MessageTypeManager<IWebSocketHandler> typeManager,
ILogger logger ILogger logger
@ -48,13 +50,10 @@ namespace TwitchChatTTS.Hermes.Socket
_user = user; _user = user;
_configuration = configuration; _configuration = configuration;
_callbackManager = callbackManager; _callbackManager = callbackManager;
_backoff = backoff;
_heartbeatTimer = new System.Timers.Timer(TimeSpan.FromSeconds(15)); _heartbeatTimer = new System.Timers.Timer(TimeSpan.FromSeconds(15));
_heartbeatTimer.Elapsed += async (sender, e) => await HandleHeartbeat(e); _heartbeatTimer.Elapsed += async (sender, e) => await HandleHeartbeat(e);
_reconnectTimer = new System.Timers.Timer(TimeSpan.FromSeconds(15));
_reconnectTimer.Elapsed += async (sender, e) => await Reconnect(e);
LastHeartbeatReceived = LastHeartbeatSent = DateTime.UtcNow; LastHeartbeatReceived = LastHeartbeatSent = DateTime.UtcNow;
URL = $"wss://{BASE_URL}"; URL = $"wss://{BASE_URL}";
} }
@ -216,7 +215,6 @@ namespace TwitchChatTTS.Hermes.Socket
Connected = true; Connected = true;
_logger.Information("Hermes websocket client connected."); _logger.Information("Hermes websocket client connected.");
_reconnectTimer.Enabled = false;
_heartbeatTimer.Enabled = true; _heartbeatTimer.Enabled = true;
LastHeartbeatReceived = DateTime.UtcNow; LastHeartbeatReceived = DateTime.UtcNow;
@ -228,7 +226,7 @@ namespace TwitchChatTTS.Hermes.Socket
}); });
}; };
OnDisconnected += (sender, e) => OnDisconnected += async (sender, e) =>
{ {
Connected = false; Connected = false;
LoggedIn = false; LoggedIn = false;
@ -236,7 +234,7 @@ namespace TwitchChatTTS.Hermes.Socket
_logger.Warning("Hermes websocket client disconnected."); _logger.Warning("Hermes websocket client disconnected.");
_heartbeatTimer.Enabled = false; _heartbeatTimer.Enabled = false;
_reconnectTimer.Enabled = true; await Reconnect(_backoff, async () => await Connect());
}; };
} }
@ -336,7 +334,7 @@ namespace TwitchChatTTS.Hermes.Socket
{ {
var signalTime = e.SignalTime.ToUniversalTime(); var signalTime = e.SignalTime.ToUniversalTime();
if (signalTime - LastHeartbeatReceived > TimeSpan.FromSeconds(60)) if (signalTime - LastHeartbeatReceived > TimeSpan.FromSeconds(30))
{ {
if (LastHeartbeatReceived > LastHeartbeatSent) if (LastHeartbeatReceived > LastHeartbeatSent)
{ {
@ -351,7 +349,7 @@ namespace TwitchChatTTS.Hermes.Socket
_logger.Error(ex, "Failed to send a heartbeat back to the Hermes websocket server."); _logger.Error(ex, "Failed to send a heartbeat back to the Hermes websocket server.");
} }
} }
else if (signalTime - LastHeartbeatReceived > TimeSpan.FromSeconds(120)) else if (signalTime - LastHeartbeatReceived > TimeSpan.FromSeconds(60))
{ {
try try
{ {
@ -364,11 +362,9 @@ namespace TwitchChatTTS.Hermes.Socket
LoggedIn = false; LoggedIn = false;
Connected = false; Connected = false;
} }
UserId = null; UserId = null;
_heartbeatTimer.Enabled = false; _heartbeatTimer.Enabled = false;
_logger.Warning("Logged off due to disconnection. Attempting to reconnect...");
_reconnectTimer.Enabled = true;
} }
} }
} }

View File

@ -15,12 +15,13 @@ namespace TwitchChatTTS.Seven.Socket.Handlers
_logger = logger; _logger = logger;
} }
public async Task Execute<Data>(SocketClient<WebSocketMessage> sender, Data data) public Task Execute<Data>(SocketClient<WebSocketMessage> sender, Data data)
{ {
if (data is not ReconnectMessage message || message == null) if (data is not ReconnectMessage message || message == null)
return; return Task.CompletedTask;
_logger.Information($"7tv server wants this client to reconnect (reason: {message.Reason})."); _logger.Information($"7tv server wants this client to reconnect [reason: {message.Reason}]");
return Task.CompletedTask;
} }
} }
} }

View File

@ -4,6 +4,7 @@ using Microsoft.Extensions.DependencyInjection;
using Serilog; using Serilog;
using TwitchChatTTS.Seven.Socket.Data; using TwitchChatTTS.Seven.Socket.Data;
using System.Text.Json; using System.Text.Json;
using CommonSocketLibrary.Backoff;
namespace TwitchChatTTS.Seven.Socket namespace TwitchChatTTS.Seven.Socket
{ {
@ -14,12 +15,15 @@ namespace TwitchChatTTS.Seven.Socket
private readonly int[] _reconnectDelay; private readonly int[] _reconnectDelay;
private string? URL; private string? URL;
private readonly IBackoff _backoff;
public bool Connected { get; set; } public bool Connected { get; set; }
public SevenHelloMessage? ConnectionDetails { get; set; } public SevenHelloMessage? ConnectionDetails { get; set; }
public SevenSocketClient( public SevenSocketClient(
User user, User user,
[FromKeyedServices("7tv")] IBackoff backoff,
[FromKeyedServices("7tv")] IEnumerable<IWebSocketHandler> handlers, [FromKeyedServices("7tv")] IEnumerable<IWebSocketHandler> handlers,
[FromKeyedServices("7tv")] MessageTypeManager<IWebSocketHandler> typeManager, [FromKeyedServices("7tv")] MessageTypeManager<IWebSocketHandler> typeManager,
ILogger logger ILogger logger
@ -30,6 +34,7 @@ namespace TwitchChatTTS.Seven.Socket
}, logger) }, logger)
{ {
_user = user; _user = user;
_backoff = backoff;
ConnectionDetails = null; ConnectionDetails = null;
_errorCodes = [ _errorCodes = [
@ -102,12 +107,6 @@ namespace TwitchChatTTS.Seven.Socket
{ {
_logger.Error(ex, "Could not connect to 7tv websocket."); _logger.Error(ex, "Could not connect to 7tv websocket.");
} }
if (!Connected)
{
await Task.Delay(TimeSpan.FromSeconds(30));
await Connect();
}
} }
private async void OnDisconnection(object? sender, SocketDisconnectionEventArgs e) private async void OnDisconnection(object? sender, SocketDisconnectionEventArgs e)
@ -128,27 +127,29 @@ namespace TwitchChatTTS.Seven.Socket
_logger.Error($"7tv client will remain disconnected due to a bad client implementation."); _logger.Error($"7tv client will remain disconnected due to a bad client implementation.");
return; return;
} }
else if (_reconnectDelay[code] > 0) else if (_reconnectDelay[code] > 1000)
await Task.Delay(_reconnectDelay[code]); await Task.Delay(_reconnectDelay[code] - 1000);
} }
else else
{ {
_logger.Warning("Unknown 7tv disconnection."); _logger.Warning("Unknown 7tv disconnection.");
await Task.Delay(TimeSpan.FromSeconds(30));
} }
await Connect(); Task.Run(async () =>
await Task.Delay(TimeSpan.FromMilliseconds(500)); {
await Reconnect(_backoff, async () => await Connect());
await Task.Delay(TimeSpan.FromMilliseconds(500));
if (Connected && ConnectionDetails?.SessionId != null) if (Connected && ConnectionDetails?.SessionId != null)
{ {
await Send(34, new ResumeMessage() { SessionId = ConnectionDetails.SessionId }); await Send(34, new ResumeMessage() { SessionId = ConnectionDetails.SessionId });
_logger.Debug("Resumed connection to 7tv websocket."); _logger.Debug("Resumed connection to 7tv websocket.");
} }
else else
{ {
_logger.Debug("Resumed connection to 7tv websocket on a different session."); _logger.Debug("Resumed connection to 7tv websocket on a different session.");
} }
});
} }
} }
} }

View File

@ -99,6 +99,7 @@ s.AddKeyedSingleton<MessageTypeManager<IWebSocketHandler>, OBSMessageTypeManager
s.AddKeyedSingleton<SocketClient<WebSocketMessage>, OBSSocketClient>("obs"); s.AddKeyedSingleton<SocketClient<WebSocketMessage>, OBSSocketClient>("obs");
// 7tv websocket // 7tv websocket
s.AddKeyedSingleton<IBackoff>("7tv", new ExponentialBackoff(1000, 30 * 1000));
s.AddKeyedSingleton<IWebSocketHandler, SevenHelloHandler>("7tv"); s.AddKeyedSingleton<IWebSocketHandler, SevenHelloHandler>("7tv");
s.AddKeyedSingleton<IWebSocketHandler, DispatchHandler>("7tv"); s.AddKeyedSingleton<IWebSocketHandler, DispatchHandler>("7tv");
s.AddKeyedSingleton<IWebSocketHandler, ReconnectHandler>("7tv"); s.AddKeyedSingleton<IWebSocketHandler, ReconnectHandler>("7tv");
@ -142,6 +143,7 @@ s.AddKeyedSingleton<ITwitchSocketHandler, ChannelSubscriptionHandler>("twitch-no
s.AddKeyedSingleton<ITwitchSocketHandler, ChannelSubscriptionGiftHandler>("twitch-notifications"); s.AddKeyedSingleton<ITwitchSocketHandler, ChannelSubscriptionGiftHandler>("twitch-notifications");
// hermes websocket // hermes websocket
s.AddKeyedSingleton<IBackoff>("hermes", new ExponentialBackoff(1000, 15 * 1000));
s.AddKeyedSingleton<IWebSocketHandler, HeartbeatHandler>("hermes"); s.AddKeyedSingleton<IWebSocketHandler, HeartbeatHandler>("hermes");
s.AddKeyedSingleton<IWebSocketHandler, LoginAckHandler>("hermes"); s.AddKeyedSingleton<IWebSocketHandler, LoginAckHandler>("hermes");
s.AddKeyedSingleton<IWebSocketHandler, RequestAckHandler>("hermes"); s.AddKeyedSingleton<IWebSocketHandler, RequestAckHandler>("hermes");