@@ -0,0 +1,195 @@
|
||||
using Api.Data;
|
||||
using Api.Data.Entities;
|
||||
using Api.Services.Contracts;
|
||||
using Api.Services.Contracts.Models;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
|
||||
namespace Api.Services;
|
||||
|
||||
/// <summary>
/// <see cref="IRagRepository"/> implementation that persists RAG documents, their chunks,
/// and the embedding / chat-completion caches through Entity Framework Core.
/// </summary>
public sealed class EfRagRepository : IRagRepository
{
    private readonly RagDbContext _db;
    private readonly ILogger<EfRagRepository> _logger;

    /// <summary>Creates a repository that works against the supplied context.</summary>
    /// <param name="db">EF Core context used for every query and save.</param>
    /// <param name="logger">Logger for informational messages.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public EfRagRepository(RagDbContext db, ILogger<EfRagRepository> logger)
    {
        ArgumentNullException.ThrowIfNull(db);
        ArgumentNullException.ThrowIfNull(logger);

        _db = db;
        _logger = logger;
    }

    /// <summary>Makes sure the database and its schema exist.</summary>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task InitializeAsync(CancellationToken ct)
    {
        _logger.LogInformation("Ensuring RAG database schema exists using EF Core");

        // NOTE(review): EnsureCreated builds the schema straight from the model and does not
        // use migrations - confirm this is the intended strategy before adding migrations.
        await _db.Database.EnsureCreatedAsync(ct);
    }

    /// <summary>
    /// Finds the most recently created document with the given text hash, optionally
    /// restricted to a source URL.
    /// </summary>
    /// <param name="textHash">Hash to match against the stored document hash.</param>
    /// <param name="sourceUrl">When not null/blank, only documents with this exact URL are considered.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The matching document, or <c>null</c> when none exists.</returns>
    public async Task<RagDocumentRecord?> GetDocumentByTextHashAsync(string textHash, string? sourceUrl, CancellationToken ct)
    {
        var query = _db.RagDocuments
            .AsNoTracking()
            .Where(x => x.TextHash == textHash);

        if (!string.IsNullOrWhiteSpace(sourceUrl))
        {
            query = query.Where(x => x.SourceUrl == sourceUrl);
        }

        var entity = await query
            .OrderByDescending(x => x.CreatedAt)
            .FirstOrDefaultAsync(ct);

        return entity is null ? null : ToRecord(entity);
    }

    /// <summary>Loads a document by its identifier.</summary>
    /// <param name="id">Document identifier.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The document, or <c>null</c> when no document has that id.</returns>
    public async Task<RagDocumentRecord?> GetDocumentByIdAsync(string id, CancellationToken ct)
    {
        var entity = await _db.RagDocuments
            .AsNoTracking()
            .FirstOrDefaultAsync(x => x.Id == id, ct);

        return entity is null ? null : ToRecord(entity);
    }

    /// <summary>
    /// Stores a document together with its chunks. Does nothing (apart from logging) when a
    /// document with the same id is already stored.
    /// </summary>
    /// <param name="document">Document to store.</param>
    /// <param name="chunks">Chunks belonging to <paramref name="document"/>.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="document"/> or <paramref name="chunks"/> is <c>null</c>.</exception>
    public async Task SaveDocumentAsync(RagDocumentRecord document, IReadOnlyList<RagChunkRecord> chunks, CancellationToken ct)
    {
        ArgumentNullException.ThrowIfNull(document);
        ArgumentNullException.ThrowIfNull(chunks);

        if (await _db.RagDocuments.AnyAsync(x => x.Id == document.Id, ct))
        {
            _logger.LogInformation("RAG document already exists. DocumentId={DocumentId}", document.Id);
            return;
        }

        // Chunks always belong to the document being saved, so the foreign key is taken from
        // the document itself rather than trusting each chunk record to carry the same id.
        var chunkEntities = chunks
            .Select(chunk => new RagChunkEntity
            {
                Id = chunk.Id,
                DocumentId = document.Id,
                ChunkIndex = chunk.ChunkIndex,
                Text = chunk.Text,
                Embedding = VectorSerializer.ToBytes(chunk.Embedding)
            })
            .ToList();

        var entity = new RagDocumentEntity
        {
            Id = document.Id,
            DocumentType = document.DocumentType,
            Title = document.Title,
            SourceUrl = document.SourceUrl,
            RawText = document.Text,
            TextHash = document.TextHash,
            TypeConfidence = document.TypeConfidence,
            MetadataJson = document.MetadataJson,
            CreatedAt = document.CreatedAt.UtcDateTime,
            Chunks = chunkEntities
        };

        _db.RagDocuments.Add(entity);

        try
        {
            await _db.SaveChangesAsync(ct);
        }
        catch (DbUpdateException)
        {
            // Stop tracking the failed graph so later saves on this context do not retry it.
            foreach (var chunkEntity in chunkEntities)
            {
                _db.Entry(chunkEntity).State = EntityState.Detached;
            }

            _db.Entry(entity).State = EntityState.Detached;

            // The existence check above and the insert are not atomic: another writer may have
            // stored the same document in between. That outcome is equivalent to the
            // "already exists" path; anything else is a genuine failure.
            if (!await _db.RagDocuments.AnyAsync(x => x.Id == document.Id, ct))
            {
                throw;
            }

            _logger.LogInformation("RAG document already exists. DocumentId={DocumentId}", document.Id);
        }
    }

    /// <summary>
    /// Scores stored chunks against a query embedding and returns the best candidates.
    /// </summary>
    /// <param name="queryEmbedding">Embedding every chunk is compared with.</param>
    /// <param name="targetTypes">
    /// Optional document types to restrict the search to; blank entries are ignored and the
    /// comparison is done on trimmed, lower-cased values.
    /// </param>
    /// <param name="topK">Requested result count; up to four times this many candidates are returned.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>Candidates ordered by descending score; empty when <paramref name="topK"/> is not positive.</returns>
    public async Task<IReadOnlyList<SearchCandidateChunk>> SearchChunksAsync(
        float[] queryEmbedding,
        IReadOnlyList<string>? targetTypes,
        int topK,
        CancellationToken ct)
    {
        // Nothing can be returned for a non-positive topK, so skip loading the table.
        if (topK <= 0)
        {
            return [];
        }

        var types = targetTypes?
            .Where(x => !string.IsNullOrWhiteSpace(x))
            .Select(x => x.Trim().ToLowerInvariant())
            .Distinct()
            .ToArray() ?? [];

        var query = _db.RagChunks
            .AsNoTracking()
            .Include(x => x.Document)
            .AsQueryable();

        if (types.Length > 0)
        {
            query = query.Where(x => x.Document != null && types.Contains(x.Document.DocumentType.ToLower()));
        }

        // Scoring happens in memory, so every candidate row is materialised first.
        var rows = await query.ToListAsync(ct);

        var candidates = new List<SearchCandidateChunk>(rows.Count);
        foreach (var row in rows)
        {
            if (row.Document is null)
            {
                continue;
            }

            // Deserialise once and reuse the vector for both the record and the score.
            var embedding = VectorSerializer.FromBytes(row.Embedding);

            candidates.Add(new SearchCandidateChunk
            {
                Document = ToRecord(row.Document),
                Chunk = new RagChunkRecord
                {
                    Id = row.Id,
                    DocumentId = row.DocumentId,
                    ChunkIndex = row.ChunkIndex,
                    Text = row.Text,
                    Embedding = embedding
                },
                Score = VectorSerializer.CosineSimilarity(queryEmbedding, embedding)
            });
        }

        // Over-fetch by a factor of four; computed in 64-bit so a huge topK cannot overflow.
        var limit = (int)Math.Min((long)topK * 4, int.MaxValue);

        return candidates
            .OrderByDescending(x => x.Score)
            .Take(limit)
            .ToList();
    }

    /// <summary>Looks up a cached embedding vector.</summary>
    /// <param name="cacheKey">Key the vector was cached under.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The cached vector, or <c>null</c> when the key is not cached.</returns>
    public async Task<float[]?> GetEmbeddingAsync(string cacheKey, CancellationToken ct)
    {
        // Only the vector column is needed, so project it instead of loading the whole row.
        var bytes = await _db.RagEmbeddingCache
            .AsNoTracking()
            .Where(x => x.CacheKey == cacheKey)
            .Select(x => x.Vector)
            .FirstOrDefaultAsync(ct);

        return bytes is null ? null : VectorSerializer.FromBytes(bytes);
    }

    /// <summary>Caches an embedding vector unless the key is already cached.</summary>
    /// <param name="cacheKey">Key to cache the vector under.</param>
    /// <param name="model">Model name stored with the entry.</param>
    /// <param name="textHash">Text hash stored with the entry.</param>
    /// <param name="vector">Vector to cache.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task SaveEmbeddingAsync(string cacheKey, string model, string textHash, float[] vector, CancellationToken ct)
    {
        if (await _db.RagEmbeddingCache.AnyAsync(x => x.CacheKey == cacheKey, ct))
        {
            return;
        }

        var entry = new RagEmbeddingCacheEntity
        {
            CacheKey = cacheKey,
            Model = model,
            TextHash = textHash,
            Vector = VectorSerializer.ToBytes(vector),
            CreatedAt = DateTime.UtcNow
        };

        _db.RagEmbeddingCache.Add(entry);

        try
        {
            await _db.SaveChangesAsync(ct);
        }
        catch (DbUpdateException)
        {
            // Stop tracking the failed entry so later saves on this context do not retry it.
            _db.Entry(entry).State = EntityState.Detached;

            // A concurrent writer caching the same key is not an error for a cache.
            if (!await _db.RagEmbeddingCache.AnyAsync(x => x.CacheKey == cacheKey, ct))
            {
                throw;
            }
        }
    }

    /// <summary>Looks up a cached chat completion.</summary>
    /// <param name="cacheKey">Key the response was cached under.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The cached response text, or <c>null</c> when the key is not cached.</returns>
    public async Task<string?> GetChatCompletionAsync(string cacheKey, CancellationToken ct)
    {
        return await _db.RagChatCompletionCache
            .AsNoTracking()
            .Where(x => x.CacheKey == cacheKey)
            .Select(x => x.ResponseText)
            .FirstOrDefaultAsync(ct);
    }

    /// <summary>Caches a chat completion unless the key is already cached.</summary>
    /// <param name="cacheKey">Key to cache the response under.</param>
    /// <param name="model">Model name stored with the entry.</param>
    /// <param name="temperature">Sampling temperature stored with the entry.</param>
    /// <param name="responseText">Response text to cache.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task SaveChatCompletionAsync(string cacheKey, string model, decimal temperature, string responseText, CancellationToken ct)
    {
        if (await _db.RagChatCompletionCache.AnyAsync(x => x.CacheKey == cacheKey, ct))
        {
            return;
        }

        var entry = new RagChatCompletionCacheEntity
        {
            CacheKey = cacheKey,
            Model = model,
            Temperature = temperature,
            ResponseText = responseText,
            CreatedAt = DateTime.UtcNow
        };

        _db.RagChatCompletionCache.Add(entry);

        try
        {
            await _db.SaveChangesAsync(ct);
        }
        catch (DbUpdateException)
        {
            // Stop tracking the failed entry so later saves on this context do not retry it.
            _db.Entry(entry).State = EntityState.Detached;

            // A concurrent writer caching the same key is not an error for a cache.
            if (!await _db.RagChatCompletionCache.AnyAsync(x => x.CacheKey == cacheKey, ct))
            {
                throw;
            }
        }
    }

    /// <summary>Maps a stored document entity to the record type exposed by the repository.</summary>
    private static RagDocumentRecord ToRecord(RagDocumentEntity entity) => new()
    {
        Id = entity.Id,
        DocumentType = entity.DocumentType,
        Title = entity.Title,
        SourceUrl = entity.SourceUrl,
        Text = entity.RawText,
        TextHash = entity.TextHash,
        TypeConfidence = entity.TypeConfidence,
        MetadataJson = entity.MetadataJson,
        // The timestamp is written from UtcDateTime, so it is re-labelled as UTC on the way out.
        CreatedAt = new DateTimeOffset(DateTime.SpecifyKind(entity.CreatedAt, DateTimeKind.Utc))
    };
}
|
||||
@@ -1,238 +0,0 @@
|
||||
using Microsoft.Data.SqlClient;
|
||||
using Api.Services.Contracts;
|
||||
using Api.Services.Contracts.Models;
|
||||
|
||||
namespace Api.Services;
|
||||
|
||||
/// <summary>
/// <see cref="IRagRepository"/> implementation that talks to SQL Server directly through
/// <see cref="SqlConnection"/>, using the connection string named <c>RagDb</c>.
/// </summary>
public sealed class SqlRagRepository : IRagRepository
{
    private readonly string _connectionString;

    /// <summary>Reads the <c>RagDb</c> connection string from configuration.</summary>
    /// <param name="configuration">Application configuration.</param>
    /// <exception cref="InvalidOperationException">The <c>RagDb</c> connection string is not configured.</exception>
    public SqlRagRepository(IConfiguration configuration)
    {
        _connectionString = configuration.GetConnectionString("RagDb")
            ?? throw new InvalidOperationException("Connection string 'RagDb' is missing.");
    }

    /// <summary>
    /// Creates the database when it is missing and then runs <c>Database/schema.sql</c>
    /// (resolved relative to the application base directory) batch by batch.
    /// </summary>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task InitializeAsync(CancellationToken ct)
    {
        await EnsureDatabaseExistsAsync(ct);

        var script = await File.ReadAllTextAsync(Path.Combine(AppContext.BaseDirectory, "Database", "schema.sql"), ct);

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        foreach (var commandText in SplitBatches(script))
        {
            await using var command = new SqlCommand(commandText, connection);
            await command.ExecuteNonQueryAsync(ct);
        }
    }

    /// <summary>
    /// Finds the most recently created document with the given text hash, optionally
    /// restricted to a source URL.
    /// </summary>
    /// <param name="textHash">Hash to match against the stored document hash.</param>
    /// <param name="sourceUrl">When not <c>null</c>, only documents with this exact URL are considered.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The matching document, or <c>null</c> when none exists.</returns>
    public async Task<RagDocumentRecord?> GetDocumentByTextHashAsync(string textHash, string? sourceUrl, CancellationToken ct)
    {
        const string sql = """
            SELECT TOP 1 Id, DocumentType, Title, SourceUrl, RawText, TextHash, TypeConfidence, MetadataJson, CreatedAt
            FROM RagDocuments
            WHERE TextHash = @TextHash AND (@SourceUrl IS NULL OR SourceUrl = @SourceUrl)
            ORDER BY CreatedAt DESC
            """;

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        command.Parameters.AddWithValue("@TextHash", textHash);
        command.Parameters.AddWithValue("@SourceUrl", (object?)sourceUrl ?? DBNull.Value);

        await using var reader = await command.ExecuteReaderAsync(ct);
        return await reader.ReadAsync(ct) ? ReadDocument(reader) : null;
    }

    /// <summary>Loads a document by its identifier.</summary>
    /// <param name="id">Document identifier.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The document, or <c>null</c> when no document has that id.</returns>
    public async Task<RagDocumentRecord?> GetDocumentByIdAsync(string id, CancellationToken ct)
    {
        const string sql = """
            SELECT Id, DocumentType, Title, SourceUrl, RawText, TextHash, TypeConfidence, MetadataJson, CreatedAt
            FROM RagDocuments
            WHERE Id = @Id
            """;

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        command.Parameters.AddWithValue("@Id", id);

        await using var reader = await command.ExecuteReaderAsync(ct);
        return await reader.ReadAsync(ct) ? ReadDocument(reader) : null;
    }

    /// <summary>Inserts a document and all of its chunks inside a single transaction.</summary>
    /// <param name="document">Document to insert.</param>
    /// <param name="chunks">Chunks to insert; each is stored under <paramref name="document"/>'s id.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task SaveDocumentAsync(RagDocumentRecord document, IReadOnlyList<RagChunkRecord> chunks, CancellationToken ct)
    {
        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);
        await using var tx = (SqlTransaction)await connection.BeginTransactionAsync(ct);

        try
        {
            const string insertDoc = """
                INSERT INTO RagDocuments (Id, DocumentType, Title, SourceUrl, RawText, TextHash, TypeConfidence, MetadataJson, CreatedAt)
                VALUES (@Id, @DocumentType, @Title, @SourceUrl, @RawText, @TextHash, @TypeConfidence, @MetadataJson, @CreatedAt)
                """;

            await using (var command = new SqlCommand(insertDoc, connection, tx))
            {
                command.Parameters.AddWithValue("@Id", document.Id);
                command.Parameters.AddWithValue("@DocumentType", document.DocumentType);
                command.Parameters.AddWithValue("@Title", document.Title);
                command.Parameters.AddWithValue("@SourceUrl", (object?)document.SourceUrl ?? DBNull.Value);
                command.Parameters.AddWithValue("@RawText", document.Text);
                command.Parameters.AddWithValue("@TextHash", document.TextHash);
                command.Parameters.AddWithValue("@TypeConfidence", document.TypeConfidence);
                command.Parameters.AddWithValue("@MetadataJson", document.MetadataJson);
                command.Parameters.AddWithValue("@CreatedAt", document.CreatedAt.UtcDateTime);
                await command.ExecuteNonQueryAsync(ct);
            }

            const string insertChunk = """
                INSERT INTO RagChunks (Id, DocumentId, ChunkIndex, Text, Embedding)
                VALUES (@Id, @DocumentId, @ChunkIndex, @Text, @Embedding)
                """;

            foreach (var chunk in chunks)
            {
                await using var command = new SqlCommand(insertChunk, connection, tx);
                command.Parameters.AddWithValue("@Id", chunk.Id);
                command.Parameters.AddWithValue("@DocumentId", document.Id);
                command.Parameters.AddWithValue("@ChunkIndex", chunk.ChunkIndex);
                command.Parameters.AddWithValue("@Text", chunk.Text);
                command.Parameters.AddWithValue("@Embedding", VectorSerializer.ToBytes(chunk.Embedding));
                await command.ExecuteNonQueryAsync(ct);
            }

            await tx.CommitAsync(ct);
        }
        catch
        {
            // Roll back without the caller's token: if the failure was a cancellation, passing
            // the cancelled token would abort the rollback and hide the original exception.
            await tx.RollbackAsync(CancellationToken.None);
            throw;
        }
    }

    /// <summary>
    /// Scores stored chunks against a query embedding and returns the best candidates.
    /// </summary>
    /// <param name="queryEmbedding">Embedding every chunk is compared with.</param>
    /// <param name="targetTypes">
    /// Optional document types to restrict the search to; blank entries are ignored and the
    /// comparison is done on trimmed, lower-cased values.
    /// </param>
    /// <param name="topK">Requested result count; up to four times this many candidates are returned.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>Candidates ordered by descending score.</returns>
    public async Task<IReadOnlyList<SearchCandidateChunk>> SearchChunksAsync(float[] queryEmbedding, IReadOnlyList<string>? targetTypes, int topK, CancellationToken ct)
    {
        var types = targetTypes?
            .Where(x => !string.IsNullOrWhiteSpace(x))
            .Select(x => x.Trim().ToLowerInvariant())
            .Distinct()
            .ToArray() ?? [];

        var sql = """
            SELECT d.Id, d.DocumentType, d.Title, d.SourceUrl, d.RawText, d.TextHash, d.TypeConfidence, d.MetadataJson, d.CreatedAt,
                   c.Id, c.DocumentId, c.ChunkIndex, c.Text, c.Embedding
            FROM RagChunks c
            INNER JOIN RagDocuments d ON d.Id = c.DocumentId
            """;

        if (types.Length > 0)
        {
            // Only generated parameter names are concatenated; the values stay parameterised.
            sql += " WHERE LOWER(d.DocumentType) IN (" + string.Join(',', types.Select((_, i) => $"@Type{i}")) + ")";
        }

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        for (var i = 0; i < types.Length; i++)
        {
            command.Parameters.AddWithValue($"@Type{i}", types[i]);
        }

        await using var reader = await command.ExecuteReaderAsync(ct);

        var candidates = new List<SearchCandidateChunk>();
        while (await reader.ReadAsync(ct))
        {
            // Columns 0-8 are the document, 9-13 the chunk (see the SELECT list above).
            var doc = ReadDocument(reader, 0);
            var chunk = new RagChunkRecord
            {
                Id = reader.GetString(9),
                DocumentId = reader.GetString(10),
                ChunkIndex = reader.GetInt32(11),
                Text = reader.GetString(12),
                Embedding = VectorSerializer.FromBytes((byte[])reader[13])
            };

            candidates.Add(new SearchCandidateChunk
            {
                Document = doc,
                Chunk = chunk,
                Score = VectorSerializer.CosineSimilarity(queryEmbedding, chunk.Embedding)
            });
        }

        // Over-fetch by a factor of four; computed in 64-bit so a huge topK cannot overflow.
        var limit = (int)Math.Min(Math.Max((long)topK * 4, topK), int.MaxValue);

        return candidates
            .OrderByDescending(x => x.Score)
            .Take(limit)
            .ToList();
    }

    /// <summary>Looks up a cached embedding vector.</summary>
    /// <param name="cacheKey">Key the vector was cached under.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The cached vector, or <c>null</c> when the query yields no binary value.</returns>
    public async Task<float[]?> GetEmbeddingAsync(string cacheKey, CancellationToken ct)
    {
        const string sql = "SELECT Vector FROM RagEmbeddingCache WHERE CacheKey = @CacheKey";

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        command.Parameters.AddWithValue("@CacheKey", cacheKey);

        var value = await command.ExecuteScalarAsync(ct);
        return value is byte[] bytes ? VectorSerializer.FromBytes(bytes) : null;
    }

    /// <summary>Caches an embedding vector unless the key is already cached.</summary>
    /// <param name="cacheKey">Key to cache the vector under.</param>
    /// <param name="model">Model name stored with the entry.</param>
    /// <param name="textHash">Text hash stored with the entry.</param>
    /// <param name="vector">Vector to cache.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task SaveEmbeddingAsync(string cacheKey, string model, string textHash, float[] vector, CancellationToken ct)
    {
        const string sql = """
            IF NOT EXISTS (SELECT 1 FROM RagEmbeddingCache WHERE CacheKey = @CacheKey)
            INSERT INTO RagEmbeddingCache (CacheKey, Model, TextHash, Vector, CreatedAt)
            VALUES (@CacheKey, @Model, @TextHash, @Vector, SYSUTCDATETIME())
            """;

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        command.Parameters.AddWithValue("@CacheKey", cacheKey);
        command.Parameters.AddWithValue("@Model", model);
        command.Parameters.AddWithValue("@TextHash", textHash);
        command.Parameters.AddWithValue("@Vector", VectorSerializer.ToBytes(vector));
        await command.ExecuteNonQueryAsync(ct);
    }

    /// <summary>Looks up a cached chat completion.</summary>
    /// <param name="cacheKey">Key the response was cached under.</param>
    /// <param name="ct">Token used to cancel the query.</param>
    /// <returns>The cached response text, or <c>null</c> when the query yields no string value.</returns>
    public async Task<string?> GetChatCompletionAsync(string cacheKey, CancellationToken ct)
    {
        const string sql = "SELECT ResponseText FROM RagChatCompletionCache WHERE CacheKey = @CacheKey";

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        command.Parameters.AddWithValue("@CacheKey", cacheKey);

        return await command.ExecuteScalarAsync(ct) as string;
    }

    /// <summary>Caches a chat completion unless the key is already cached.</summary>
    /// <param name="cacheKey">Key to cache the response under.</param>
    /// <param name="model">Model name stored with the entry.</param>
    /// <param name="temperature">Sampling temperature stored with the entry.</param>
    /// <param name="responseText">Response text to cache.</param>
    /// <param name="ct">Token used to cancel the operation.</param>
    public async Task SaveChatCompletionAsync(string cacheKey, string model, decimal temperature, string responseText, CancellationToken ct)
    {
        const string sql = """
            IF NOT EXISTS (SELECT 1 FROM RagChatCompletionCache WHERE CacheKey = @CacheKey)
            INSERT INTO RagChatCompletionCache (CacheKey, Model, Temperature, ResponseText, CreatedAt)
            VALUES (@CacheKey, @Model, @Temperature, @ResponseText, SYSUTCDATETIME())
            """;

        await using var connection = new SqlConnection(_connectionString);
        await connection.OpenAsync(ct);

        await using var command = new SqlCommand(sql, connection);
        command.Parameters.AddWithValue("@CacheKey", cacheKey);
        command.Parameters.AddWithValue("@Model", model);
        command.Parameters.AddWithValue("@Temperature", temperature);
        command.Parameters.AddWithValue("@ResponseText", responseText);
        await command.ExecuteNonQueryAsync(ct);
    }

    /// <summary>
    /// Builds a document record from nine consecutive reader columns starting at
    /// <paramref name="offset"/>, in the order Id, DocumentType, Title, SourceUrl, RawText,
    /// TextHash, TypeConfidence, MetadataJson, CreatedAt.
    /// </summary>
    private static RagDocumentRecord ReadDocument(SqlDataReader reader, int offset = 0) => new()
    {
        Id = reader.GetString(offset),
        DocumentType = reader.GetString(offset + 1),
        Title = reader.GetString(offset + 2),
        SourceUrl = reader.IsDBNull(offset + 3) ? null : reader.GetString(offset + 3),
        Text = reader.GetString(offset + 4),
        TextHash = reader.GetString(offset + 5),
        TypeConfidence = Convert.ToDouble(reader.GetValue(offset + 6)),
        MetadataJson = reader.GetString(offset + 7),
        // The column is written from UtcDateTime, so a zero offset is attached on read.
        CreatedAt = new DateTimeOffset(reader.GetDateTime(offset + 8), TimeSpan.Zero)
    };

    /// <summary>
    /// Creates the database named in the connection string when it does not exist, by
    /// connecting to <c>master</c>. Does nothing when the connection string names no database.
    /// </summary>
    private async Task EnsureDatabaseExistsAsync(CancellationToken ct)
    {
        var builder = new SqlConnectionStringBuilder(_connectionString);
        var databaseName = builder.InitialCatalog;
        if (string.IsNullOrWhiteSpace(databaseName))
        {
            return;
        }

        builder.InitialCatalog = "master";
        await using var connection = new SqlConnection(builder.ConnectionString);
        await connection.OpenAsync(ct);

        // CREATE DATABASE cannot take the name as a parameter, so it is embedded in dynamic
        // SQL. The name sits inside [brackets] inside a '...' literal, so both "]" and "'"
        // have to be doubled.
        var safeName = databaseName.Replace("]", "]]").Replace("'", "''");

        await using var command = new SqlCommand($"IF DB_ID(@DatabaseName) IS NULL EXEC('CREATE DATABASE [{safeName}]')", connection);
        command.Parameters.AddWithValue("@DatabaseName", databaseName);
        await command.ExecuteNonQueryAsync(ct);
    }

    /// <summary>
    /// Splits a script into batches at lines that consist solely of <c>GO</c> (any casing,
    /// surrounding whitespace ignored). Empty batches are dropped and each batch is trimmed.
    /// </summary>
    private static IReadOnlyList<string> SplitBatches(string script)
    {
        var batches = new List<string>();
        var current = new List<string>();

        foreach (var line in script.Split('\n'))
        {
            // Only a stand-alone GO line separates batches; "GO" inside identifiers or
            // keywords (CATEGORY, ALGORITHM, ...) must not cut a statement in half.
            if (line.Trim().Equals("GO", StringComparison.OrdinalIgnoreCase))
            {
                Flush();
                continue;
            }

            current.Add(line.TrimEnd('\r'));
        }

        Flush();
        return batches;

        void Flush()
        {
            var text = string.Join('\n', current).Trim();
            current.Clear();

            if (text.Length > 0)
            {
                batches.Add(text);
            }
        }
    }
}
|
||||
Reference in New Issue
Block a user