8ac593d36f
Backend CI / build-and-test (push) Failing after 14m19s
🚀 Create and publish a Docker image / Detect changes in backend and frontend (push) Failing after 12m5s
Frontend CI / build-and-check (push) Failing after 17m58s
🚀 Create and publish a Docker image / Build & publish frontend image (push) Failing after 10m11s
🚀 Create and publish a Docker image / Build & publish backend image (push) Failing after 11m3s
🚀 Create and publish a Docker image / Update stack on Portainer (push) Failing after 14m58s
97 lines
3.5 KiB
C#
97 lines
3.5 KiB
C#
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Microsoft.Extensions.Logging.Abstractions;
|
|
using Microsoft.Extensions.Options;
|
|
using UniVerse.Api.BackgroundServices;
|
|
using UniVerse.Api.Options;
|
|
using UniVerse.Application.Interfaces;
|
|
using UniVerse.Infrastructure.Data;
|
|
using Xunit;
|
|
|
|
namespace UniVerse.Api.Tests.Reviews;
|
|
|
|
/// <summary>
/// Concurrency tests for <see cref="ReviewAnalysisWorker"/>.
/// </summary>
public class ReviewAnalysisWorkerTests
{
    /// <summary>
    /// Enqueues a fixed batch of reviews, runs the worker until every review has been analysed,
    /// and asserts that the peak number of overlapping analyses never exceeded the configured
    /// <see cref="ReviewAnalysisOptions.MaxConcurrentProcessing"/>.
    /// </summary>
    /// <param name="maxConcurrentProcessing">Concurrency limit handed to the worker.</param>
    [Theory]
    [InlineData(1)]
    [InlineData(2)]
    public async Task Worker_DoesNotExceedConfiguredConcurrency(int maxConcurrentProcessing)
    {
        // Single source of truth for the batch size used by both the queue and the recorder.
        const int reviewCount = 6;

        // Arrange
        var queue = new ReviewAnalysisQueue();
        var analysisService = new RecordingLlmAnalysisService();
        await using var provider = CreateServiceProvider(analysisService);

        // NOTE(review): Options is fully qualified, presumably because the imported
        // UniVerse.Api.Options namespace would otherwise shadow the Options class — confirm.
        var options = Microsoft.Extensions.Options.Options.Create(
            new ReviewAnalysisOptions { MaxConcurrentProcessing = maxConcurrentProcessing });

        var worker = new ReviewAnalysisWorker(
            provider,
            queue,
            options,
            NullLogger<ReviewAnalysisWorker>.Instance);

        for (var reviewId = 1; reviewId <= reviewCount; reviewId++)
        {
            await queue.EnqueueAsync(reviewId);
        }

        analysisService.ExpectProcessed(reviewCount);

        // Act
        await worker.StartAsync(CancellationToken.None);
        try
        {
            await analysisService.WaitForProcessedAsync();
        }
        finally
        {
            // Always stop the worker, even when the wait above timed out; otherwise the
            // started worker would keep running after this test has already failed.
            await worker.StopAsync(CancellationToken.None);
        }

        // Assert
        Assert.True(
            analysisService.MaxRunning <= maxConcurrentProcessing,
            $"Expected at most {maxConcurrentProcessing} concurrent analyses, got {analysisService.MaxRunning}.");
    }

    /// <summary>
    /// Builds a service provider containing an in-memory <see cref="AppDbContext"/> (with a
    /// unique database name per call) and the supplied analysis service registered as scoped.
    /// </summary>
    /// <param name="analysisService">Instance returned for every <see cref="ILlmAnalysisService"/> resolution.</param>
    /// <returns>The built provider; the caller is responsible for disposing it.</returns>
    private static ServiceProvider CreateServiceProvider(ILlmAnalysisService analysisService)
    {
        var services = new ServiceCollection();
        services.AddDbContext<AppDbContext>(options =>
            options.UseInMemoryDatabase($"ReviewAnalysisWorkerTests_{Guid.NewGuid()}"));

        // The factory always hands out the same recorder, so counters are shared across scopes.
        services.AddScoped(_ => analysisService);
        return services.BuildServiceProvider();
    }

    /// <summary>
    /// Test double for <see cref="ILlmAnalysisService"/> that records how many analyses run
    /// at the same time and signals once the expected number of analyses has completed.
    /// </summary>
    private sealed class RecordingLlmAnalysisService : ILlmAnalysisService
    {
        // Completed when the processed count reaches the expected count, or cancelled on timeout.
        private readonly TaskCompletionSource _processedAll = new(TaskCreationOptions.RunContinuationsAsynchronously);
        private int _expectedCount;
        private int _processedCount;
        private int _running;
        private int _maxRunning;

        /// <summary>Highest number of simultaneously running analyses observed so far.</summary>
        public int MaxRunning => Volatile.Read(ref _maxRunning);

        /// <summary>Sets how many completed analyses are required before the wait completes.</summary>
        /// <param name="expectedCount">Number of analyses that must finish.</param>
        public void ExpectProcessed(int expectedCount)
        {
            Volatile.Write(ref _expectedCount, expectedCount);
        }

        /// <summary>
        /// Simulates an analysis: bumps the running counter, records a new peak if reached,
        /// waits briefly, then marks the analysis as processed.
        /// </summary>
        /// <param name="reviewId">Identifier of the review; not used by this recorder.</param>
        public async Task AnalyzeReviewAsync(int reviewId)
        {
            var running = Interlocked.Increment(ref _running);
            UpdateMaxRunning(running);

            // Keep the call "in flight" for a while so concurrent invocations can overlap.
            await Task.Delay(50);

            Interlocked.Decrement(ref _running);
            if (Interlocked.Increment(ref _processedCount) >= Volatile.Read(ref _expectedCount))
            {
                _processedAll.TrySetResult();
            }
        }

        /// <summary>
        /// Waits until the expected number of analyses has completed. If that does not happen
        /// within five seconds the awaited task is cancelled, so the await throws.
        /// </summary>
        public async Task WaitForProcessedAsync()
        {
            using var timeout = new CancellationTokenSource(TimeSpan.FromSeconds(5));
            using var registration = timeout.Token.Register(() => _processedAll.TrySetCanceled(timeout.Token));
            await _processedAll.Task;
        }

        /// <summary>
        /// Raises the recorded peak to <paramref name="running"/> if it is larger, using a
        /// compare-and-swap retry loop so concurrent callers cannot lose an update.
        /// </summary>
        /// <param name="running">Running count observed by the caller.</param>
        private void UpdateMaxRunning(int running)
        {
            while (true)
            {
                var current = Volatile.Read(ref _maxRunning);
                if (running <= current)
                {
                    return;
                }

                // Succeeds only if no other caller changed the peak since it was read.
                if (Interlocked.CompareExchange(ref _maxRunning, running, current) == current)
                {
                    return;
                }
            }
        }
    }
}
|