using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Extensions.Options;
using UniVerse.Api.BackgroundServices;
using UniVerse.Api.Options;
using UniVerse.Application.Interfaces;
using UniVerse.Infrastructure.Data;
using Xunit;

namespace UniVerse.Api.Tests.Reviews;

/// <summary>
/// Tests for <see cref="ReviewAnalysisWorker"/> that check how many analyses it runs at once.
/// </summary>
public class ReviewAnalysisWorkerTests
{
    /// <summary>
    /// Enqueues six review ids, runs the worker until the recording service has seen all of them,
    /// and asserts that the peak number of overlapping analysis calls never exceeded
    /// <paramref name="maxConcurrentProcessing"/>.
    /// </summary>
    /// <param name="maxConcurrentProcessing">
    /// Value assigned to <see cref="ReviewAnalysisOptions.MaxConcurrentProcessing"/>.
    /// </param>
    [Theory]
    [InlineData(1)]
    [InlineData(2)]
    public async Task Worker_DoesNotExceedConfiguredConcurrency(int maxConcurrentProcessing)
    {
        // More items than either tested limit, so the limit is what bounds the overlap.
        const int reviewCount = 6;

        var queue = new ReviewAnalysisQueue();
        var analysisService = new RecordingLlmAnalysisService();
        await using var provider = CreateServiceProvider(analysisService);
        var worker = new ReviewAnalysisWorker(
            provider,
            queue,
            Microsoft.Extensions.Options.Options.Create(
                new ReviewAnalysisOptions { MaxConcurrentProcessing = maxConcurrentProcessing }),
            NullLogger.Instance);

        for (var reviewId = 1; reviewId <= reviewCount; reviewId++)
        {
            await queue.EnqueueAsync(reviewId);
        }

        analysisService.ExpectProcessed(reviewCount);

        await worker.StartAsync(CancellationToken.None);
        try
        {
            await analysisService.WaitForProcessedAsync();
        }
        finally
        {
            // Stop the worker even when the wait times out; otherwise it would still be
            // running while the service provider above is being disposed.
            await worker.StopAsync(CancellationToken.None);
        }

        Assert.True(
            analysisService.MaxRunning <= maxConcurrentProcessing,
            $"Expected at most {maxConcurrentProcessing} concurrent analyses, got {analysisService.MaxRunning}.");
    }

    /// <summary>
    /// Builds a service provider containing an in-memory database registration with a unique
    /// name per call and a scoped registration that always returns <paramref name="analysisService"/>.
    /// </summary>
    /// <param name="analysisService">The instance handed out for every scoped resolution.</param>
    /// <returns>The built provider; the caller is responsible for disposing it.</returns>
    private static ServiceProvider CreateServiceProvider(ILlmAnalysisService analysisService)
    {
        var services = new ServiceCollection();

        // NOTE(review): no TContext type argument is visible on AddDbContext here - confirm the
        // intended DbContext type is specified.
        services.AddDbContext(options =>
            options.UseInMemoryDatabase($"ReviewAnalysisWorkerTests_{Guid.NewGuid()}"));

        // The same recording instance is returned for every scope so its counters are shared.
        services.AddScoped(_ => analysisService);

        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Test double for <see cref="ILlmAnalysisService"/> that records how many calls are in
    /// flight at once and signals when an expected number of calls has finished.
    /// </summary>
    private sealed class RecordingLlmAnalysisService : ILlmAnalysisService
    {
        private readonly TaskCompletionSource _processedAll =
            new(TaskCreationOptions.RunContinuationsAsynchronously);

        private int _expectedCount;
        private int _processedCount;
        private int _running;
        private int _maxRunning;

        /// <summary>Highest number of simultaneously running calls observed so far.</summary>
        public int MaxRunning => _maxRunning;

        /// <summary>Sets how many completed calls are required before the wait is released.</summary>
        /// <param name="expectedCount">Number of finished analyses to wait for.</param>
        public void ExpectProcessed(int expectedCount)
        {
            Volatile.Write(ref _expectedCount, expectedCount);
        }

        /// <summary>
        /// Counts itself as running, updates the recorded peak, waits 50 ms, then counts itself
        /// as processed and completes the wait once the expected count is reached.
        /// </summary>
        /// <param name="reviewId">Not used by this stub.</param>
        public async Task AnalyzeReviewAsync(int reviewId)
        {
            var running = Interlocked.Increment(ref _running);
            UpdateMaxRunning(running);

            // Hold the "running" slot for a while so concurrent calls actually overlap.
            await Task.Delay(50);

            Interlocked.Decrement(ref _running);

            if (Interlocked.Increment(ref _processedCount) >= Volatile.Read(ref _expectedCount))
            {
                _processedAll.TrySetResult();
            }
        }

        /// <summary>
        /// Waits until the expected number of analyses has completed; the wait is cancelled
        /// after five seconds if that has not happened.
        /// </summary>
        public async Task WaitForProcessedAsync()
        {
            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            using var registration = timeout.Token.Register(
                () => _processedAll.TrySetCanceled(timeout.Token));

            await _processedAll.Task;
        }

        /// <summary>
        /// Raises the recorded peak to <paramref name="running"/> if it is larger, retrying the
        /// compare-and-swap when another caller changed the peak in between.
        /// </summary>
        /// <param name="running">In-flight count observed by the caller.</param>
        private void UpdateMaxRunning(int running)
        {
            while (true)
            {
                var current = Volatile.Read(ref _maxRunning);
                if (running <= current)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _maxRunning, running, current) == current)
                {
                    return;
                }
            }
        }
    }
}