using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Options; using UniVerse.Api.Options; using UniVerse.Application.Interfaces; using UniVerse.Domain.Enums; using UniVerse.Infrastructure.Data; namespace UniVerse.Api.BackgroundServices; public sealed class ReviewAnalysisWorker : BackgroundService { private readonly IServiceProvider _services; private readonly ReviewAnalysisQueue _queue; private readonly ReviewAnalysisOptions _options; private readonly ILogger _logger; public ReviewAnalysisWorker( IServiceProvider services, ReviewAnalysisQueue queue, IOptions options, ILogger logger) { _services = services; _queue = queue; _options = options.Value; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var maxConcurrency = Math.Max(1, _options.MaxConcurrentProcessing); _logger.LogInformation( "Review analysis worker started with max concurrency {MaxConcurrency}", maxConcurrency); await EnqueueExistingPendingReviewsAsync(stoppingToken); var workers = Enumerable.Range(1, maxConcurrency) .Select(workerNumber => ProcessQueueAsync(workerNumber, stoppingToken)) .ToArray(); try { await Task.WhenAll(workers); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { _logger.LogInformation("Review analysis worker stopped"); } } private async Task EnqueueExistingPendingReviewsAsync(CancellationToken cancellationToken) { using var scope = _services.CreateScope(); var db = scope.ServiceProvider.GetRequiredService(); var pendingReviewIds = await db.Reviews .Where(r => r.LlmStatus == ReviewLlmStatus.Pending) .OrderBy(r => r.CreatedAt) .Select(r => r.Id) .ToListAsync(cancellationToken); foreach (var reviewId in pendingReviewIds) await _queue.EnqueueAsync(reviewId, cancellationToken); if (pendingReviewIds.Count > 0) _logger.LogInformation( "Queued {ReviewCount} pending reviews for immediate analysis", pendingReviewIds.Count); } private async Task ProcessQueueAsync(int workerNumber, CancellationToken cancellationToken) { await foreach (var reviewId in _queue.ReadAllAsync(cancellationToken)) { try { using var scope = _services.CreateScope(); var llmService = scope.ServiceProvider.GetRequiredService(); await llmService.AnalyzeReviewAsync(reviewId); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { throw; } catch (Exception ex) { _logger.LogError( ex, "Review analysis worker {WorkerNumber} failed to process review {ReviewId}", workerNumber, reviewId); } } } }