From 244ec0ea25ef9c635a38af3336022fe4cf2275c6 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Mon, 22 Dec 2025 22:27:44 +0100 Subject: [PATCH] fix: migrating embeddings to new dimensions changing order --- common/src/storage/indexes.rs | 49 +++++++++++++++++++- common/src/storage/types/knowledge_entity.rs | 34 ++++++++++++-- common/src/storage/types/text_chunk.rs | 33 +++++++++++-- 3 files changed, 109 insertions(+), 7 deletions(-) diff --git a/common/src/storage/indexes.rs b/common/src/storage/indexes.rs index 2b198e8..a196a47 100644 --- a/common/src/storage/indexes.rs +++ b/common/src/storage/indexes.rs @@ -208,7 +208,26 @@ async fn ensure_runtime_indexes_inner( ) .await } - HnswIndexState::Matches => Ok(()), + HnswIndexState::Matches => { + let status = get_index_status(db, spec.index_name, spec.table).await?; + if status.eq_ignore_ascii_case("error") { + warn!( + index = spec.index_name, + table = spec.table, + "HNSW index found in error state; triggering rebuild" + ); + create_index_with_polling( + db, + spec.definition_overwrite(embedding_dimension), + spec.index_name, + spec.table, + Some(spec.table), + ) + .await + } else { + Ok(()) + } + } HnswIndexState::Different(existing) => { info!( index = spec.index_name, @@ -234,6 +253,34 @@ async fn ensure_runtime_indexes_inner( Ok(()) } +async fn get_index_status( + db: &SurrealDbClient, + index_name: &str, + table: &str, +) -> Result<String> { + let info_query = format!("INFO FOR INDEX {index_name} ON TABLE {table};"); + let mut info_res = db + .client + .query(info_query) + .await + .context("checking index status")?; + let info: Option<Value> = info_res.take(0).context("failed to take info result")?; + + let info = match info { + Some(i) => i, + None => return Ok("unknown".to_string()), + }; + + let building = info.get("building"); + let status = building + .and_then(|b| b.get("status")) + .and_then(|s| s.as_str()) + .unwrap_or("ready") + .to_string(); + + Ok(status) +} + async fn rebuild_indexes_inner(db: &SurrealDbClient) -> 
Result<()> { debug!("Rebuilding indexes with concurrent definitions"); create_fts_analyzer(db).await?; diff --git a/common/src/storage/types/knowledge_entity.rs b/common/src/storage/types/knowledge_entity.rs index b95da14..0758ea7 100644 --- a/common/src/storage/types/knowledge_entity.rs +++ b/common/src/storage/types/knowledge_entity.rs @@ -485,6 +485,29 @@ impl KnowledgeEntity { new_embeddings.insert(entity.id.clone(), (embedding, entity.user_id.clone())); } info!("Successfully generated all new embeddings."); + info!("Successfully generated all new embeddings."); + + // Clear existing embeddings and index first to prevent SurrealDB panics and dimension conflicts. + info!("Removing old index and clearing embeddings..."); + + // Explicitly remove the index first. This prevents background HNSW maintenance from crashing + // when we delete/replace data, dealing with a known SurrealDB panic. + db.client + .query(format!( + "REMOVE INDEX idx_embedding_knowledge_entity_embedding ON TABLE {};", + KnowledgeEntityEmbedding::table_name() + )) + .await + .map_err(AppError::Database)? + .check() + .map_err(AppError::Database)?; + + db.client + .query(format!("DELETE FROM {};", KnowledgeEntityEmbedding::table_name())) + .await + .map_err(AppError::Database)? 
+ .check() + .map_err(AppError::Database)?; // Perform DB updates in a single transaction info!("Applying embedding updates in a transaction..."); @@ -500,11 +523,11 @@ impl KnowledgeEntity { .join(",") ); transaction_query.push_str(&format!( - "UPSERT type::thing('knowledge_entity_embedding', '{id}') SET \ + "CREATE type::thing('knowledge_entity_embedding', '{id}') SET \ entity_id = type::thing('knowledge_entity', '{id}'), \ embedding = {embedding}, \ user_id = '{user_id}', \ - created_at = IF created_at != NONE THEN created_at ELSE time::now() END, \ + created_at = time::now(), \ updated_at = time::now();", id = id, embedding = embedding_str, @@ -520,7 +543,12 @@ impl KnowledgeEntity { transaction_query.push_str("COMMIT TRANSACTION;"); // Execute the entire atomic operation - db.query(transaction_query).await?; + db.client + .query(transaction_query) + .await + .map_err(AppError::Database)? + .check() + .map_err(AppError::Database)?; info!("Re-embedding process for knowledge entities completed successfully."); Ok(()) diff --git a/common/src/storage/types/text_chunk.rs b/common/src/storage/types/text_chunk.rs index ef04a5c..fa2e9ff 100644 --- a/common/src/storage/types/text_chunk.rs +++ b/common/src/storage/types/text_chunk.rs @@ -379,6 +379,28 @@ impl TextChunk { } info!("Successfully generated all new embeddings."); + // Clear existing embeddings and index first to prevent SurrealDB panics and dimension conflicts. + info!("Removing old index and clearing embeddings..."); + + // Explicitly remove the index first. This prevents background HNSW maintenance from crashing + // when we delete/replace data, dealing with a known SurrealDB panic. + db.client + .query(format!( + "REMOVE INDEX idx_embedding_text_chunk_embedding ON TABLE {};", + TextChunkEmbedding::table_name() + )) + .await + .map_err(AppError::Database)? 
+ .check() + .map_err(AppError::Database)?; + + db.client + .query(format!("DELETE FROM {};", TextChunkEmbedding::table_name())) + .await + .map_err(AppError::Database)? + .check() + .map_err(AppError::Database)?; + // Perform DB updates in a single transaction against the embedding table info!("Applying embedding updates in a transaction..."); let mut transaction_query = String::from("BEGIN TRANSACTION;"); @@ -394,12 +416,12 @@ impl TextChunk { ); write!( &mut transaction_query, - "UPSERT type::thing('text_chunk_embedding', '{id}') SET \ + "CREATE type::thing('text_chunk_embedding', '{id}') SET \ chunk_id = type::thing('text_chunk', '{id}'), \ source_id = '{source_id}', \ embedding = {embedding}, \ user_id = '{user_id}', \ - created_at = IF created_at != NONE THEN created_at ELSE time::now() END, \ + created_at = time::now(), \ updated_at = time::now();", id = id, embedding = embedding_str, @@ -418,7 +440,12 @@ impl TextChunk { transaction_query.push_str("COMMIT TRANSACTION;"); - db.query(transaction_query).await?; + db.client + .query(transaction_query) + .await + .map_err(AppError::Database)? + .check() + .map_err(AppError::Database)?; info!("Re-embedding process for text chunks completed successfully."); Ok(())