feat: reranking with fastembed added

This commit is contained in:
Per Stark
2025-10-27 13:05:10 +01:00
parent a0e9387c76
commit 72578296db
25 changed files with 1586 additions and 202 deletions

View File

@@ -8,19 +8,14 @@ use async_openai::{
};
use common::{
error::AppError,
storage::{
db::SurrealDbClient,
types::{
message::{format_history, Message},
system_settings::SystemSettings,
},
storage::types::{
message::{format_history, Message},
system_settings::SystemSettings,
},
};
use serde::Deserialize;
use serde_json::Value;
use crate::{retrieve_entities, retrieved_entities_to_json};
use super::answer_retrieval_helper::get_query_response_schema;
#[derive(Debug, Deserialize)]
@@ -36,53 +31,12 @@ pub struct LLMResponseFormat {
pub references: Vec<Reference>,
}
/// Orchestrates query processing and returns an answer with references
///
/// Takes a query and uses the provided clients to generate an answer with supporting references.
///
/// # Arguments
///
/// * `surreal_db_client` - Client for `SurrealDB` interactions
/// * `openai_client` - Client for `OpenAI` API calls
/// * `query` - The user's query string
/// * `user_id` - The user's id
///
/// # Returns
///
/// Returns a tuple of the answer and its references, or an API error
#[derive(Debug)]
pub struct Answer {
pub content: String,
pub references: Vec<String>,
}
pub async fn get_answer_with_references(
surreal_db_client: &SurrealDbClient,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
query: &str,
user_id: &str,
) -> Result<Answer, AppError> {
let entities = retrieve_entities(surreal_db_client, openai_client, query, user_id).await?;
let settings = SystemSettings::get_current(surreal_db_client).await?;
let entities_json = retrieved_entities_to_json(&entities);
let user_message = create_user_message(&entities_json, query);
let request = create_chat_request(user_message, &settings)?;
let response = openai_client.chat().create(request).await?;
let llm_response = process_llm_response(response).await?;
Ok(Answer {
content: llm_response.answer,
references: llm_response
.references
.into_iter()
.map(|r| r.reference)
.collect(),
})
}
pub fn create_user_message(entities_json: &Value, query: &str) -> String {
format!(
r"

View File

@@ -3,6 +3,7 @@ pub mod answer_retrieval_helper;
pub mod fts;
pub mod graph;
pub mod pipeline;
pub mod reranking;
pub mod scoring;
pub mod vector;
@@ -13,6 +14,7 @@ use common::{
types::{knowledge_entity::KnowledgeEntity, text_chunk::TextChunk},
},
};
use reranking::RerankerLease;
use tracing::instrument;
pub use pipeline::{retrieved_entities_to_json, RetrievalConfig, RetrievalTuning};
@@ -39,6 +41,7 @@ pub async fn retrieve_entities(
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
input_text: &str,
user_id: &str,
reranker: Option<RerankerLease>,
) -> Result<Vec<RetrievedEntity>, AppError> {
pipeline::run_pipeline(
db_client,
@@ -46,6 +49,7 @@ pub async fn retrieve_entities(
input_text,
user_id,
RetrievalConfig::default(),
reranker,
)
.await
}
@@ -142,6 +146,7 @@ mod tests {
"Rust concurrency async tasks",
user_id,
RetrievalConfig::default(),
None,
)
.await
.expect("Hybrid retrieval failed");
@@ -232,6 +237,7 @@ mod tests {
"Rust concurrency async tasks",
user_id,
RetrievalConfig::default(),
None,
)
.await
.expect("Hybrid retrieval failed");

View File

@@ -17,6 +17,9 @@ pub struct RetrievalTuning {
pub graph_score_decay: f32,
pub graph_seed_min_score: f32,
pub graph_vector_inheritance: f32,
pub rerank_blend_weight: f32,
pub rerank_scores_only: bool,
pub rerank_keep_top: usize,
}
impl Default for RetrievalTuning {
@@ -36,6 +39,9 @@ impl Default for RetrievalTuning {
graph_score_decay: 0.75,
graph_seed_min_score: 0.4,
graph_vector_inheritance: 0.6,
rerank_blend_weight: 0.65,
rerank_scores_only: false,
rerank_keep_top: 8,
}
}
}

View File

@@ -4,7 +4,7 @@ mod state;
pub use config::{RetrievalConfig, RetrievalTuning};
use crate::RetrievedEntity;
use crate::{reranking::RerankerLease, RetrievedEntity};
use async_openai::Client;
use common::{error::AppError, storage::db::SurrealDbClient};
use tracing::info;
@@ -16,6 +16,7 @@ pub async fn run_pipeline(
input_text: &str,
user_id: &str,
config: RetrievalConfig,
reranker: Option<RerankerLease>,
) -> Result<Vec<RetrievedEntity>, AppError> {
let machine = state::ready();
let input_chars = input_text.chars().count();
@@ -35,11 +36,13 @@ pub async fn run_pipeline(
input_text.to_owned(),
user_id.to_owned(),
config,
reranker,
);
let machine = stages::embed(machine, &mut ctx).await?;
let machine = stages::collect_candidates(machine, &mut ctx).await?;
let machine = stages::expand_graph(machine, &mut ctx).await?;
let machine = stages::attach_chunks(machine, &mut ctx).await?;
let machine = stages::rerank(machine, &mut ctx).await?;
let results = stages::assemble(machine, &mut ctx)?;
Ok(results)
@@ -53,6 +56,7 @@ pub async fn run_pipeline_with_embedding(
input_text: &str,
user_id: &str,
config: RetrievalConfig,
reranker: Option<RerankerLease>,
) -> Result<Vec<RetrievedEntity>, AppError> {
let machine = state::ready();
let mut ctx = stages::PipelineContext::with_embedding(
@@ -62,11 +66,13 @@ pub async fn run_pipeline_with_embedding(
input_text.to_owned(),
user_id.to_owned(),
config,
reranker,
);
let machine = stages::embed(machine, &mut ctx).await?;
let machine = stages::collect_candidates(machine, &mut ctx).await?;
let machine = stages::expand_graph(machine, &mut ctx).await?;
let machine = stages::attach_chunks(machine, &mut ctx).await?;
let machine = stages::rerank(machine, &mut ctx).await?;
let results = stages::assemble(machine, &mut ctx)?;
Ok(results)

View File

@@ -7,6 +7,7 @@ use common::{
},
utils::embedding::generate_embedding,
};
use fastembed::RerankResult;
use futures::{stream::FuturesUnordered, StreamExt};
use state_machines::core::GuardError;
use std::collections::{HashMap, HashSet};
@@ -15,6 +16,7 @@ use tracing::{debug, instrument, warn};
use crate::{
fts::find_items_by_fts,
graph::{find_entities_by_relationship_by_id, find_entities_by_source_ids},
reranking::RerankerLease,
scoring::{
clamp_unit, fuse_scores, merge_scored_by_id, min_max_normalize, sort_by_fused_desc,
FusionWeights, Scored,
@@ -27,6 +29,7 @@ use super::{
config::RetrievalConfig,
state::{
CandidatesLoaded, ChunksAttached, Embedded, GraphExpanded, HybridRetrievalMachine, Ready,
Reranked,
},
};
@@ -41,6 +44,7 @@ pub struct PipelineContext<'a> {
pub chunk_candidates: HashMap<String, Scored<TextChunk>>,
pub filtered_entities: Vec<Scored<KnowledgeEntity>>,
pub chunk_values: Vec<Scored<TextChunk>>,
pub reranker: Option<RerankerLease>,
}
impl<'a> PipelineContext<'a> {
@@ -50,6 +54,7 @@ impl<'a> PipelineContext<'a> {
input_text: String,
user_id: String,
config: RetrievalConfig,
reranker: Option<RerankerLease>,
) -> Self {
Self {
db_client,
@@ -62,6 +67,7 @@ impl<'a> PipelineContext<'a> {
chunk_candidates: HashMap::new(),
filtered_entities: Vec::new(),
chunk_values: Vec::new(),
reranker,
}
}
@@ -73,8 +79,16 @@ impl<'a> PipelineContext<'a> {
input_text: String,
user_id: String,
config: RetrievalConfig,
reranker: Option<RerankerLease>,
) -> Self {
let mut ctx = Self::new(db_client, openai_client, input_text, user_id, config);
let mut ctx = Self::new(
db_client,
openai_client,
input_text,
user_id,
config,
reranker,
);
ctx.query_embedding = Some(query_embedding);
ctx
}
@@ -327,9 +341,58 @@ pub async fn attach_chunks(
}
#[instrument(level = "trace", skip_all)]
pub fn assemble(
pub async fn rerank(
machine: HybridRetrievalMachine<(), ChunksAttached>,
ctx: &mut PipelineContext<'_>,
) -> Result<HybridRetrievalMachine<(), Reranked>, AppError> {
let mut applied = false;
if let Some(reranker) = ctx.reranker.as_ref() {
if ctx.filtered_entities.len() > 1 {
let documents = build_rerank_documents(ctx, ctx.config.tuning.max_chunks_per_entity);
if documents.len() > 1 {
match reranker.rerank(&ctx.input_text, documents).await {
Ok(results) if !results.is_empty() => {
apply_rerank_results(ctx, results);
applied = true;
}
Ok(_) => {
debug!("Reranker returned no results; retaining original ordering");
}
Err(err) => {
warn!(
error = %err,
"Reranking failed; continuing with original ordering"
);
}
}
} else {
debug!(
document_count = documents.len(),
"Skipping reranking stage; insufficient document context"
);
}
} else {
debug!("Skipping reranking stage; less than two entities available");
}
} else {
debug!("No reranker lease provided; skipping reranking stage");
}
if applied {
debug!("Applied reranking adjustments to candidate ordering");
}
machine
.rerank()
.map_err(|(_, guard)| map_guard_error("rerank", guard))
}
#[instrument(level = "trace", skip_all)]
pub fn assemble(
machine: HybridRetrievalMachine<(), Reranked>,
ctx: &mut PipelineContext<'_>,
) -> Result<Vec<RetrievedEntity>, AppError> {
debug!("Assembling final retrieved entities");
let tuning = &ctx.config.tuning;
@@ -561,6 +624,113 @@ async fn enrich_chunks_from_entities(
Ok(())
}
fn build_rerank_documents(ctx: &PipelineContext<'_>, max_chunks_per_entity: usize) -> Vec<String> {
if ctx.filtered_entities.is_empty() {
return Vec::new();
}
let mut chunk_by_source: HashMap<&str, Vec<&Scored<TextChunk>>> = HashMap::new();
for chunk in &ctx.chunk_values {
chunk_by_source
.entry(chunk.item.source_id.as_str())
.or_default()
.push(chunk);
}
ctx.filtered_entities
.iter()
.map(|entity| {
let mut doc = format!(
"Name: {}\nType: {:?}\nDescription: {}\n",
entity.item.name, entity.item.entity_type, entity.item.description
);
if let Some(chunks) = chunk_by_source.get(entity.item.source_id.as_str()) {
let mut chunk_refs = chunks.clone();
chunk_refs.sort_by(|a, b| {
b.fused
.partial_cmp(&a.fused)
.unwrap_or(std::cmp::Ordering::Equal)
});
let mut header_added = false;
for chunk in chunk_refs.into_iter().take(max_chunks_per_entity.max(1)) {
let snippet = chunk.item.chunk.trim();
if snippet.is_empty() {
continue;
}
if !header_added {
doc.push_str("Chunks:\n");
header_added = true;
}
doc.push_str("- ");
doc.push_str(snippet);
doc.push('\n');
}
}
doc
})
.collect()
}
fn apply_rerank_results(ctx: &mut PipelineContext<'_>, results: Vec<RerankResult>) {
if results.is_empty() || ctx.filtered_entities.is_empty() {
return;
}
let mut remaining: Vec<Option<Scored<KnowledgeEntity>>> =
std::mem::take(&mut ctx.filtered_entities)
.into_iter()
.map(Some)
.collect();
let raw_scores: Vec<f32> = results.iter().map(|r| r.score).collect();
let normalized_scores = min_max_normalize(&raw_scores);
let use_only = ctx.config.tuning.rerank_scores_only;
let blend = if use_only {
1.0
} else {
clamp_unit(ctx.config.tuning.rerank_blend_weight)
};
let mut reranked: Vec<Scored<KnowledgeEntity>> = Vec::with_capacity(remaining.len());
for (result, normalized) in results.into_iter().zip(normalized_scores.into_iter()) {
if let Some(slot) = remaining.get_mut(result.index) {
if let Some(mut candidate) = slot.take() {
let original = candidate.fused;
let blended = if use_only {
clamp_unit(normalized)
} else {
clamp_unit(original * (1.0 - blend) + normalized * blend)
};
candidate.update_fused(blended);
reranked.push(candidate);
}
} else {
warn!(
result_index = result.index,
"Reranker returned out-of-range index; skipping"
);
}
if reranked.len() == remaining.len() {
break;
}
}
for slot in remaining.into_iter() {
if let Some(candidate) = slot {
reranked.push(candidate);
}
}
ctx.filtered_entities = reranked;
let keep_top = ctx.config.tuning.rerank_keep_top;
if keep_top > 0 && ctx.filtered_entities.len() > keep_top {
ctx.filtered_entities.truncate(keep_top);
}
}
fn estimate_tokens(text: &str, avg_chars_per_token: usize) -> usize {
let chars = text.chars().count().max(1);
(chars / avg_chars_per_token).max(1)

View File

@@ -4,18 +4,20 @@ state_machine! {
name: HybridRetrievalMachine,
state: HybridRetrievalState,
initial: Ready,
states: [Ready, Embedded, CandidatesLoaded, GraphExpanded, ChunksAttached, Completed, Failed],
states: [Ready, Embedded, CandidatesLoaded, GraphExpanded, ChunksAttached, Reranked, Completed, Failed],
events {
embed { transition: { from: Ready, to: Embedded } }
collect_candidates { transition: { from: Embedded, to: CandidatesLoaded } }
expand_graph { transition: { from: CandidatesLoaded, to: GraphExpanded } }
attach_chunks { transition: { from: GraphExpanded, to: ChunksAttached } }
assemble { transition: { from: ChunksAttached, to: Completed } }
rerank { transition: { from: ChunksAttached, to: Reranked } }
assemble { transition: { from: Reranked, to: Completed } }
abort {
transition: { from: Ready, to: Failed }
transition: { from: CandidatesLoaded, to: Failed }
transition: { from: GraphExpanded, to: Failed }
transition: { from: ChunksAttached, to: Failed }
transition: { from: Reranked, to: Failed }
}
}
}

View File

@@ -0,0 +1,170 @@
use std::{
env, fs,
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread::available_parallelism,
};
use common::{error::AppError, utils::config::AppConfig};
use fastembed::{RerankInitOptions, RerankResult, TextRerank};
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore};
use tracing::debug;
static NEXT_ENGINE: AtomicUsize = AtomicUsize::new(0);
fn pick_engine_index(pool_len: usize) -> usize {
let n = NEXT_ENGINE.fetch_add(1, Ordering::Relaxed);
n % pool_len
}
pub struct RerankerPool {
engines: Vec<Arc<Mutex<TextRerank>>>,
semaphore: Arc<Semaphore>,
}
impl RerankerPool {
/// Build the pool at startup.
/// `pool_size` controls max parallel reranks.
pub fn new(pool_size: usize) -> Result<Arc<Self>, AppError> {
Self::new_with_options(pool_size, RerankInitOptions::default())
}
fn new_with_options(
pool_size: usize,
init_options: RerankInitOptions,
) -> Result<Arc<Self>, AppError> {
if pool_size == 0 {
return Err(AppError::Validation(
"RERANKING_POOL_SIZE must be greater than zero".to_string(),
));
}
fs::create_dir_all(&init_options.cache_dir)?;
let mut engines = Vec::with_capacity(pool_size);
for x in 0..pool_size {
debug!("Creating reranking engine: {x}");
let model = TextRerank::try_new(init_options.clone())
.map_err(|e| AppError::InternalError(e.to_string()))?;
engines.push(Arc::new(Mutex::new(model)));
}
Ok(Arc::new(Self {
engines,
semaphore: Arc::new(Semaphore::new(pool_size)),
}))
}
/// Initialize a pool using application configuration.
pub fn maybe_from_config(config: &AppConfig) -> Result<Option<Arc<Self>>, AppError> {
if !config.reranking_enabled {
return Ok(None);
}
let pool_size = config.reranking_pool_size.unwrap_or_else(default_pool_size);
let init_options = build_rerank_init_options(config)?;
Self::new_with_options(pool_size, init_options).map(Some)
}
/// Check out capacity + pick an engine.
/// This returns a lease that can perform rerank().
pub async fn checkout(self: &Arc<Self>) -> RerankerLease {
// Acquire a permit. This enforces backpressure.
let permit = self
.semaphore
.clone()
.acquire_owned()
.await
.expect("semaphore closed");
// Pick an engine.
// This is naive: just pick based on a simple modulo counter.
// We use an atomic counter to avoid always choosing index 0.
let idx = pick_engine_index(self.engines.len());
let engine = self.engines[idx].clone();
RerankerLease {
_permit: permit,
engine,
}
}
}
fn default_pool_size() -> usize {
available_parallelism()
.map(|value| value.get().min(2))
.unwrap_or(2)
.max(1)
}
fn is_truthy(value: &str) -> bool {
matches!(
value.trim().to_ascii_lowercase().as_str(),
"1" | "true" | "yes" | "on"
)
}
fn build_rerank_init_options(config: &AppConfig) -> Result<RerankInitOptions, AppError> {
let mut options = RerankInitOptions::default();
let cache_dir = config
.fastembed_cache_dir
.as_ref()
.map(PathBuf::from)
.or_else(|| env::var("RERANKING_CACHE_DIR").ok().map(PathBuf::from))
.or_else(|| env::var("FASTEMBED_CACHE_DIR").ok().map(PathBuf::from))
.unwrap_or_else(|| {
Path::new(&config.data_dir)
.join("fastembed")
.join("reranker")
});
fs::create_dir_all(&cache_dir)?;
options.cache_dir = cache_dir;
let show_progress = config
.fastembed_show_download_progress
.or_else(|| env_bool("RERANKING_SHOW_DOWNLOAD_PROGRESS"))
.or_else(|| env_bool("FASTEMBED_SHOW_DOWNLOAD_PROGRESS"))
.unwrap_or(true);
options.show_download_progress = show_progress;
if let Some(max_length) = config.fastembed_max_length.or_else(|| {
env::var("RERANKING_MAX_LENGTH")
.ok()
.and_then(|value| value.parse().ok())
}) {
options.max_length = max_length;
}
Ok(options)
}
fn env_bool(key: &str) -> Option<bool> {
env::var(key).ok().map(|value| is_truthy(&value))
}
/// Active lease on a single TextRerank instance.
pub struct RerankerLease {
// When this drops the semaphore permit is released.
_permit: OwnedSemaphorePermit,
engine: Arc<Mutex<TextRerank>>,
}
impl RerankerLease {
pub async fn rerank(
&self,
query: &str,
documents: Vec<String>,
) -> Result<Vec<RerankResult>, AppError> {
// Lock this specific engine so we get &mut TextRerank
let mut guard = self.engine.lock().await;
guard
.rerank(query.to_owned(), documents, false, None)
.map_err(|e| AppError::InternalError(e.to_string()))
}
}