using System.Text.Json;
using CvMatcher.Models.Requests;
using CvMatcher.Models.Settings;
using CvSearch.Data;
using CvSearch.Data.Entities;
using CvSearchJob.Clients;
using CvSearchJob.Services;
using JobScheduler.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using PageFetcher.Models;

namespace CvSearchJob.Tasks;

/// <summary>
/// Background job task that processes pending job search sessions: scrapes providers,
/// scores each URL against the CV via the matcher API, persists results, and sends the results email.
/// </summary>
public sealed class CvSearchJobTask : IJobTask
{
    /// <summary>Age (measured from <c>CreatedAt</c>) after which a Processing session is treated as orphaned.</summary>
    private static readonly TimeSpan StuckSessionTimeout = TimeSpan.FromMinutes(10);

    /// <summary>
    /// Shared serializer options. Creating a new <see cref="JsonSerializerOptions"/> per call discards
    /// the serializer's metadata cache, so a single instance is reused.
    /// </summary>
    private static readonly JsonSerializerOptions WebJsonOptions = new(JsonSerializerDefaults.Web);

    private readonly IServiceScopeFactory _scopeFactory;
    private readonly JobSearchSettings _settings;
    private readonly HtmlJobSearcher _searcher;
    private readonly ICvMatcherInternalApi _matcherApi;
    private readonly IPageFetcherApiClient _pageFetcher;
    private readonly CvSearchEmailSender _emailSender;
    private readonly ILogger<CvSearchJobTask> _logger;

    /// <summary>Task type identifier exposed to the scheduler.</summary>
    public string TaskType => "CvSearch";

    /// <summary>
    /// Creates the task with its collaborators. The settings snapshot is read once from
    /// <paramref name="settings"/> at construction time.
    /// </summary>
    public CvSearchJobTask(
        IServiceScopeFactory scopeFactory,
        IOptions<JobSearchSettings> settings,
        HtmlJobSearcher searcher,
        ICvMatcherInternalApi matcherApi,
        IPageFetcherApiClient pageFetcher,
        CvSearchEmailSender emailSender,
        ILogger<CvSearchJobTask> logger)
    {
        _scopeFactory = scopeFactory;
        _settings = settings.Value;
        _searcher = searcher;
        _matcherApi = matcherApi;
        _pageFetcher = pageFetcher;
        _emailSender = emailSender;
        _logger = logger;
    }

    /// <summary>
    /// Called by the scheduler on each tick. Resets orphaned sessions, picks the oldest pending session,
    /// runs the full search pipeline, and sends the results email.
    /// Does nothing when JobSearch:Enabled is false.
    /// </summary>
    /// <param name="parametersSection">Scheduler-supplied parameters; not read by this task.</param>
    /// <param name="cancellationToken">Token signalled when the run should stop.</param>
    /// <exception cref="OperationCanceledException">
    /// Rethrown when <paramref name="cancellationToken"/> is cancelled mid-run. The session status is left
    /// untouched in that case so the stuck-session recovery can re-queue it later.
    /// </exception>
    public async Task ExecuteAsync(IConfiguration parametersSection, CancellationToken cancellationToken)
    {
        if (!_settings.Enabled)
            return;

        using var scope = _scopeFactory.CreateScope();
        var db = scope.ServiceProvider.GetRequiredService<CvSearchDbContext>();

        await ResetStuckSessionsAsync(db, cancellationToken);

        var pending = await db.JobSearchSessions
            .Where(s => s.Status == JobSearchStatus.Pending)
            .OrderBy(s => s.CreatedAt)
            .FirstOrDefaultAsync(cancellationToken);

        if (pending is null)
            return;

        _logger.LogInformation("Processing job search session {SessionId}", pending.Id);
        pending.Status = JobSearchStatus.Processing;
        await db.SaveChangesAsync(cancellationToken);

        try
        {
            var cvKeywords = pending.Keywords
                .Split(',', StringSplitOptions.RemoveEmptyEntries)
                .Select(k => k.Trim())
                .Where(k => k.Length > 0)
                .ToList();

            var providers = GetProviders(pending.ProviderConfigJson);
            var providerNames = providers.Select(p => p.Name).ToList();

            _logger.LogInformation(
                "Session {SessionId}: keywords=[{Keywords}] | providers=[{Providers}]",
                pending.Id,
                cvKeywords.Count > 0 ? string.Join(", ", cvKeywords) : "(none)",
                providerNames.Count > 0 ? string.Join(", ", providerNames) : "(none)");

            var results = await RunSearchAsync(pending, cvKeywords, providers, db, cancellationToken);

            pending.Status = JobSearchStatus.Done;
            await db.SaveChangesAsync(cancellationToken);

            var attachmentFileName = BuildCvFileName(pending.CvDocumentId);
            await _emailSender.SendResultsAsync(
                pending.Email,
                attachmentFileName,
                results,
                cvKeywords,
                providerNames,
                pending.Language,
                pending.Location,
                cancellationToken);

            _logger.LogInformation("Session {SessionId} done. {Count} results sent.", pending.Id, results.Count);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Cancellation is not a session failure: do not mark the session Failed and do not try to
            // save with an already-cancelled token. The persisted status stays as it was.
            _logger.LogWarning("Session {SessionId} interrupted by cancellation.", pending.Id);
            throw;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Session {SessionId} failed.", pending.Id);
            pending.Status = JobSearchStatus.Failed;

            // Record the failure even if cancellation was requested in the meantime.
            await db.SaveChangesAsync(CancellationToken.None);
        }
    }

    /// <summary>
    /// Moves Processing sessions older than <see cref="StuckSessionTimeout"/> back to Pending
    /// (e.g. the container crashed mid-run) and logs each reset.
    /// </summary>
    private async Task ResetStuckSessionsAsync(CvSearchDbContext db, CancellationToken ct)
    {
        // NOTE(review): age is measured from CreatedAt, not from when processing started, so a session
        // that waited in the queue longer than the timeout is considered stuck as soon as it is picked up.
        // Harmless while ticks never overlap; confirm, or switch to a "processing started" timestamp.
        var stuckCutoff = DateTime.UtcNow - StuckSessionTimeout;

        var stuckSessions = await db.JobSearchSessions
            .Where(s => s.Status == JobSearchStatus.Processing && s.CreatedAt < stuckCutoff)
            .ToListAsync(ct);

        if (stuckSessions.Count == 0)
            return;

        foreach (var stuck in stuckSessions)
        {
            stuck.Status = JobSearchStatus.Pending;
            _logger.LogWarning("Reset stuck session {SessionId} back to Pending", stuck.Id);
        }

        await db.SaveChangesAsync(ct);
    }

    /// <summary>
    /// Runs the full search pipeline for a session: scrapes all providers, deduplicates URLs,
    /// fetches each individual job page via page-fetcher-api, applies a keyword pre-filter,
    /// scores passing candidates via the matcher API, and persists results that meet the minimum score threshold.
    /// </summary>
    /// <returns>The persisted results, sorted by score in descending order.</returns>
    /// <exception cref="OperationCanceledException">Thrown when <paramref name="ct"/> is cancelled.</exception>
    private async Task<List<JobSearchResultEntity>> RunSearchAsync(
        JobSearchSessionEntity session,
        List<string> cvKeywords,
        List<JobProviderSettings> providers,
        CvSearchDbContext db,
        CancellationToken ct)
    {
        if (cvKeywords.Count == 0)
            _logger.LogWarning("Session {SessionId}: keyword list is empty — scraper will rely on provider InitialKeywords only", session.Id);

        // url → title; the first provider to report a URL wins.
        var jobCandidates = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

        foreach (var provider in providers)
        {
            var candidates = await _searcher.SearchJobUrlsAsync(provider, cvKeywords, session.Location, ct);
            _logger.LogInformation("Session {SessionId}: provider {Provider} returned {Count} candidates",
                session.Id, provider.Name, candidates.Count);

            foreach (var c in candidates)
            {
                // A blank URL can be neither fetched nor used as a dictionary key.
                if (string.IsNullOrWhiteSpace(c.Url))
                    continue;

                jobCandidates.TryAdd(c.Url, c.Title);
            }
        }

        var deduped = jobCandidates.Take(_settings.MaxJobsToMatch).ToList();
        _logger.LogInformation(
            "Session {SessionId}: {Total} unique URLs across all providers, processing up to {Cap}",
            session.Id, jobCandidates.Count, deduped.Count);

        var results = new List<JobSearchResultEntity>();

        foreach (var (url, title) in deduped)
        {
            ct.ThrowIfCancellationRequested();

            JobSearchResultEntity? entity = null;
            try
            {
                entity = await ScoreCandidateAsync(session, url, title, cvKeywords, providers, ct);
                if (entity is null)
                    continue;

                db.JobSearchResults.Add(entity);
                await db.SaveChangesAsync(ct);
                results.Add(entity);
            }
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                // Stop the whole run instead of logging one "match failed" warning per remaining URL.
                throw;
            }
            catch (Exception ex)
            {
                // A result whose insert failed would otherwise stay tracked as Added and make every
                // later SaveChanges on this context (including the final status update) fail again.
                if (entity is not null)
                    db.Entry(entity).State = EntityState.Detached;

                _logger.LogWarning(ex, "Session {SessionId}: match failed for {Url}", session.Id, url);
            }
        }

        results.Sort((a, b) => b.Score.CompareTo(a.Score));
        return results;
    }

    /// <summary>
    /// Fetches one job page, applies the keyword pre-filter, and scores it against the CV via the matcher API.
    /// </summary>
    /// <returns>
    /// A new, not yet persisted result entity when the score meets the minimum threshold; otherwise
    /// <c>null</c> (fetch failed, pre-filter skipped the page, or the score was too low).
    /// </returns>
    private async Task<JobSearchResultEntity?> ScoreCandidateAsync(
        JobSearchSessionEntity session,
        string url,
        string title,
        List<string> cvKeywords,
        List<JobProviderSettings> providers,
        CancellationToken ct)
    {
        // Fetch individual job page text via page-fetcher-api
        var fetchResponse = await _pageFetcher.FetchAsync(new FetchPageRequest
        {
            Url = url,
            WaitFor = "domcontentloaded",
            CallerService = "cv-search-job",
            JobSearchSessionId = session.Id
        }, ct);

        if (!fetchResponse.Success || string.IsNullOrWhiteSpace(fetchResponse.Text))
        {
            _logger.LogWarning("Session {SessionId}: fetch failed for {Url} — {Error}",
                session.Id, url, fetchResponse.Error);
            return null;
        }

        var jobText = fetchResponse.Text;

        // Keyword pre-filter: skip LLM call if no CV keyword appears in the job page text
        if (cvKeywords.Count > 0
            && !cvKeywords.Any(k => jobText.Contains(k, StringComparison.OrdinalIgnoreCase)))
        {
            _logger.LogInformation(
                "Session {SessionId}: pre-filter skip | {Url} | no CV keyword found in job text",
                session.Id, url);
            return null;
        }

        var matchRequest = new MatchJobRequest
        {
            CvDocumentId = session.CvDocumentId,
            JobUrl = url,
            // Pre-fetched text passed directly so cv-matcher-api skips re-fetching the page
            JobDescription = jobText,
            // User already gave GDPR consent when they clicked the one-time job search link
            GdprConsent = true,
            // Propagate language so the LLM uses the correct language-specific prompt
            Language = session.Language
        };

        var matchResult = await _matcherApi.MatchJobAsync(matchRequest, ct);
        var accepted = matchResult.Score >= _settings.MinMatchScore;

        _logger.LogInformation(
            "Session {SessionId}: {Url} → score={Score}% (threshold={Threshold}%) {Verdict}",
            session.Id, url, matchResult.Score, _settings.MinMatchScore,
            accepted ? "ACCEPTED" : "rejected");

        if (!accepted)
            return null;

        return new JobSearchResultEntity
        {
            Id = Guid.NewGuid().ToString("N"),
            SessionId = session.Id,
            ProviderName = GuessProvider(url, providers),
            JobUrl = url,
            JobTitle = ResolveJobTitle(matchResult.Summary, title),
            JobText = jobText,
            Score = matchResult.Score,
            ResultJson = JsonSerializer.Serialize(matchResult, WebJsonOptions),
            Email = session.Email,
            ClientIpAddress = session.ClientIpAddress,
            CreatedAt = DateTime.UtcNow
        };
    }

    /// <summary>
    /// Picks the job title: the first sentence of the matcher summary (text before the first '.'),
    /// or <paramref name="fallbackTitle"/> when the summary is missing or its first sentence is blank.
    /// </summary>
    private static string ResolveJobTitle(string? summary, string fallbackTitle)
    {
        // Split never returns an empty array, so the fallback has to be decided on the content.
        var firstSentence = summary?.Split('.', 2)[0].Trim();
        return string.IsNullOrEmpty(firstSentence) ? fallbackTitle : firstSentence;
    }

    /// <summary>
    /// Deserialises the provider configuration snapshot stored on the session.
    /// Providers are always snapshotted from the DB at session-creation time, so the snapshot
    /// should always be present. Returns an empty list (with a warning) when it is missing or corrupt.
    /// </summary>
    // NOTE(review): the provider element type is assumed to be JobProviderSettings (exposing Name and
    // JobLinkContains) — confirm the exact type name against CvMatcher.Models.Settings.
    private List<JobProviderSettings> GetProviders(string? providerConfigJson)
    {
        if (string.IsNullOrWhiteSpace(providerConfigJson))
        {
            _logger.LogWarning("Session has no provider config snapshot — returning empty provider list");
            return [];
        }

        try
        {
            return JsonSerializer.Deserialize<List<JobProviderSettings>>(providerConfigJson, WebJsonOptions) ?? [];
        }
        catch (Exception ex)
        {
            // Deliberately best-effort: a corrupt snapshot yields a run with no providers.
            _logger.LogWarning(ex, "Failed to deserialise provider config snapshot — returning empty provider list");
            return [];
        }
    }

    /// <summary>
    /// Infers the provider name from the job URL by matching against each provider's JobLinkContains pattern.
    /// Falls back to the URL hostname when no provider matches, or "unknown" when the URL is not absolute.
    /// </summary>
    private static string GuessProvider(string url, List<JobProviderSettings> providers)
    {
        foreach (var p in providers)
        {
            if (!string.IsNullOrWhiteSpace(p.JobLinkContains)
                && url.Contains(p.JobLinkContains, StringComparison.OrdinalIgnoreCase))
            {
                return p.Name;
            }
        }

        return Uri.TryCreate(url, UriKind.Absolute, out var uri) ? uri.Host : "unknown";
    }

    /// <summary>
    /// Constructs the CV PDF filename from the document ID. Falls back to "cv.pdf" when the ID
    /// contains no letters or digits.
    /// </summary>
    private static string BuildCvFileName(string cvDocumentId)
    {
        // Strip non-alphanumeric characters so the filename is safe for all OS/email clients.
        var safeId = string.Concat((cvDocumentId ?? string.Empty).Where(char.IsLetterOrDigit));
        if (safeId.Length == 0)
            safeId = "cv";

        return $"{safeId}.pdf";
    }
}