hermes-client/Bus/ServiceBusCentral.cs

83 lines
2.5 KiB
C#

using System.Collections.Immutable;
using Serilog;
namespace TwitchChatTTS.Bus
{
/// <summary>
/// Central registry of a topic-based message bus. It keeps one
/// <see cref="ServiceBusObservable"/> per topic and the set of observers
/// subscribed to each topic, and delivers values to those observers.
/// </summary>
/// <remarks>
/// Both internal dictionaries are only accessed while holding a single private
/// lock. Observers are invoked outside of that lock, on a snapshot of the
/// subscriber set taken at the time of the send.
/// </remarks>
public class ServiceBusCentral
{
    private readonly IDictionary<string, ServiceBusObservable> _topics;
    private readonly IDictionary<string, ISet<IObserver<ServiceBusData>>> _receivers;
    private readonly ILogger _logger;
    private readonly object _lock;

    /// <summary>
    /// Creates an empty bus with no topics and no observers.
    /// </summary>
    /// <param name="logger">Logger used to report observer failures during <see cref="Send"/>.</param>
    public ServiceBusCentral(ILogger logger)
    {
        _topics = new Dictionary<string, ServiceBusObservable>();
        _receivers = new Dictionary<string, ISet<IObserver<ServiceBusData>>>();
        _logger = logger;
        _lock = new object();
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to <paramref name="topic"/>.
    /// Observers are stored in a set, so adding the same observer twice has no further effect.
    /// </summary>
    /// <param name="topic">Topic name to subscribe to.</param>
    /// <param name="observer">Observer that will receive values sent to the topic.</param>
    public void Add(string topic, IObserver<ServiceBusData> observer)
    {
        lock (_lock)
        {
            if (!_receivers.TryGetValue(topic, out var observers))
            {
                // First subscriber for this topic: create its observer set lazily.
                observers = new HashSet<IObserver<ServiceBusData>>();
                _receivers.Add(topic, observers);
            }

            observers.Add(observer);
        }
    }

    /// <summary>
    /// Returns the <see cref="ServiceBusObservable"/> associated with <paramref name="topic"/>,
    /// creating and caching it on first request.
    /// </summary>
    /// <param name="topic">Topic name.</param>
    /// <returns>The cached observable for the topic; repeated calls return the same instance.</returns>
    public ServiceBusObservable GetTopic(string topic)
    {
        lock (_lock)
        {
            if (!_topics.TryGetValue(topic, out var bus))
            {
                bus = new ServiceBusObservable(topic, this, _logger);
                _topics.Add(topic, bus);
            }

            return bus;
        }
    }

    /// <summary>
    /// Returns a snapshot of the observers currently subscribed to <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Topic name.</param>
    /// <returns>
    /// A copy of the subscriber set taken under the lock, or an empty sequence when the
    /// topic has no subscribers. Later subscription changes do not affect the returned sequence.
    /// </returns>
    public IEnumerable<IObserver<ServiceBusData>> GetObservers(string topic)
    {
        lock (_lock)
        {
            if (_receivers.TryGetValue(topic, out var observers))
            {
                // Copy while holding the lock so callers can enumerate without it.
                return observers.ToImmutableArray();
            }
        }

        return [];
    }

    /// <summary>
    /// Unsubscribes <paramref name="observer"/> from <paramref name="topic"/>.
    /// </summary>
    /// <param name="topic">Topic name.</param>
    /// <param name="observer">Observer to remove.</param>
    /// <returns>
    /// <c>true</c> if the observer was subscribed to the topic and has been removed;
    /// otherwise <c>false</c>.
    /// </returns>
    public bool RemoveObserver(string topic, IObserver<ServiceBusData> observer)
    {
        lock (_lock)
        {
            if (!_receivers.TryGetValue(topic, out var observers))
            {
                return false;
            }

            var removed = observers.Remove(observer);

            // Drop the topic entry once its last observer is gone so that topics which
            // are subscribed and later fully unsubscribed do not leave empty sets behind.
            // Add() recreates the set on demand and GetObservers() already returns an
            // empty sequence for unknown topics, so callers see no difference.
            if (observers.Count == 0)
            {
                _receivers.Remove(topic);
            }

            return removed;
        }
    }

    /// <summary>
    /// Delivers <paramref name="value"/> to every observer currently subscribed to
    /// <paramref name="topic"/>.
    /// </summary>
    /// <param name="sender">Object reported as the origin of the message.</param>
    /// <param name="topic">Topic name the message is published on.</param>
    /// <param name="value">Payload of the message.</param>
    /// <remarks>
    /// Each observer receives its own <see cref="ServiceBusData"/> instance. An exception
    /// thrown by one observer is logged and does not prevent delivery to the remaining ones.
    /// </remarks>
    public void Send(object sender, string topic, object value)
    {
        // Snapshot taken under the lock; the observers themselves run outside of it.
        var observers = GetObservers(topic);
        foreach (var consumer in observers)
        {
            try
            {
                consumer.OnNext(new ServiceBusData(sender, topic, value));
            }
            catch (Exception ex)
            {
                _logger.Error(ex, "Failed to execute observer on send.");
            }
        }
    }
}
}