diff --git a/Bus/ServiceBusCentral.cs b/Bus/ServiceBusCentral.cs index 832b0f5..170e7ee 100644 --- a/Bus/ServiceBusCentral.cs +++ b/Bus/ServiceBusCentral.cs @@ -37,7 +37,7 @@ namespace TwitchChatTTS.Bus { if (!_topics.TryGetValue(topic, out var bus)) { - bus = new ServiceBusObservable(topic, this); + bus = new ServiceBusObservable(topic, this, _logger); _topics.Add(topic, bus); } return bus; diff --git a/Bus/ServiceBusObservable.cs b/Bus/ServiceBusObservable.cs index a1b677a..486210e 100644 --- a/Bus/ServiceBusObservable.cs +++ b/Bus/ServiceBusObservable.cs @@ -1,4 +1,5 @@ using System.Reactive; +using Serilog; namespace TwitchChatTTS.Bus { @@ -6,11 +7,13 @@ namespace TwitchChatTTS.Bus { private readonly string _topic; private readonly ServiceBusCentral _central; + private readonly ILogger _logger; - public ServiceBusObservable(string topic, ServiceBusCentral central) + public ServiceBusObservable(string topic, ServiceBusCentral central, ILogger logger) { _topic = topic; _central = central; + _logger = logger; } protected override IDisposable SubscribeCore(IObserver observer) @@ -19,6 +22,10 @@ namespace TwitchChatTTS.Bus return new ServiceBusUnsubscriber(_topic, _central, observer); } + public IDisposable Subscribe(Action action) { + return Subscribe(new ServiceBusObserver(action, _logger)); + } + private sealed class ServiceBusUnsubscriber : IDisposable { private readonly string _topic; diff --git a/Hermes/Socket/HermesSocketClient.cs b/Hermes/Socket/HermesSocketClient.cs index 3c20670..f4bdd90 100644 --- a/Hermes/Socket/HermesSocketClient.cs +++ b/Hermes/Socket/HermesSocketClient.cs @@ -66,18 +66,18 @@ namespace TwitchChatTTS.Hermes.Socket _lock = new object(); var ttsCreateUserVoice = _bus.GetTopic("tts.user.voice.create"); - ttsCreateUserVoice.Subscribe(new ServiceBusObserver(async data => await Send(3, new RequestMessage() + ttsCreateUserVoice.Subscribe(async data => await Send(3, new RequestMessage() { Type = "create_tts_user", Data = (IDictionary) data.Value! - }), logger)); + })); var ttsUpdateUserVoice = _bus.GetTopic("tts.user.voice.update"); - ttsUpdateUserVoice.Subscribe(new ServiceBusObserver(async data => await Send(3, new RequestMessage() + ttsUpdateUserVoice.Subscribe(async data => await Send(3, new RequestMessage() { Type = "update_tts_user", Data = (IDictionary) data.Value! - }), logger)); + })); } diff --git a/Twitch/Redemptions/RedemptionManager.cs b/Twitch/Redemptions/RedemptionManager.cs index 77e6a25..78361b7 100644 --- a/Twitch/Redemptions/RedemptionManager.cs +++ b/Twitch/Redemptions/RedemptionManager.cs @@ -53,7 +53,7 @@ namespace TwitchChatTTS.Twitch.Redemptions _lock = new object(); var topic = _bus.GetTopic("redemptions_initiation"); - topic.Subscribe(new ServiceBusObserver(data => + topic.Subscribe(data => { if (data.Value is not RedemptionInitiation init) return; @@ -76,7 +76,7 @@ namespace TwitchChatTTS.Twitch.Redemptions Add(redemption); Initialize(); - }, _logger)); + }); } public void Add(RedeemableAction action)