feat: изменил логику анализа отзывов
Backend CI / build-and-test (push) Failing after 14m19s
🚀 Create and publish a Docker image / Detect changes in backend and frontend (push) Failing after 12m5s
Frontend CI / build-and-check (push) Failing after 17m58s
🚀 Create and publish a Docker image / Build & publish frontend image (push) Failing after 10m11s
🚀 Create and publish a Docker image / Build & publish backend image (push) Failing after 11m3s
🚀 Create and publish a Docker image / Update stack on Portainer (push) Failing after 14m58s
Backend CI / build-and-test (push) Failing after 14m19s
🚀 Create and publish a Docker image / Detect changes in backend and frontend (push) Failing after 12m5s
Frontend CI / build-and-check (push) Failing after 17m58s
🚀 Create and publish a Docker image / Build & publish frontend image (push) Failing after 10m11s
🚀 Create and publish a Docker image / Build & publish backend image (push) Failing after 11m3s
🚀 Create and publish a Docker image / Update stack on Portainer (push) Failing after 14m58s
This commit is contained in:
@@ -0,0 +1,96 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Options;
|
||||
using UniVerse.Api.Options;
|
||||
using UniVerse.Application.Interfaces;
|
||||
using UniVerse.Domain.Enums;
|
||||
using UniVerse.Infrastructure.Data;
|
||||
|
||||
namespace UniVerse.Api.BackgroundServices;
|
||||
|
||||
public sealed class ReviewAnalysisWorker : BackgroundService
|
||||
{
|
||||
private readonly IServiceProvider _services;
|
||||
private readonly ReviewAnalysisQueue _queue;
|
||||
private readonly ReviewAnalysisOptions _options;
|
||||
private readonly ILogger<ReviewAnalysisWorker> _logger;
|
||||
|
||||
public ReviewAnalysisWorker(
|
||||
IServiceProvider services,
|
||||
ReviewAnalysisQueue queue,
|
||||
IOptions<ReviewAnalysisOptions> options,
|
||||
ILogger<ReviewAnalysisWorker> logger)
|
||||
{
|
||||
_services = services;
|
||||
_queue = queue;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
var maxConcurrency = Math.Max(1, _options.MaxConcurrentProcessing);
|
||||
_logger.LogInformation(
|
||||
"Review analysis worker started with max concurrency {MaxConcurrency}",
|
||||
maxConcurrency);
|
||||
|
||||
await EnqueueExistingPendingReviewsAsync(stoppingToken);
|
||||
|
||||
var workers = Enumerable.Range(1, maxConcurrency)
|
||||
.Select(workerNumber => ProcessQueueAsync(workerNumber, stoppingToken))
|
||||
.ToArray();
|
||||
|
||||
try
|
||||
{
|
||||
await Task.WhenAll(workers);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogInformation("Review analysis worker stopped");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task EnqueueExistingPendingReviewsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
using var scope = _services.CreateScope();
|
||||
var db = scope.ServiceProvider.GetRequiredService<AppDbContext>();
|
||||
|
||||
var pendingReviewIds = await db.Reviews
|
||||
.Where(r => r.LlmStatus == ReviewLlmStatus.Pending)
|
||||
.OrderBy(r => r.CreatedAt)
|
||||
.Select(r => r.Id)
|
||||
.ToListAsync(cancellationToken);
|
||||
|
||||
foreach (var reviewId in pendingReviewIds)
|
||||
await _queue.EnqueueAsync(reviewId, cancellationToken);
|
||||
|
||||
if (pendingReviewIds.Count > 0)
|
||||
_logger.LogInformation(
|
||||
"Queued {ReviewCount} pending reviews for immediate analysis",
|
||||
pendingReviewIds.Count);
|
||||
}
|
||||
|
||||
private async Task ProcessQueueAsync(int workerNumber, CancellationToken cancellationToken)
|
||||
{
|
||||
await foreach (var reviewId in _queue.ReadAllAsync(cancellationToken))
|
||||
{
|
||||
try
|
||||
{
|
||||
using var scope = _services.CreateScope();
|
||||
var llmService = scope.ServiceProvider.GetRequiredService<ILlmAnalysisService>();
|
||||
await llmService.AnalyzeReviewAsync(reviewId);
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(
|
||||
ex,
|
||||
"Review analysis worker {WorkerNumber} failed to process review {ReviewId}",
|
||||
workerNumber,
|
||||
reviewId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user