8ac593d36f
Backend CI / build-and-test (push) Failing after 14m19s
🚀 Create and publish a Docker image / Detect changes in backend and frontend (push) Failing after 12m5s
Frontend CI / build-and-check (push) Failing after 17m58s
🚀 Create and publish a Docker image / Build & publish frontend image (push) Failing after 10m11s
🚀 Create and publish a Docker image / Build & publish backend image (push) Failing after 11m3s
🚀 Create and publish a Docker image / Update stack on Portainer (push) Failing after 14m58s
97 lines
3.3 KiB
C#
97 lines
3.3 KiB
C#
using Microsoft.EntityFrameworkCore;
|
|
using Microsoft.Extensions.Options;
|
|
using UniVerse.Api.Options;
|
|
using UniVerse.Application.Interfaces;
|
|
using UniVerse.Domain.Enums;
|
|
using UniVerse.Infrastructure.Data;
|
|
|
|
namespace UniVerse.Api.BackgroundServices;
|
|
|
|
/// <summary>
/// Hosted background service that drains <see cref="ReviewAnalysisQueue"/> and runs
/// <see cref="ILlmAnalysisService.AnalyzeReviewAsync"/> for every review id it reads.
/// </summary>
/// <remarks>
/// On start-up the service also re-queues every review whose <c>LlmStatus</c> is
/// <see cref="ReviewLlmStatus.Pending"/>, oldest first. The number of concurrent
/// consumers comes from <see cref="ReviewAnalysisOptions.MaxConcurrentProcessing"/>
/// and is clamped to at least one.
/// </remarks>
public sealed class ReviewAnalysisWorker : BackgroundService
{
    private readonly IServiceProvider _services;
    private readonly ReviewAnalysisQueue _queue;
    private readonly ReviewAnalysisOptions _options;
    private readonly ILogger<ReviewAnalysisWorker> _logger;

    /// <summary>
    /// Creates the worker.
    /// </summary>
    /// <param name="services">Root provider used to create one DI scope per unit of work.</param>
    /// <param name="queue">Queue of review ids awaiting analysis.</param>
    /// <param name="options">Worker settings; only <c>MaxConcurrentProcessing</c> is read here.</param>
    /// <param name="logger">Logger for lifecycle and failure messages.</param>
    public ReviewAnalysisWorker(
        IServiceProvider services,
        ReviewAnalysisQueue queue,
        IOptions<ReviewAnalysisOptions> options,
        ILogger<ReviewAnalysisWorker> logger)
    {
        _services = services;
        _queue = queue;
        _options = options.Value;
        _logger = logger;
    }

    /// <summary>
    /// Starts the queue consumers, re-queues reviews that are still pending, and then
    /// waits until every consumer has finished.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var maxConcurrency = Math.Max(1, _options.MaxConcurrentProcessing);
        _logger.LogInformation(
            "Review analysis worker started with max concurrency {MaxConcurrency}",
            maxConcurrency);

        // Consumers are started BEFORE the backfill. EnqueueAsync is awaited per id, so
        // if the queue applies back-pressure (presumably a bounded channel - verify
        // against ReviewAnalysisQueue) a backlog larger than its capacity would wait
        // forever for readers that had not been started yet.
        var workers = Enumerable.Range(1, maxConcurrency)
            .Select(workerNumber => ProcessQueueAsync(workerNumber, stoppingToken))
            .ToArray();

        try
        {
            // Inside the try so that a shutdown during the backfill takes the same
            // "stopped" path as a shutdown during normal processing.
            await EnqueueExistingPendingReviewsAsync(stoppingToken);
            await Task.WhenAll(workers);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            _logger.LogInformation("Review analysis worker stopped");
        }
    }

    /// <summary>
    /// Loads the ids of all reviews with <see cref="ReviewLlmStatus.Pending"/> status,
    /// ordered by <c>CreatedAt</c>, and enqueues them one by one.
    /// </summary>
    /// <param name="cancellationToken">Cancels both the database query and the enqueue calls.</param>
    private async Task EnqueueExistingPendingReviewsAsync(CancellationToken cancellationToken)
    {
        // AppDbContext is resolved from a dedicated scope that is disposed when this method returns.
        using var scope = _services.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();

        var pendingReviewIds = await db.Reviews
            .Where(r => r.LlmStatus == ReviewLlmStatus.Pending)
            .OrderBy(r => r.CreatedAt)
            .Select(r => r.Id)
            .ToListAsync(cancellationToken);

        foreach (var reviewId in pendingReviewIds)
        {
            await _queue.EnqueueAsync(reviewId, cancellationToken);
        }

        if (pendingReviewIds.Count > 0)
        {
            _logger.LogInformation(
                "Queued {ReviewCount} pending reviews for immediate analysis",
                pendingReviewIds.Count);
        }
    }

    /// <summary>
    /// Consumer loop: reads review ids from the queue until the stream ends or the token
    /// is cancelled, analysing each id inside its own DI scope.
    /// </summary>
    /// <param name="workerNumber">1-based consumer index, used only in log messages.</param>
    /// <param name="cancellationToken">Stops the read loop.</param>
    private async Task ProcessQueueAsync(int workerNumber, CancellationToken cancellationToken)
    {
        await foreach (var reviewId in _queue.ReadAllAsync(cancellationToken))
        {
            try
            {
                // A fresh scope per review keeps scoped services from being shared across items.
                using var scope = _services.CreateScope();
                var llmService = scope.ServiceProvider.GetRequiredService<ILlmAnalysisService>();

                // NOTE(review): no cancellation token is passed here; if ILlmAnalysisService
                // exposes an overload that accepts one, flow cancellationToken through.
                await llmService.AnalyzeReviewAsync(reviewId);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown in progress: let the cancellation end this consumer.
                throw;
            }
            catch (Exception ex)
            {
                // One failed review must not stop the consumer; log it and move on.
                _logger.LogError(
                    ex,
                    "Review analysis worker {WorkerNumber} failed to process review {ReviewId}",
                    workerNumber,
                    reviewId);
            }
        }
    }
}
|