diff --git a/common/src/storage/types/knowledge_entity.rs b/common/src/storage/types/knowledge_entity.rs index 19fab47..9683b41 100644 --- a/common/src/storage/types/knowledge_entity.rs +++ b/common/src/storage/types/knowledge_entity.rs @@ -250,13 +250,10 @@ impl KnowledgeEntity { pub async fn store_with_embedding( entity: KnowledgeEntity, embedding: Vec, + embedding_dimensions: usize, db: &SurrealDbClient, ) -> Result<(), AppError> { - let settings = SystemSettings::get_current(db).await?; - KnowledgeEntityEmbedding::validate_dimension( - &embedding, - settings.embedding_dimensions as usize, - )?; + KnowledgeEntityEmbedding::validate_dimension(&embedding, embedding_dimensions)?; let entity_id = entity.id.clone(); let emb = KnowledgeEntityEmbedding::new( @@ -722,13 +719,13 @@ mod tests { ); let emb = vec![0.1, 0.2, 0.3, 0.4, 0.5]; - KnowledgeEntity::store_with_embedding(entity1.clone(), emb.clone(), &db) + KnowledgeEntity::store_with_embedding(entity1.clone(), emb.clone(), 5, &db) .await .with_context(|| "Failed to store entity 1".to_string())?; - KnowledgeEntity::store_with_embedding(entity2.clone(), emb.clone(), &db) + KnowledgeEntity::store_with_embedding(entity2.clone(), emb.clone(), 5, &db) .await .with_context(|| "Failed to store entity 2".to_string())?; - KnowledgeEntity::store_with_embedding(different_entity.clone(), emb.clone(), &db) + KnowledgeEntity::store_with_embedding(different_entity.clone(), emb.clone(), 5, &db) .await .with_context(|| "Failed to store different entity".to_string())?; @@ -819,10 +816,10 @@ mod tests { user_id, ); - KnowledgeEntity::store_with_embedding(entity1, vec![0.1, 0.2, 0.3], &db) + KnowledgeEntity::store_with_embedding(entity1, vec![0.1, 0.2, 0.3], 3, &db) .await .expect("store entity1"); - KnowledgeEntity::store_with_embedding(entity2, vec![0.3, 0.2, 0.1], &db) + KnowledgeEntity::store_with_embedding(entity2, vec![0.3, 0.2, 0.1], 3, &db) .await .expect("store entity2"); @@ -895,7 +892,7 @@ mod tests { user_id.clone(), ); - KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.1, 0.2, 0.3], &db) + KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.1, 0.2, 0.3], 3, &db) .await .with_context(|| "store entity with embedding".to_string())?; @@ -970,10 +967,10 @@ mod tests { user_id.clone(), ); - KnowledgeEntity::store_with_embedding(e1.clone(), vec![1.0, 0.0, 0.0], &db) + KnowledgeEntity::store_with_embedding(e1.clone(), vec![1.0, 0.0, 0.0], 3, &db) .await .with_context(|| "store e1".to_string())?; - KnowledgeEntity::store_with_embedding(e2.clone(), vec![0.0, 1.0, 0.0], &db) + KnowledgeEntity::store_with_embedding(e2.clone(), vec![0.0, 1.0, 0.0], 3, &db) .await .with_context(|| "store e2".to_string())?; @@ -1062,7 +1059,7 @@ mod tests { user_id.clone(), ); - KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.1, 0.2, 0.3], &db) + KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.1, 0.2, 0.3], 3, &db) .await .with_context(|| "store entity with embedding".to_string())?; diff --git a/common/src/storage/types/knowledge_entity_embedding.rs b/common/src/storage/types/knowledge_entity_embedding.rs index a4e5c35..d43d27e 100644 --- a/common/src/storage/types/knowledge_entity_embedding.rs +++ b/common/src/storage/types/knowledge_entity_embedding.rs @@ -205,7 +205,7 @@ mod tests { let embedding_vec = vec![0.11_f32, 0.22, 0.33]; let entity = build_knowledge_entity_with_id(entity_key, source_id, user_id); - KnowledgeEntity::store_with_embedding(entity.clone(), embedding_vec.clone(), &db) + KnowledgeEntity::store_with_embedding(entity.clone(), embedding_vec.clone(), 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; @@ -234,7 +234,7 @@ mod tests { let entity = build_knowledge_entity_with_id(entity_key, source_id, user_id); - KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.5_f32, 0.6, 0.7], &db) + KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.5_f32, 0.6, 0.7], 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; @@ -266,7 +266,7 @@ mod tests { let entity = build_knowledge_entity_with_id("entity-store", source_id, user_id); - KnowledgeEntity::store_with_embedding(entity.clone(), embedding.clone(), &db) + KnowledgeEntity::store_with_embedding(entity.clone(), embedding.clone(), 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; @@ -295,7 +295,7 @@ mod tests { let db = prepare_knowledge_entity_test_db(3).await?; let entity = build_knowledge_entity_with_id("entity-dim", "source-dim", "user-dim"); - let result = KnowledgeEntity::store_with_embedding(entity, vec![0.1, 0.2], &db).await; + let result = KnowledgeEntity::store_with_embedding(entity, vec![0.1, 0.2], 3, &db).await; assert!(matches!(result, Err(AppError::Validation(_)))); @@ -313,13 +313,13 @@ mod tests { let entity2 = build_knowledge_entity_with_id("entity-s2", source_id, user_id); let entity_other = build_knowledge_entity_with_id("entity-other", other_source, user_id); - KnowledgeEntity::store_with_embedding(entity1.clone(), vec![1.0_f32, 1.1, 1.2], &db) + KnowledgeEntity::store_with_embedding(entity1.clone(), vec![1.0_f32, 1.1, 1.2], 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; - KnowledgeEntity::store_with_embedding(entity2.clone(), vec![2.0_f32, 2.1, 2.2], &db) + KnowledgeEntity::store_with_embedding(entity2.clone(), vec![2.0_f32, 2.1, 2.2], 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; - KnowledgeEntity::store_with_embedding(entity_other.clone(), vec![3.0_f32, 3.1, 3.2], &db) + KnowledgeEntity::store_with_embedding(entity_other.clone(), vec![3.0_f32, 3.1, 3.2], 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; @@ -403,7 +403,7 @@ mod tests { let source_id = "source-fetch"; let entity = build_knowledge_entity_with_id(entity_key, source_id, user_id); - KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.7_f32, 0.8, 0.9], &db) + KnowledgeEntity::store_with_embedding(entity.clone(), vec![0.7_f32, 0.8, 0.9], 3, &db) .await .with_context(|| "Failed to store entity with embedding".to_string())?; @@ -441,7 +441,7 @@ mod tests { let source_id = "source-upsert"; let entity = build_knowledge_entity_with_id("entity-upsert", source_id, user_id); - KnowledgeEntity::store_with_embedding(entity.clone(), vec![1.0_f32, 0.0, 0.0], &db) + KnowledgeEntity::store_with_embedding(entity.clone(), vec![1.0_f32, 0.0, 0.0], 3, &db) .await .with_context(|| "initial store".to_string())?; diff --git a/common/src/storage/types/system_settings.rs b/common/src/storage/types/system_settings.rs index 7bedfb1..80b5333 100644 --- a/common/src/storage/types/system_settings.rs +++ b/common/src/storage/types/system_settings.rs @@ -691,7 +691,7 @@ mod tests { "user1".into(), ); - TextChunk::store_with_embedding(initial_chunk.clone(), vec![0.1; 1536], &db) + TextChunk::store_with_embedding(initial_chunk.clone(), vec![0.1; 1536], 1536, &db) .await .with_context(|| "Failed to store initial chunk with embedding".to_string())?; diff --git a/common/src/storage/types/text_chunk.rs b/common/src/storage/types/text_chunk.rs index 5adbe0c..0096bab 100644 --- a/common/src/storage/types/text_chunk.rs +++ b/common/src/storage/types/text_chunk.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use std::fmt::Write; use crate::storage::indexes::hnsw_index_overwrite_sql; -use crate::storage::types::system_settings::SystemSettings; use crate::storage::types::text_chunk_embedding::TextChunkEmbedding; use crate::utils::embedding::RE_EMBED_BATCH_SIZE; use crate::{error::AppError, storage::db::SurrealDbClient, stored_object}; @@ -67,10 +66,10 @@ impl TextChunk { pub async fn store_with_embedding( chunk: TextChunk, embedding: Vec, + embedding_dimensions: usize, db: &SurrealDbClient, ) -> Result<(), AppError> { - let settings = SystemSettings::get_current(db).await?; - TextChunkEmbedding::validate_dimension(&embedding, settings.embedding_dimensions as usize)?; + TextChunkEmbedding::validate_dimension(&embedding, embedding_dimensions)?; let chunk_id = chunk.id.clone(); let emb = TextChunkEmbedding::new( @@ -104,7 +103,7 @@ impl TextChunk { Ok(()) } - /// Vector search over text chunks using the embedding table, fetching full chunk rows and embeddings. + /// Vector search over text chunks using the embedding table, fetching full chunk rows and scores. pub async fn vector_search( take: usize, query_embedding: &[f32], @@ -122,7 +121,6 @@ impl TextChunk { r#" SELECT chunk_id, - embedding, vector::similarity::cosine(embedding, $embedding) AS score FROM {emb_table} WHERE user_id = $user_id @@ -466,15 +464,16 @@ mod tests { user_id.clone(), ); - TextChunk::store_with_embedding(chunk1.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], &db) + TextChunk::store_with_embedding(chunk1.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], 5, &db) .await .with_context(|| "store chunk1".to_string())?; - TextChunk::store_with_embedding(chunk2.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], &db) + TextChunk::store_with_embedding(chunk2.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], 5, &db) .await .with_context(|| "store chunk2".to_string())?; TextChunk::store_with_embedding( different_chunk.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], + 5, &db, ) .await @@ -536,7 +535,7 @@ mod tests { "user123".to_string(), ); - TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], &db) + TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], 5, &db) .await .with_context(|| "store chunk".to_string())?; @@ -584,10 +583,10 @@ mod tests { "user123".to_string(), ); - TextChunk::store_with_embedding(chunk1.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], &db) + TextChunk::store_with_embedding(chunk1.clone(), vec![0.1, 0.2, 0.3, 0.4, 0.5], 5, &db) .await .expect("store chunk1"); - TextChunk::store_with_embedding(chunk2.clone(), vec![0.5, 0.4, 0.3, 0.2, 0.1], &db) + TextChunk::store_with_embedding(chunk2.clone(), vec![0.5, 0.4, 0.3, 0.2, 0.1], 5, &db) .await .expect("store chunk2"); @@ -632,7 +631,7 @@ mod tests { .await .with_context(|| "redefine index".to_string())?; - TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], &db) + TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], 3, &db) .await .with_context(|| "store with embedding".to_string())?; @@ -683,7 +682,7 @@ mod tests { "runtime_user".to_string(), ); - TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], &db) + TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], 3, &db) .await .with_context(|| "store with embedding".to_string())?; @@ -755,7 +754,7 @@ mod tests { user_id.clone(), ); - TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], &db) + TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], 3, &db) .await .with_context(|| "store".to_string())?; @@ -792,10 +791,10 @@ mod tests { let chunk1 = TextChunk::new("s1".to_string(), "chunk one".to_string(), user_id.clone()); let chunk2 = TextChunk::new("s2".to_string(), "chunk two".to_string(), user_id.clone()); - TextChunk::store_with_embedding(chunk1.clone(), vec![1.0, 0.0, 0.0], &db) + TextChunk::store_with_embedding(chunk1.clone(), vec![1.0, 0.0, 0.0], 3, &db) .await .with_context(|| "store chunk1".to_string())?; - TextChunk::store_with_embedding(chunk2.clone(), vec![0.0, 1.0, 0.0], &db) + TextChunk::store_with_embedding(chunk2.clone(), vec![0.0, 1.0, 0.0], 3, &db) .await .with_context(|| "store chunk2".to_string())?; @@ -948,7 +947,7 @@ mod tests { let chunk = TextChunk::new("src".to_string(), "body".to_string(), "user".to_string()); - let err = TextChunk::store_with_embedding(chunk, vec![0.1, 0.2], &db) + let err = TextChunk::store_with_embedding(chunk, vec![0.1, 0.2], 3, &db) .await .expect_err("expected dimension validation failure"); assert!(matches!(err, AppError::Validation(_))); @@ -978,7 +977,7 @@ mod tests { user_id.clone(), ); - TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], &db) + TextChunk::store_with_embedding(chunk.clone(), vec![0.1, 0.2, 0.3], 3, &db) .await .with_context(|| "store chunk with embedding".to_string())?; diff --git a/html-router/src/routes/knowledge/handlers.rs b/html-router/src/routes/knowledge/handlers.rs index db5a297..5548519 100644 --- a/html-router/src/routes/knowledge/handlers.rs +++ b/html-router/src/routes/knowledge/handlers.rs @@ -203,7 +203,13 @@ pub async fn create_knowledge_entity( ); let new_entity_id = new_entity.id.clone(); - KnowledgeEntity::store_with_embedding(new_entity, embedding, &state.db).await?; + KnowledgeEntity::store_with_embedding( + new_entity, + embedding, + state.embedding_provider.dimension(), + &state.db, + ) + .await?; let relationship_type = relationship_type_or_default(form.relationship_type.as_deref()); let user_id = user.id.clone(); diff --git a/ingestion-pipeline/src/pipeline/persistence.rs b/ingestion-pipeline/src/pipeline/persistence.rs index 3182552..9a6e689 100644 --- a/ingestion-pipeline/src/pipeline/persistence.rs +++ b/ingestion-pipeline/src/pipeline/persistence.rs @@ -48,6 +48,7 @@ const STORE_RELATIONSHIPS: &str = r" pub(super) async fn store_vector_chunks( db: &SurrealDbClient, task_id: &str, + embedding_dimensions: usize, chunks: Vec, ) -> Result { let chunk_count = chunks.len(); @@ -58,7 +59,13 @@ pub(super) async fn store_vector_chunks( chunk_len = embedded.chunk.chunk.chars().count(), "chunk persisted" ); - TextChunk::store_with_embedding(embedded.chunk, embedded.embedding, db).await?; + TextChunk::store_with_embedding( + embedded.chunk, + embedded.embedding, + embedding_dimensions, + db, + ) + .await?; } Ok(chunk_count) @@ -71,11 +78,18 @@ pub(super) async fn store_vector_chunks( pub(super) async fn store_graph_entities( db: &SurrealDbClient, tuning: &IngestionTuning, + embedding_dimensions: usize, entities: Vec, relationships: Vec, ) -> Result<(), AppError> { for embedded in entities { - KnowledgeEntity::store_with_embedding(embedded.entity, embedded.embedding, db).await?; + KnowledgeEntity::store_with_embedding( + embedded.entity, + embedded.embedding, + embedding_dimensions, + db, + ) + .await?; } if relationships.is_empty() { diff --git a/ingestion-pipeline/src/pipeline/stages.rs b/ingestion-pipeline/src/pipeline/stages.rs index 9484141..bfe2063 100644 --- a/ingestion-pipeline/src/pipeline/stages.rs +++ b/ingestion-pipeline/src/pipeline/stages.rs @@ -6,7 +6,10 @@ use common::{ error::AppError, - storage::{indexes::rebuild, types::ingestion_payload::IngestionPayload}, + storage::{ + indexes::rebuild, + types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings}, + }, }; use state_machines::core::GuardError; use tracing::{debug, instrument}; @@ -155,8 +158,26 @@ pub async fn persist( let entity_count = entities.len(); let relationship_count = relationships.len(); - let chunk_count = store_vector_chunks(ctx.db, ctx.task_id.as_str(), chunks).await?; - store_graph_entities(ctx.db, &ctx.pipeline_config.tuning, entities, relationships).await?; + let settings = SystemSettings::get_current(ctx.db).await?; + let embedding_dimensions = usize::try_from(settings.embedding_dimensions).map_err(|_| { + AppError::InternalError("system_settings.embedding_dimensions exceeds usize::MAX".into()) + })?; + + let chunk_count = store_vector_chunks( + ctx.db, + ctx.task_id.as_str(), + embedding_dimensions, + chunks, + ) + .await?; + store_graph_entities( + ctx.db, + &ctx.pipeline_config.tuning, + embedding_dimensions, + entities, + relationships, + ) + .await?; ctx.db.store_item(text_content).await?; rebuild(ctx.db).await?; diff --git a/main/src/bootstrap/startup.rs b/main/src/bootstrap/startup.rs index 9418e1f..daf688a 100644 --- a/main/src/bootstrap/startup.rs +++ b/main/src/bootstrap/startup.rs @@ -320,7 +320,7 @@ mod tests { "dimension migration test chunk".into(), "user1".into(), ); - TextChunk::store_with_embedding(chunk, vec![0.1, 0.2, 0.3], &services.db) + TextChunk::store_with_embedding(chunk, vec![0.1, 0.2, 0.3], 3, &services.db) .await .expect("store chunk at old dimension"); diff --git a/retrieval-pipeline/src/lib.rs b/retrieval-pipeline/src/lib.rs index 975e737..42e2dbb 100644 --- a/retrieval-pipeline/src/lib.rs +++ b/retrieval-pipeline/src/lib.rs @@ -139,7 +139,7 @@ mod tests { user_id.into(), ); - TextChunk::store_with_embedding(chunk.clone(), chunk_embedding_primary(), &db).await?; + TextChunk::store_with_embedding(chunk.clone(), chunk_embedding_primary(), 3, &db).await?; let embedding_provider = test_embedding_provider(); let params = pipeline::RetrievalParams { @@ -185,8 +185,9 @@ mod tests { user_id.into(), ); - TextChunk::store_with_embedding(primary_chunk, chunk_embedding_primary(), &db).await?; - TextChunk::store_with_embedding(secondary_chunk, chunk_embedding_secondary(), &db).await?; + TextChunk::store_with_embedding(primary_chunk, chunk_embedding_primary(), 3, &db).await?; + TextChunk::store_with_embedding(secondary_chunk, chunk_embedding_secondary(), 3, &db) + .await?; let embedding_provider = test_embedding_provider(); let params = pipeline::RetrievalParams { @@ -230,7 +231,7 @@ mod tests { "Async Rust programming uses the Tokio runtime for concurrent tasks.".into(), user_id.into(), ); - TextChunk::store_with_embedding(chunk.clone(), chunk_embedding_primary(), &db).await?; + TextChunk::store_with_embedding(chunk.clone(), chunk_embedding_primary(), 3, &db).await?; let entity = KnowledgeEntity::new( "entity_source".into(),