fix: replaced several instances if cloning, reduced allocations

This commit is contained in:
Per Stark
2026-06-06 19:45:18 +02:00
parent 93c65970f1
commit 676fdbc132
12 changed files with 254 additions and 110 deletions
+3 -1
View File
@@ -5,6 +5,8 @@ pub mod query;
pub mod reranking;
pub mod scoring;
use std::sync::Arc;
use common::{
error::AppError,
storage::{
@@ -52,7 +54,7 @@ pub struct RetrievedChunk {
pub struct RetrievedEntity {
pub entity: KnowledgeEntity,
pub score: f32,
pub chunks: Vec<RetrievedChunk>,
pub chunks: Arc<Vec<RetrievedChunk>>,
}
/// Run chunk-first hybrid retrieval for `input_text`, optionally resolving owning entities.
+41 -20
View File
@@ -4,7 +4,7 @@ use common::{
storage::types::{knowledge_entity::KnowledgeEntity, text_chunk::TextChunk},
};
use fastembed::RerankResult;
use std::collections::HashMap;
use std::{collections::HashMap, fmt::Write, sync::Arc};
use tracing::{debug, instrument, warn};
use crate::{
@@ -106,7 +106,7 @@ pub async fn embed(ctx: &mut PipelineContext<'_>) -> Result<(), AppError> {
#[instrument(level = "trace", skip_all)]
pub async fn search_chunks(ctx: &mut PipelineContext<'_>) -> Result<(), AppError> {
debug!("Collecting chunk candidates via vector and FTS search");
let embedding = ctx.ensure_embedding().map_err(|e| *e)?.clone();
let embedding = ctx.ensure_embedding().map_err(|e| *e)?;
let tuning = &ctx.config.tuning;
let fts_take = tuning.chunk_fts_take;
let (fts_query, fts_token_count) = normalize_fts_terms(&ctx.input_text);
@@ -233,12 +233,16 @@ pub async fn resolve_entities(ctx: &mut PipelineContext<'_>) -> Result<(), AppEr
let mut best_score: HashMap<String, f32> = HashMap::new();
for scored in &ctx.chunk_values {
let source = scored.item.source_id.clone();
let attached = chunks_by_source.entry(source.clone()).or_default();
if attached.is_empty() {
source_order.push(source.clone());
best_score.insert(source.clone(), scored.fused);
let source_id = &scored.item.source_id;
let is_new_source = !chunks_by_source.contains_key(source_id);
if is_new_source {
source_order.push(source_id.clone());
best_score.insert(source_id.clone(), scored.fused);
}
let attached = chunks_by_source
.entry(source_id.clone())
.or_default();
if attached.len() < max_chunks {
attached.push(RetrievedChunk {
chunk: scored.item.clone(),
@@ -247,6 +251,11 @@ pub async fn resolve_entities(ctx: &mut PipelineContext<'_>) -> Result<(), AppEr
}
}
let chunks_by_source: HashMap<String, Arc<Vec<RetrievedChunk>>> = chunks_by_source
.into_iter()
.map(|(source, chunks)| (source, Arc::new(chunks)))
.collect();
let entities =
KnowledgeEntity::find_by_source_ids(ctx.db_client, &source_order, &ctx.user_id).await?;
@@ -264,12 +273,15 @@ pub async fn resolve_entities(ctx: &mut PipelineContext<'_>) -> Result<(), AppEr
continue;
};
let score = best_score.get(source).copied().unwrap_or(0.0);
let chunks = chunks_by_source.get(source).cloned().unwrap_or_default();
let chunks = chunks_by_source
.get(source)
.cloned()
.unwrap_or_else(|| Arc::new(Vec::new()));
for entity in entities {
results.push(RetrievedEntity {
entity,
score,
chunks: chunks.clone(),
chunks: Arc::clone(&chunks),
});
}
}
@@ -328,17 +340,26 @@ where
}
fn build_chunk_rerank_documents(chunks: &[Scored<TextChunk>], max_chunks: usize) -> Vec<String> {
chunks
.iter()
.take(max_chunks)
.map(|chunk| {
format!(
"Source: {}\nChunk:\n{}",
chunk.item.source_id,
chunk.item.chunk.trim()
)
})
.collect()
let take = chunks.len().min(max_chunks);
let mut documents = Vec::with_capacity(take);
let mut buffer = String::with_capacity(512);
for chunk in chunks.iter().take(max_chunks) {
buffer.clear();
let _ = write!(
buffer,
"Source: {}\nChunk:\n{}",
chunk.item.source_id,
chunk.item.chunk.trim()
);
let next_capacity = buffer.capacity().max(512);
documents.push(std::mem::replace(
&mut buffer,
String::with_capacity(next_capacity),
));
}
documents
}
fn apply_chunk_rerank_results(
+47 -22
View File
@@ -1,4 +1,7 @@
use std::{cmp::Ordering, collections::HashMap};
use std::{
cmp::Ordering,
collections::{hash_map::Entry, HashMap},
};
use common::storage::types::StoredObject;
@@ -119,7 +122,7 @@ pub fn reciprocal_rank_fusion<T>(
config: RrfConfig,
) -> Vec<Scored<T>>
where
T: StoredObject + Clone,
T: StoredObject,
{
let mut merged: HashMap<String, Scored<T>> = HashMap::new();
let k = if config.k <= 0.0 { 60.0 } else { config.k };
@@ -146,19 +149,30 @@ where
for (rank, candidate) in vector_ranked.into_iter().enumerate() {
let id = candidate.item.id().to_owned();
let entry = merged
.entry(id.clone())
.or_insert_with(|| Scored::new(candidate.item.clone()));
let rank_f32: f32 = u16::try_from(rank).map_or(f32::MAX, f32::from);
let contribution = vector_weight / (k + rank_f32 + 1.0);
if let Some(score) = candidate.scores.vector {
let existing = entry.scores.vector.unwrap_or(f32::MIN);
if score > existing {
entry.scores.vector = Some(score);
match merged.entry(id) {
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
if let Some(score) = candidate.scores.vector {
let existing = entry.scores.vector.unwrap_or(f32::MIN);
if score > existing {
entry.scores.vector = Some(score);
}
}
entry.item = candidate.item;
entry.fused += contribution;
}
Entry::Vacant(vacant) => {
let mut scored = Scored::new(candidate.item);
if let Some(score) = candidate.scores.vector {
scored.scores.vector = Some(score);
}
scored.fused = contribution;
vacant.insert(scored);
}
}
entry.item = candidate.item;
let rank_f32: f32 = u16::try_from(rank).map_or(f32::MAX, f32::from);
entry.fused += vector_weight / (k + rank_f32 + 1.0);
}
}
@@ -174,19 +188,30 @@ where
for (rank, candidate) in fts_ranked.into_iter().enumerate() {
let id = candidate.item.id().to_owned();
let entry = merged
.entry(id.clone())
.or_insert_with(|| Scored::new(candidate.item.clone()));
let rank_f32: f32 = u16::try_from(rank).map_or(f32::MAX, f32::from);
let contribution = fts_weight / (k + rank_f32 + 1.0);
if let Some(score) = candidate.scores.fts {
let existing = entry.scores.fts.unwrap_or(f32::MIN);
if score > existing {
entry.scores.fts = Some(score);
match merged.entry(id) {
Entry::Occupied(mut occupied) => {
let entry = occupied.get_mut();
if let Some(score) = candidate.scores.fts {
let existing = entry.scores.fts.unwrap_or(f32::MIN);
if score > existing {
entry.scores.fts = Some(score);
}
}
entry.item = candidate.item;
entry.fused += contribution;
}
Entry::Vacant(vacant) => {
let mut scored = Scored::new(candidate.item);
if let Some(score) = candidate.scores.fts {
scored.scores.fts = Some(score);
}
scored.fused = contribution;
vacant.insert(scored);
}
}
entry.item = candidate.item;
let rank_f32: f32 = u16::try_from(rank).map_or(f32::MAX, f32::from);
entry.fused += fts_weight / (k + rank_f32 + 1.0);
}
}