Added observable & observer for TTS, to react faster to new messages when waiting.

This commit is contained in:
Tom 2024-08-12 17:49:36 +00:00
parent f0071cae81
commit 6810132dde
5 changed files with 112 additions and 10 deletions

View File

@ -0,0 +1,41 @@
using Serilog;
using TwitchChatTTS.Chat.Speech;
namespace TwitchChatTTS.Chat.Observers
{
public class TTSConsumer : IObserver<TTSGroupedMessage>
{
private readonly TTSEngine _engine;
private readonly ILogger _logger;
private IDisposable? _cancellation;
public TTSConsumer(TTSEngine engine, ILogger logger)
{
_engine = engine;
_logger = logger;
}
public virtual void Subscribe(TTSPublisher provider) =>
_cancellation = provider.Subscribe(this);
public virtual void Unsubscribe()
{
_cancellation?.Dispose();
}
public void OnCompleted()
{
}
public void OnError(Exception error)
{
_logger.Error(error, "An error happened while observing for TTS messages.");
}
public void OnNext(TTSGroupedMessage value)
{
_engine.PlayerSource?.Cancel();
}
}
}

View File

@ -0,0 +1,40 @@
using TwitchChatTTS.Chat.Speech;
namespace TwitchChatTTS.Chat.Observers
{
public class TTSPublisher : IObservable<TTSGroupedMessage>
{
private readonly HashSet<IObserver<TTSGroupedMessage>> _observers;
private readonly HashSet<TTSGroupedMessage> _messages;
public TTSPublisher()
{
_observers = new();
_messages = new();
}
public IDisposable Subscribe(IObserver<TTSGroupedMessage> observer)
{
if (_observers.Add(observer))
{
foreach (var item in _messages)
observer.OnNext(item);
}
return new Unsubscriber<TTSGroupedMessage>(_observers, observer);
}
}
internal sealed class Unsubscriber<T> : IDisposable
{
private readonly ISet<IObserver<T>> _observers;
private readonly IObserver<T> _observer;
internal Unsubscriber(ISet<IObserver<T>> observers, IObserver<T> observer)
=> (_observers, _observer) = (observers, observer);
public void Dispose() => _observers.Remove(_observer);
}
}

View File

@ -31,6 +31,7 @@ using TwitchChatTTS.Twitch.Socket.Handlers;
using CommonSocketLibrary.Backoff;
using TwitchChatTTS.Chat.Speech;
using TwitchChatTTS.Chat.Messaging;
using TwitchChatTTS.Chat.Observers;
// dotnet publish -r linux-x64 -p:PublishSingleFile=true --self-contained true
// dotnet publish -r win-x64 -p:PublishSingleFile=true --self-contained true
@ -90,6 +91,9 @@ s.AddSingleton<TwitchApiClient>();
s.AddSingleton<SevenApiClient>();
s.AddSingleton<IEmoteDatabase, EmoteDatabase>();
s.AddSingleton<TTSConsumer>();
s.AddSingleton<TTSPublisher>();
// OBS websocket
s.AddKeyedSingleton<IWebSocketHandler, HelloHandler>("obs");
s.AddKeyedSingleton<IWebSocketHandler, IdentifiedHandler>("obs");
@ -155,8 +159,10 @@ s.AddKeyedSingleton<IWebSocketHandler, RequestAckHandler>("hermes");
s.AddKeyedSingleton<MessageTypeManager<IWebSocketHandler>, HermesMessageTypeManager>("hermes");
s.AddKeyedSingleton<SocketClient<WebSocketMessage>, HermesSocketClient>("hermes");
s.AddHostedService<TTS>();
s.AddSingleton<TTSEngine>();
s.AddHostedService<TTSListening>();
s.AddHostedService<TTSEngine>();
s.AddHostedService<TTSEngine>(p => p.GetRequiredService<TTSEngine>());
s.AddHostedService<TTS>();
using IHost host = builder.Build();
await host.RunAsync();

View File

@ -1,8 +1,4 @@
using System.Runtime.InteropServices;
using System.Web;
using Microsoft.Extensions.Hosting;
using NAudio.Wave;
using NAudio.Wave.SampleProviders;
using Serilog;
using TwitchChatTTS.Chat.Speech;
@ -14,6 +10,8 @@ namespace TwitchChatTTS
private readonly TTSPlayer _player;
private readonly ILogger _logger;
public CancellationTokenSource? PlayerSource;
public TTSEngine(AudioPlaybackEngine playback, TTSPlayer player, ILogger logger)
{
@ -27,6 +25,7 @@ namespace TwitchChatTTS
{
Task.Run(async () =>
{
PlayerSource = new CancellationTokenSource();
while (true)
{
try
@ -36,12 +35,21 @@ namespace TwitchChatTTS
_logger.Warning("TTS Engine - Cancellation requested.");
return;
}
while (_player.IsEmpty() || _player.Playing != null)
if (_player.IsEmpty())
{
await Task.Delay(200, cancellationToken);
continue;
try
{
PlayerSource.Token.WaitHandle.WaitOne();
}
catch (Exception) { }
}
while (_player.Playing != null)
{
await Task.Delay(100);
}
PlayerSource = new CancellationTokenSource();
var messageData = _player.ReceiveReady();
if (messageData == null)
continue;

View File

@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting;
using NAudio.Wave;
using NAudio.Wave.SampleProviders;
using Serilog;
using TwitchChatTTS.Chat.Observers;
using TwitchChatTTS.Chat.Speech;
namespace TwitchChatTTS
@ -12,13 +13,17 @@ namespace TwitchChatTTS
{
private readonly AudioPlaybackEngine _playback;
private readonly TTSPlayer _player;
private readonly TTSConsumer _consumer;
private readonly IDisposable _subscription;
private readonly ILogger _logger;
public TTSListening(AudioPlaybackEngine playback, TTSPlayer player, ILogger logger)
public TTSListening(AudioPlaybackEngine playback, TTSPlayer player, TTSPublisher publisher, TTSConsumer consumer, ILogger logger)
{
_playback = playback;
_player = player;
_consumer = consumer;
_subscription = publisher.Subscribe(consumer);
_logger = logger;
}
@ -67,6 +72,7 @@ namespace TwitchChatTTS
public Task StopAsync(CancellationToken cancellationToken)
{
_subscription?.Dispose();
return Task.CompletedTask;
}
@ -111,6 +117,7 @@ namespace TwitchChatTTS
var merged = new ConcatenatingSampleProvider(list);
group.Audio = merged;
_player.Ready(group);
_consumer.OnNext(group);
});
}
}