184 lines
6.1 KiB
C#
184 lines
6.1 KiB
C#
using CommonSocketLibrary.Backoff;
|
|
using Serilog;
|
|
using System.Collections;
|
|
using System.Net.WebSockets;
|
|
using System.Text.Json;
|
|
|
|
namespace CommonSocketLibrary.Abstract
|
|
{
|
|
/// <summary>
/// Base class for a WebSocket client that receives JSON messages, deserializes them to
/// <typeparamref name="Message"/> and hands them to <see cref="OnResponseReceived"/>.
/// </summary>
/// <typeparam name="Message">Reference type each incoming message is deserialized into.</typeparam>
public abstract class SocketClient<Message> : IDisposable where Message : class
{
    /// <summary>Size in bytes of the receive buffer and the initial capacity of each message stream.</summary>
    private const int ReceiveBufferSize = 8192;

    /// <summary>Delay in milliseconds after which the receive loop is cancelled once a disconnect starts.</summary>
    private const int CloseTimeoutMilliseconds = 500;

    protected ClientWebSocket? _socket;
    protected CancellationTokenSource? _cts;

    protected readonly ILogger _logger;
    protected readonly JsonSerializerOptions _options;
    private bool _disposed;

    /// <summary>Raised by <see cref="ConnectAsync"/> after the socket connected and the receive loop was started.</summary>
    public event EventHandler<EventArgs>? OnConnected;

    /// <summary>Raised by <see cref="DisconnectAsync"/> once per torn-down socket, just before it is disposed.</summary>
    public event EventHandler<SocketDisconnectionEventArgs>? OnDisconnected;

    /// <summary>Creates the client.</summary>
    /// <param name="logger">Logger used for connection and message errors.</param>
    /// <param name="options">Serializer options used when deserializing incoming messages.</param>
    public SocketClient(ILogger logger, JsonSerializerOptions options)
    {
        _logger = logger;
        _options = options;
        _disposed = false;
    }

    /// <summary>Connects to the server; implemented by derived classes.</summary>
    public abstract Task Connect();

    /// <summary>
    /// Opens a new WebSocket connection to <paramref name="url"/> and starts the background receive loop.
    /// Returns immediately when the current socket is already open.
    /// </summary>
    /// <param name="url">Absolute URI of the WebSocket endpoint.</param>
    protected async Task ConnectAsync(string url)
    {
        var previous = _socket;
        if (previous != null)
        {
            if (previous.State == WebSocketState.Open)
                return;

            // ClientWebSocket.Dispose is idempotent, so the stale socket is always released.
            previous.Dispose();
        }

        var socket = new ClientWebSocket();
        // SECURITY NOTE(review): this accepts ANY server certificate, which disables TLS
        // validation. Kept as-is because existing deployments may depend on it — confirm
        // whether it is still required.
        socket.Options.RemoteCertificateValidationCallback = (o, c, ch, er) => true;
        socket.Options.UseDefaultCredentials = false;
        _socket = socket;

        // Connecting again after Dispose() re-arms the client (pre-existing behavior).
        _disposed = false;

        _cts?.Dispose();
        var cts = new CancellationTokenSource();
        _cts = cts;

        // Capture the token up front: the source may be disposed while the connect is in flight.
        var token = cts.Token;
        await socket.ConnectAsync(new Uri(url), token);

        // The loop is asynchronous I/O, so it runs as a regular background task instead of
        // occupying a dedicated LongRunning thread that would be released at the first await.
        _ = Task.Run(() => ReceiveLoop(socket, token));

        OnConnected?.Invoke(this, EventArgs.Empty);
    }

    /// <summary>
    /// Closes the current socket (when it is still open), raises <see cref="OnDisconnected"/>
    /// and releases the socket and its cancellation source.
    /// </summary>
    /// <param name="args">Event data passed to <see cref="OnDisconnected"/> subscribers.</param>
    /// <remarks>
    /// Exceptions thrown by the close handshake still propagate to the caller, but the socket
    /// is torn down regardless. When several callers race, only one raises the event.
    /// </remarks>
    public async Task DisconnectAsync(SocketDisconnectionEventArgs args)
    {
        var socket = _socket;
        var cts = _cts;
        if (_disposed || socket == null || cts == null)
            return;

        try
        {
            if (socket.State == WebSocketState.Open)
            {
                cts.CancelAfter(TimeSpan.FromMilliseconds(CloseTimeoutMilliseconds));
                await socket.CloseOutputAsync(WebSocketCloseStatus.Empty, "", CancellationToken.None);
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
            }
        }
        finally
        {
            // Claim the teardown atomically: the receive loop and an external caller can both
            // get here for the same socket, and a handler may already have installed a new one.
            if (ReferenceEquals(Interlocked.CompareExchange(ref _socket, null, socket), socket))
            {
                Interlocked.CompareExchange(ref _cts, null, cts);
                try
                {
                    OnDisconnected?.Invoke(this, args);
                }
                finally
                {
                    socket.Dispose();
                    cts.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Cancels the receive loop and releases the socket and cancellation source.
    /// Does not raise <see cref="OnDisconnected"/>. Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
            return;

        _disposed = true;

        var cts = Interlocked.Exchange(ref _cts, null);
        var socket = Interlocked.Exchange(ref _socket, null);

        try
        {
            cts?.Cancel();
        }
        catch (Exception ex) when (ex is ObjectDisposedException or AggregateException)
        {
            // Dispose must not throw; the resources below are released regardless.
        }

        socket?.Dispose();
        cts?.Dispose();
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Reads messages from <paramref name="socket"/> until a close frame arrives, the token is
    /// cancelled or the connection fails, then tears the socket down if it ended up closed.
    /// </summary>
    /// <param name="socket">Socket this loop owns; the <c>_socket</c> field may change while the loop runs.</param>
    /// <param name="loopToken">Token that stops the loop.</param>
    private async Task ReceiveLoop(ClientWebSocket socket, CancellationToken loopToken)
    {
        var buffer = new byte[ReceiveBufferSize];
        try
        {
            while (!loopToken.IsCancellationRequested)
            {
                using var outputStream = new MemoryStream(ReceiveBufferSize);
                WebSocketReceiveResult receiveResult;

                // A message may span several frames; accumulate until EndOfMessage.
                do
                {
                    receiveResult = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), loopToken);
                    if (receiveResult.MessageType != WebSocketMessageType.Close)
                        outputStream.Write(buffer, 0, receiveResult.Count);
                }
                while (!receiveResult.EndOfMessage);

                if (receiveResult.MessageType == WebSocketMessageType.Close)
                    break;

                outputStream.Position = 0;
                await ResponseReceived(outputStream);
            }
        }
        catch (WebSocketException wse)
        {
            string data = string.Join(string.Empty, wse.Data.Cast<DictionaryEntry>().Select(e => e.Key + "=" + e.Value));
            _logger.Error(wse, $"Websocket connection problem while receiving data [state: {socket.State}][code: {wse.ErrorCode}][data: {data}]");
        }
        catch (OperationCanceledException)
        {
            // Also covers TaskCanceledException, which derives from OperationCanceledException.
            _logger.Error($"Socket's receive loop got canceled forcefully [state: {socket.State}]");
        }
        catch (Exception ex)
        {
            // Nothing awaits this background task, so log instead of losing the failure.
            _logger.Error(ex, "Websocket receive loop terminated unexpectedly.");
        }
        finally
        {
            bool closed = socket.State is WebSocketState.CloseSent
                or WebSocketState.CloseReceived
                or WebSocketState.Closed
                or WebSocketState.Aborted;

            // Skip the teardown when this socket was already released or replaced by a reconnect.
            if (closed && ReferenceEquals(_socket, socket))
            {
                try
                {
                    await DisconnectAsync(new SocketDisconnectionEventArgs(socket.CloseStatus.ToString()!, socket.CloseStatusDescription ?? string.Empty));
                }
                catch (Exception ex)
                {
                    _logger.Error(ex, "Failed to tear down the websocket after the receive loop ended.");
                }
            }
        }
    }

    /// <summary>
    /// Repeatedly waits for the delay supplied by <paramref name="backoff"/> and calls
    /// <see cref="Connect"/> until it succeeds, then resets the backoff.
    /// </summary>
    /// <param name="backoff">Source of the delay before each attempt.</param>
    protected async Task Reconnect(IBackoff backoff)
    {
        while (true)
        {
            try
            {
                TimeSpan delay = backoff.GetNextDelay();
                await Task.Delay(delay);
                await Connect();
                backoff.Reset();
                break;
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "Unable to reconnect to server.");
            }
        }
    }

    /// <summary>
    /// Deserializes one message from <paramref name="stream"/> and forwards it to
    /// <see cref="OnResponseReceived"/>. Failures are logged and never thrown.
    /// The caller owns and disposes the stream.
    /// </summary>
    /// <param name="stream">Stream positioned at the start of the JSON payload.</param>
    private async Task ResponseReceived(Stream stream)
    {
        Message? data;
        try
        {
            data = await JsonSerializer.DeserializeAsync<Message>(stream, _options);
        }
        catch (Exception ex)
        {
            // Return here so the same failure is not logged a second time below.
            _logger.Error(ex, "Failed to read a websocket message.");
            return;
        }

        if (data == null)
        {
            _logger.Error("Failed to read a websocket message.");
            return;
        }

        try
        {
            await OnResponseReceived(data);
        }
        catch (Exception ex)
        {
            _logger.Error(ex, "Failed to execute a websocket message.");
        }
    }

    /// <summary>Handles one deserialized message; implemented by derived classes.</summary>
    /// <param name="content">The deserialized message. The receive loop never passes null.</param>
    protected abstract Task OnResponseReceived(Message? content);
}
|
|
|
|
/// <summary>
/// Event data describing why a socket was disconnected.
/// </summary>
public class SocketDisconnectionEventArgs : EventArgs
{
    /// <summary>Creates the event data from a status and a reason.</summary>
    /// <param name="status">Value exposed through <see cref="Status"/>.</param>
    /// <param name="reason">Value exposed through <see cref="Reason"/>.</param>
    public SocketDisconnectionEventArgs(string status, string reason)
        => (Status, Reason) = (status, reason);

    /// <summary>Status text supplied at construction.</summary>
    public string Status { get; }

    /// <summary>Reason text supplied at construction.</summary>
    public string Reason { get; }
}
|
|
} |