diff --git a/Chat/Observers/TTSConsumer.cs b/Chat/Observers/TTSConsumer.cs new file mode 100644 index 0000000..e3721db --- /dev/null +++ b/Chat/Observers/TTSConsumer.cs @@ -0,0 +1,41 @@ +using Serilog; +using TwitchChatTTS.Chat.Speech; + +namespace TwitchChatTTS.Chat.Observers +{ + public class TTSConsumer : IObserver + { + private readonly TTSEngine _engine; + private readonly ILogger _logger; + + private IDisposable? _cancellation; + + public TTSConsumer(TTSEngine engine, ILogger logger) + { + _engine = engine; + _logger = logger; + } + + public virtual void Subscribe(TTSPublisher provider) => + _cancellation = provider.Subscribe(this); + + public virtual void Unsubscribe() + { + _cancellation?.Dispose(); + } + + public void OnCompleted() + { + } + + public void OnError(Exception error) + { + _logger.Error(error, "An error happened while observing for TTS messages."); + } + + public void OnNext(TTSGroupedMessage value) + { + _engine.PlayerSource?.Cancel(); + } + } +} \ No newline at end of file diff --git a/Chat/Observers/TTSPublisher.cs b/Chat/Observers/TTSPublisher.cs new file mode 100644 index 0000000..fe81125 --- /dev/null +++ b/Chat/Observers/TTSPublisher.cs @@ -0,0 +1,40 @@ +using TwitchChatTTS.Chat.Speech; + +namespace TwitchChatTTS.Chat.Observers +{ + public class TTSPublisher : IObservable + { + private readonly HashSet> _observers; + private readonly HashSet _messages; + + + public TTSPublisher() + { + _observers = new(); + _messages = new(); + } + + + public IDisposable Subscribe(IObserver observer) + { + if (_observers.Add(observer)) + { + foreach (var item in _messages) + observer.OnNext(item); + } + + return new Unsubscriber(_observers, observer); + } + } + + internal sealed class Unsubscriber : IDisposable + { + private readonly ISet> _observers; + private readonly IObserver _observer; + + internal Unsubscriber(ISet> observers, IObserver observer) + => (_observers, _observer) = (observers, observer); + + public void Dispose() => _observers.Remove(_observer); + } +} \ No newline at end of file diff --git a/Startup.cs b/Startup.cs index 9c445e4..d5569ec 100644 --- a/Startup.cs +++ b/Startup.cs @@ -31,6 +31,7 @@ using TwitchChatTTS.Twitch.Socket.Handlers; using CommonSocketLibrary.Backoff; using TwitchChatTTS.Chat.Speech; using TwitchChatTTS.Chat.Messaging; +using TwitchChatTTS.Chat.Observers; // dotnet publish -r linux-x64 -p:PublishSingleFile=true --self-contained true // dotnet publish -r win-x64 -p:PublishSingleFile=true --self-contained true @@ -90,6 +91,9 @@ s.AddSingleton(); s.AddSingleton(); s.AddSingleton(); +s.AddSingleton(); +s.AddSingleton(); + // OBS websocket s.AddKeyedSingleton("obs"); s.AddKeyedSingleton("obs"); @@ -155,8 +159,10 @@ s.AddKeyedSingleton("hermes"); s.AddKeyedSingleton, HermesMessageTypeManager>("hermes"); s.AddKeyedSingleton, HermesSocketClient>("hermes"); -s.AddHostedService(); +s.AddSingleton(); + s.AddHostedService(); -s.AddHostedService(); +s.AddHostedService(p => p.GetRequiredService()); +s.AddHostedService(); using IHost host = builder.Build(); await host.RunAsync(); \ No newline at end of file diff --git a/TTSEngine.cs b/TTSEngine.cs index b993470..a95928c 100644 --- a/TTSEngine.cs +++ b/TTSEngine.cs @@ -1,8 +1,4 @@ -using System.Runtime.InteropServices; -using System.Web; using Microsoft.Extensions.Hosting; -using NAudio.Wave; -using NAudio.Wave.SampleProviders; using Serilog; using TwitchChatTTS.Chat.Speech; @@ -14,6 +10,8 @@ namespace TwitchChatTTS private readonly TTSPlayer _player; private readonly ILogger _logger; + public CancellationTokenSource? PlayerSource; + public TTSEngine(AudioPlaybackEngine playback, TTSPlayer player, ILogger logger) { @@ -27,6 +25,7 @@ namespace TwitchChatTTS { Task.Run(async () => { + PlayerSource = new CancellationTokenSource(); while (true) { try @@ -36,12 +35,21 @@ namespace TwitchChatTTS _logger.Warning("TTS Engine - Cancellation requested."); return; } - while (_player.IsEmpty() || _player.Playing != null) + if (_player.IsEmpty()) { - await Task.Delay(200, cancellationToken); - continue; + try + { + PlayerSource.Token.WaitHandle.WaitOne(); + } + catch (Exception) { } } + while (_player.Playing != null) + { + await Task.Delay(100); + } + + PlayerSource = new CancellationTokenSource(); var messageData = _player.ReceiveReady(); if (messageData == null) continue; diff --git a/TTSListening.cs b/TTSListening.cs index bd57610..d4a386e 100644 --- a/TTSListening.cs +++ b/TTSListening.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting; using NAudio.Wave; using NAudio.Wave.SampleProviders; using Serilog; +using TwitchChatTTS.Chat.Observers; using TwitchChatTTS.Chat.Speech; namespace TwitchChatTTS @@ -12,13 +13,17 @@ namespace TwitchChatTTS { private readonly AudioPlaybackEngine _playback; private readonly TTSPlayer _player; + private readonly TTSConsumer _consumer; + private readonly IDisposable _subscription; private readonly ILogger _logger; - public TTSListening(AudioPlaybackEngine playback, TTSPlayer player, ILogger logger) + public TTSListening(AudioPlaybackEngine playback, TTSPlayer player, TTSPublisher publisher, TTSConsumer consumer, ILogger logger) { _playback = playback; _player = player; + _consumer = consumer; + _subscription = publisher.Subscribe(consumer); _logger = logger; } @@ -67,6 +72,7 @@ namespace TwitchChatTTS public Task StopAsync(CancellationToken cancellationToken) { + _subscription?.Dispose(); return Task.CompletedTask; } @@ -111,6 +117,7 @@ namespace TwitchChatTTS var merged = new ConcatenatingSampleProvider(list); group.Audio = merged; _player.Ready(group); + _consumer.OnNext(group); }); } }