From ab90d47b895a752c28ba1ee3435b0959981963df Mon Sep 17 00:00:00 2001 From: Tom Date: Sat, 10 Aug 2024 19:31:08 +0000 Subject: [PATCH] Fixed several socket issues. Added backoff for reconnection. --- Abstract/HandlerTypeManager.cs | 40 +++++--- Abstract/SocketClient.cs | 139 +++++++++++++++----------- Backoff/ExponentialBackoff.cs | 32 ++++++ Backoff/IBackoff.cs | 8 ++ Common/IWebSocketHandler.cs | 3 +- Common/WebSocketClient.cs | 82 ++++++++++----- Common/WebSocketHandlerTypeManager.cs | 4 +- CommonSocketLibrary.csproj | 5 - 8 files changed, 207 insertions(+), 106 deletions(-) create mode 100644 Backoff/ExponentialBackoff.cs create mode 100644 Backoff/IBackoff.cs diff --git a/Abstract/HandlerTypeManager.cs b/Abstract/HandlerTypeManager.cs index 34c4309..b395fc3 100644 --- a/Abstract/HandlerTypeManager.cs +++ b/Abstract/HandlerTypeManager.cs @@ -2,41 +2,55 @@ namespace CommonSocketLibrary.Abstract { - public abstract class HandlerTypeManager + public interface ICodedOperation + { + int OperationCode { get; } + } + + public interface IMessageTypeManager + { + Type? GetMessageTypeByCode(int code); + } + + public abstract class MessageTypeManager : IMessageTypeManager where Handler : ICodedOperation { private readonly IDictionary _types; - public IDictionary HandlerTypes { get => _types; } protected readonly ILogger _logger; - public HandlerTypeManager(ILogger logger, HandlerManager handlers) + public MessageTypeManager(IEnumerable handlers, ILogger logger) { _types = new Dictionary(); _logger = logger; - GenerateHandlerTypes(handlers.Handlers); + GenerateHandlerTypes(handlers); } - - private void GenerateHandlerTypes(IDictionary handlers) + public Type? GetMessageTypeByCode(int code) { - foreach (var entry in handlers) + _types.TryGetValue(code, out Type? type); + return type; + } + + private void GenerateHandlerTypes(IEnumerable handlers) + { + foreach (var handler in handlers) { - if (entry.Value == null) + if (handler == null) { - _logger.Error($"Failed to link websocket handler #{entry.Key} due to null value."); + _logger.Error($"Failed to link websocket handler due to null value."); continue; } - var type = entry.Value.GetType(); + var type = handler.GetType(); var target = FetchMessageType(type); if (target == null) { - _logger.Error($"Failed to link websocket handler #{entry.Key} due to no match for {target}."); + _logger.Error($"Failed to link websocket handler #{handler.OperationCode} due to no match for {target}."); continue; } - _types.Add(entry.Key, target); - _logger.Debug($"Linked websocket handler #{entry.Key} to type {target.AssemblyQualifiedName}."); + _types.Add(handler.OperationCode, target); + _logger.Debug($"Linked websocket handler #{handler.OperationCode} to type {target.AssemblyQualifiedName}."); } } diff --git a/Abstract/SocketClient.cs b/Abstract/SocketClient.cs index 2356cf2..d81741b 100644 --- a/Abstract/SocketClient.cs +++ b/Abstract/SocketClient.cs @@ -1,51 +1,56 @@ +using CommonSocketLibrary.Backoff; using Serilog; +using System.Collections; using System.Net.WebSockets; -using System.Text; using System.Text.Json; namespace CommonSocketLibrary.Abstract { - public abstract class SocketClient : IDisposable + public abstract class SocketClient : IDisposable where Message : class { - private ClientWebSocket? _socket; - private CancellationTokenSource? _cts; + protected ClientWebSocket? _socket; + protected CancellationTokenSource? _cts; + private readonly int ReceiveBufferSize = 8192; protected readonly ILogger _logger; protected readonly JsonSerializerOptions _options; + private bool _disposed; - public bool Connected { get; set; } - public int ReceiveBufferSize { get; } = 8192; + public event EventHandler OnConnected; + public event EventHandler OnDisconnected; public SocketClient(ILogger logger, JsonSerializerOptions options) { _logger = logger; _options = options; - Connected = false; + _disposed = false; } - public async Task ConnectAsync(string url) + protected async Task ConnectAsync(string url) { if (_socket != null) { if (_socket.State == WebSocketState.Open) return; - else _socket.Dispose(); + else if (!_disposed) _socket.Dispose(); } _socket = new ClientWebSocket(); _socket.Options.RemoteCertificateValidationCallback = (o, c, ch, er) => true; _socket.Options.UseDefaultCredentials = false; + _disposed = false; if (_cts != null) _cts.Dispose(); _cts = new CancellationTokenSource(); await _socket.ConnectAsync(new Uri(url), _cts.Token); await Task.Factory.StartNew(ReceiveLoop, _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); - await OnConnection(); + OnConnected?.Invoke(this, EventArgs.Empty); } - public async Task DisconnectAsync() + public async Task DisconnectAsync(SocketDisconnectionEventArgs args) { - if (_socket == null || _cts == null) return; - // TODO: requests cleanup code, sub-protocol dependent. + if (_disposed || _socket == null || _cts == null) + return; + if (_socket.State == WebSocketState.Open) { _cts.CancelAfter(TimeSpan.FromMilliseconds(500)); @@ -53,14 +58,20 @@ namespace CommonSocketLibrary.Abstract await _socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None); } - Connected = false; + OnDisconnected?.Invoke(this, args); _socket.Dispose(); _socket = null; _cts.Dispose(); _cts = null; } - public void Dispose() => DisconnectAsync().Wait(); + public void Dispose() + { + if (_disposed) + return; + + _disposed = true; + } private async Task ReceiveLoop() { @@ -87,78 +98,84 @@ namespace CommonSocketLibrary.Abstract await ResponseReceived(outputStream); } } - catch (TaskCanceledException) { } + catch (WebSocketException wse) + { + string data = string.Join(string.Empty, wse.Data.Cast().Select(e => e.Key + "=" + e.Value)); + _logger.Error($"Websocket connection problem while receiving data [state: {_socket.State}][code: {wse.ErrorCode}][data: {data}]"); + } + catch (TaskCanceledException) + { + _logger.Error($"Socket's receive loop got canceled forcefully [state: {_socket.State}]"); + } finally { - outputStream?.Dispose(); + if (_socket.State.ToString().Contains("Close") || _socket.State == WebSocketState.Aborted) + await DisconnectAsync(new SocketDisconnectionEventArgs(_socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty)); } } - public async Task SendRaw(string content) + protected async Task Reconnect(IBackoff backoff, Action reconnect) { - if (!Connected) return; - - var bytes = new byte[1024 * 4]; - var array = new ArraySegment(bytes); - var total = Encoding.UTF8.GetBytes(content).Length; - var current = 0; - - while (current < total) + while (true) { - var size = Encoding.UTF8.GetBytes(content.Substring(current), array); - await _socket.SendAsync(array, WebSocketMessageType.Text, true, _cts.Token); - current += size; - } - await OnMessageSend(-1, content); - } - - public async Task Send(int opcode, T data) - { - try - { - var message = GenerateMessage(opcode, data); - var content = JsonSerializer.Serialize(message, _options); - - var bytes = Encoding.UTF8.GetBytes(content); - var array = new ArraySegment(bytes); - var total = bytes.Length; - var current = 0; - - while (current < total) + try { - var size = Encoding.UTF8.GetBytes(content.Substring(current), array); - await _socket.SendAsync(array, WebSocketMessageType.Text, current + size >= total, _cts.Token); - current += size; + TimeSpan delay = backoff.GetNextDelay(); + await Task.Delay(delay); + reconnect.Invoke(); + backoff.Reset(); + break; + } + catch (Exception) + { + _logger.Error("Unable to reconnect to server."); } - await OnMessageSend(opcode, content); - } - catch (Exception e) - { - Connected = false; - _logger.Error(e, "Failed to send a message: " + opcode); } } private async Task ResponseReceived(Stream stream) { + Message? data = null; try { - var data = await JsonSerializer.DeserializeAsync(stream); - await OnResponseReceived(data); + data = await JsonSerializer.DeserializeAsync(stream, _options); } catch (Exception ex) { - _logger.Error(ex, "Failed to read or execute a websocket message."); + _logger.Error(ex, "Failed to read a websocket message."); } finally { stream.Dispose(); } + if (data == null) + { + _logger.Error("Failed to read a websocket message."); + return; + } + + try + { + await OnResponseReceived(data); + } + catch (Exception ex) + { + _logger.Error(ex, "Failed to execute a websocket message."); + } } - protected abstract Message GenerateMessage(int opcode, T data); protected abstract Task OnResponseReceived(Message? content); - protected abstract Task OnMessageSend(int opcode, string? content); - protected abstract Task OnConnection(); + } + + public class SocketDisconnectionEventArgs : EventArgs + { + public string Status { get; } + public string Reason { get; } + + public SocketDisconnectionEventArgs(string status, string reason) + { + Status = status; + Reason = reason; + } } } \ No newline at end of file diff --git a/Backoff/ExponentialBackoff.cs b/Backoff/ExponentialBackoff.cs new file mode 100644 index 0000000..332bb5a --- /dev/null +++ b/Backoff/ExponentialBackoff.cs @@ -0,0 +1,32 @@ +namespace CommonSocketLibrary.Backoff +{ + public class ExponentialBackoff : IBackoff + { + private int _initial; + private int _current; + private int _maximum; + + + public ExponentialBackoff(int initial, int maximum) + { + if (maximum < initial) + throw new InvalidOperationException("Initial backoff cannot be larger than maximum backoff."); + + _initial = initial; + _maximum = maximum; + Reset(); + } + + + public TimeSpan GetNextDelay() + { + _current = Math.Min(_current * 2, _maximum); + return TimeSpan.FromMilliseconds(_current); + } + + public void Reset() + { + _current = _initial / 2; + } + } +} \ No newline at end of file diff --git a/Backoff/IBackoff.cs b/Backoff/IBackoff.cs new file mode 100644 index 0000000..894a845 --- /dev/null +++ b/Backoff/IBackoff.cs @@ -0,0 +1,8 @@ +namespace CommonSocketLibrary.Backoff +{ + public interface IBackoff + { + TimeSpan GetNextDelay(); + void Reset(); + } +} \ No newline at end of file diff --git a/Common/IWebSocketHandler.cs b/Common/IWebSocketHandler.cs index 9518926..18bff12 100644 --- a/Common/IWebSocketHandler.cs +++ b/Common/IWebSocketHandler.cs @@ -2,9 +2,8 @@ using CommonSocketLibrary.Abstract; namespace CommonSocketLibrary.Common { - public interface IWebSocketHandler + public interface IWebSocketHandler : ICodedOperation { - int OperationCode { get; } Task Execute(SocketClient sender, Data data); } } \ No newline at end of file diff --git a/Common/WebSocketClient.cs b/Common/WebSocketClient.cs index ea85368..2fe6ddf 100644 --- a/Common/WebSocketClient.cs +++ b/Common/WebSocketClient.cs @@ -1,4 +1,6 @@ -using System.Text.Json; +using System.Net.WebSockets; +using System.Text; +using System.Text.Json; using CommonSocketLibrary.Abstract; using Serilog; @@ -6,21 +8,22 @@ namespace CommonSocketLibrary.Common { public class WebSocketClient : SocketClient { - private readonly HandlerManager _handlerManager; - private readonly HandlerTypeManager _handlerTypeManager; + protected IDictionary _handlers; + private readonly MessageTypeManager _messageTypeManager; public WebSocketClient( - ILogger logger, - HandlerManager handlerManager, - HandlerTypeManager typeManager, - JsonSerializerOptions serializerOptions + IEnumerable handlers, + MessageTypeManager typeManager, + JsonSerializerOptions serializerOptions, + ILogger logger ) : base(logger, serializerOptions) { - _handlerManager = handlerManager; - _handlerTypeManager = typeManager; + _handlers = handlers.ToDictionary(h => h.OperationCode, h => h); + _messageTypeManager = typeManager; } - protected override WebSocketMessage GenerateMessage(int opcode, T data) + + protected WebSocketMessage GenerateMessage(int opcode, T data) { return new WebSocketMessage() { @@ -29,28 +32,61 @@ namespace CommonSocketLibrary.Common }; } - protected override async Task OnResponseReceived(WebSocketMessage? data) + protected override async Task OnResponseReceived(WebSocketMessage? message) { - if (data == null) + if (message == null) return; - string content = data.Data?.ToString() ?? string.Empty; - _logger.Verbose("RX #" + data.OpCode + ": " + content); + string content = message.Data?.ToString() ?? string.Empty; + _logger.Verbose("RX #" + message.OpCode + ": " + content); - if (!_handlerTypeManager.HandlerTypes.TryGetValue(data.OpCode, out Type? type) || type == null) + var type = _messageTypeManager.GetMessageTypeByCode(message.OpCode); + if (type == null) + { return; + } - var obj = JsonSerializer.Deserialize(content, type, _options); - await _handlerManager.Execute(this, data.OpCode, obj); + var data = JsonSerializer.Deserialize(content, type, _options); + if (!_handlers.TryGetValue(message.OpCode, out IWebSocketHandler? handler) || handler == null) + { + return; + } + + await handler.Execute(this, data); } - - protected override async Task OnMessageSend(int opcode, string? content) + + public async Task Send(int opcode, T data) { - _logger.Verbose("TX #" + opcode + ": " + content); - } + if (_socket == null) + return; + + try + { + var message = GenerateMessage(opcode, data); + var content = JsonSerializer.Serialize(message, _options); - protected override async Task OnConnection() - { + var bytes = Encoding.UTF8.GetBytes(content); + var array = new ArraySegment(bytes); + var total = bytes.Length; + var current = 0; + + while (current < total) + { + var size = Encoding.UTF8.GetBytes(content.Substring(current), array); + await _socket!.SendAsync(array, WebSocketMessageType.Text, current + size >= total, _cts!.Token); + current += size; + } + _logger.Verbose("TX #" + opcode + ": " + content); + } + catch (Exception e) + { + if (_socket.State.ToString().Contains("Close") || _socket.State == WebSocketState.Aborted) + { + await DisconnectAsync(new SocketDisconnectionEventArgs(_socket.CloseStatus.ToString()!, _socket.CloseStatusDescription ?? string.Empty)); + _logger.Warning($"Socket state on closing = {_socket.State} | {_socket.CloseStatus?.ToString()} | {_socket.CloseStatusDescription}"); + } + _logger.Error(e, $"Failed to send a websocket message [op code: {opcode}]"); + } } } } \ No newline at end of file diff --git a/Common/WebSocketHandlerTypeManager.cs b/Common/WebSocketHandlerTypeManager.cs index 4f3354a..761e12f 100644 --- a/Common/WebSocketHandlerTypeManager.cs +++ b/Common/WebSocketHandlerTypeManager.cs @@ -4,9 +4,9 @@ using Serilog; namespace CommonSocketLibrary.Socket.Manager { - public abstract class WebSocketHandlerTypeManager : HandlerTypeManager + public abstract class WebSocketMessageTypeManager : MessageTypeManager { - public WebSocketHandlerTypeManager(ILogger logger, HandlerManager handlers) : base(logger, handlers) + public WebSocketMessageTypeManager(IEnumerable handlers, ILogger logger) : base(handlers, logger) { } diff --git a/CommonSocketLibrary.csproj b/CommonSocketLibrary.csproj index aebdfd9..7ad5d13 100644 --- a/CommonSocketLibrary.csproj +++ b/CommonSocketLibrary.csproj @@ -13,13 +13,8 @@ - - - - -