mirror of
https://github.com/perstarkse/minne.git
synced 2026-04-22 00:38:30 +02:00
fix: migrating embeddings to new dimensions
changing order
This commit is contained in:
@@ -208,7 +208,26 @@ async fn ensure_runtime_indexes_inner(
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
HnswIndexState::Matches => Ok(()),
|
HnswIndexState::Matches => {
|
||||||
|
let status = get_index_status(db, spec.index_name, spec.table).await?;
|
||||||
|
if status.eq_ignore_ascii_case("error") {
|
||||||
|
warn!(
|
||||||
|
index = spec.index_name,
|
||||||
|
table = spec.table,
|
||||||
|
"HNSW index found in error state; triggering rebuild"
|
||||||
|
);
|
||||||
|
create_index_with_polling(
|
||||||
|
db,
|
||||||
|
spec.definition_overwrite(embedding_dimension),
|
||||||
|
spec.index_name,
|
||||||
|
spec.table,
|
||||||
|
Some(spec.table),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
HnswIndexState::Different(existing) => {
|
HnswIndexState::Different(existing) => {
|
||||||
info!(
|
info!(
|
||||||
index = spec.index_name,
|
index = spec.index_name,
|
||||||
@@ -234,6 +253,34 @@ async fn ensure_runtime_indexes_inner(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn get_index_status(
|
||||||
|
db: &SurrealDbClient,
|
||||||
|
index_name: &str,
|
||||||
|
table: &str,
|
||||||
|
) -> Result<String> {
|
||||||
|
let info_query = format!("INFO FOR INDEX {index_name} ON TABLE {table};");
|
||||||
|
let mut info_res = db
|
||||||
|
.client
|
||||||
|
.query(info_query)
|
||||||
|
.await
|
||||||
|
.context("checking index status")?;
|
||||||
|
let info: Option<Value> = info_res.take(0).context("failed to take info result")?;
|
||||||
|
|
||||||
|
let info = match info {
|
||||||
|
Some(i) => i,
|
||||||
|
None => return Ok("unknown".to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let building = info.get("building");
|
||||||
|
let status = building
|
||||||
|
.and_then(|b| b.get("status"))
|
||||||
|
.and_then(|s| s.as_str())
|
||||||
|
.unwrap_or("ready")
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
Ok(status)
|
||||||
|
}
|
||||||
|
|
||||||
async fn rebuild_indexes_inner(db: &SurrealDbClient) -> Result<()> {
|
async fn rebuild_indexes_inner(db: &SurrealDbClient) -> Result<()> {
|
||||||
debug!("Rebuilding indexes with concurrent definitions");
|
debug!("Rebuilding indexes with concurrent definitions");
|
||||||
create_fts_analyzer(db).await?;
|
create_fts_analyzer(db).await?;
|
||||||
|
|||||||
@@ -485,6 +485,29 @@ impl KnowledgeEntity {
|
|||||||
new_embeddings.insert(entity.id.clone(), (embedding, entity.user_id.clone()));
|
new_embeddings.insert(entity.id.clone(), (embedding, entity.user_id.clone()));
|
||||||
}
|
}
|
||||||
info!("Successfully generated all new embeddings.");
|
info!("Successfully generated all new embeddings.");
|
||||||
|
info!("Successfully generated all new embeddings.");
|
||||||
|
|
||||||
|
// Clear existing embeddings and index first to prevent SurrealDB panics and dimension conflicts.
|
||||||
|
info!("Removing old index and clearing embeddings...");
|
||||||
|
|
||||||
|
// Explicitly remove the index first. This prevents background HNSW maintenance from crashing
|
||||||
|
// when we delete/replace data, dealing with a known SurrealDB panic.
|
||||||
|
db.client
|
||||||
|
.query(format!(
|
||||||
|
"REMOVE INDEX idx_embedding_knowledge_entity_embedding ON TABLE {};",
|
||||||
|
KnowledgeEntityEmbedding::table_name()
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::Database)?
|
||||||
|
.check()
|
||||||
|
.map_err(AppError::Database)?;
|
||||||
|
|
||||||
|
db.client
|
||||||
|
.query(format!("DELETE FROM {};", KnowledgeEntityEmbedding::table_name()))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::Database)?
|
||||||
|
.check()
|
||||||
|
.map_err(AppError::Database)?;
|
||||||
|
|
||||||
// Perform DB updates in a single transaction
|
// Perform DB updates in a single transaction
|
||||||
info!("Applying embedding updates in a transaction...");
|
info!("Applying embedding updates in a transaction...");
|
||||||
@@ -500,11 +523,11 @@ impl KnowledgeEntity {
|
|||||||
.join(",")
|
.join(",")
|
||||||
);
|
);
|
||||||
transaction_query.push_str(&format!(
|
transaction_query.push_str(&format!(
|
||||||
"UPSERT type::thing('knowledge_entity_embedding', '{id}') SET \
|
"CREATE type::thing('knowledge_entity_embedding', '{id}') SET \
|
||||||
entity_id = type::thing('knowledge_entity', '{id}'), \
|
entity_id = type::thing('knowledge_entity', '{id}'), \
|
||||||
embedding = {embedding}, \
|
embedding = {embedding}, \
|
||||||
user_id = '{user_id}', \
|
user_id = '{user_id}', \
|
||||||
created_at = IF created_at != NONE THEN created_at ELSE time::now() END, \
|
created_at = time::now(), \
|
||||||
updated_at = time::now();",
|
updated_at = time::now();",
|
||||||
id = id,
|
id = id,
|
||||||
embedding = embedding_str,
|
embedding = embedding_str,
|
||||||
@@ -520,7 +543,12 @@ impl KnowledgeEntity {
|
|||||||
transaction_query.push_str("COMMIT TRANSACTION;");
|
transaction_query.push_str("COMMIT TRANSACTION;");
|
||||||
|
|
||||||
// Execute the entire atomic operation
|
// Execute the entire atomic operation
|
||||||
db.query(transaction_query).await?;
|
db.client
|
||||||
|
.query(transaction_query)
|
||||||
|
.await
|
||||||
|
.map_err(AppError::Database)?
|
||||||
|
.check()
|
||||||
|
.map_err(AppError::Database)?;
|
||||||
|
|
||||||
info!("Re-embedding process for knowledge entities completed successfully.");
|
info!("Re-embedding process for knowledge entities completed successfully.");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
@@ -379,6 +379,28 @@ impl TextChunk {
|
|||||||
}
|
}
|
||||||
info!("Successfully generated all new embeddings.");
|
info!("Successfully generated all new embeddings.");
|
||||||
|
|
||||||
|
// Clear existing embeddings and index first to prevent SurrealDB panics and dimension conflicts.
|
||||||
|
info!("Removing old index and clearing embeddings...");
|
||||||
|
|
||||||
|
// Explicitly remove the index first. This prevents background HNSW maintenance from crashing
|
||||||
|
// when we delete/replace data, dealing with a known SurrealDB panic.
|
||||||
|
db.client
|
||||||
|
.query(format!(
|
||||||
|
"REMOVE INDEX idx_embedding_text_chunk_embedding ON TABLE {};",
|
||||||
|
TextChunkEmbedding::table_name()
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::Database)?
|
||||||
|
.check()
|
||||||
|
.map_err(AppError::Database)?;
|
||||||
|
|
||||||
|
db.client
|
||||||
|
.query(format!("DELETE FROM {};", TextChunkEmbedding::table_name()))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::Database)?
|
||||||
|
.check()
|
||||||
|
.map_err(AppError::Database)?;
|
||||||
|
|
||||||
// Perform DB updates in a single transaction against the embedding table
|
// Perform DB updates in a single transaction against the embedding table
|
||||||
info!("Applying embedding updates in a transaction...");
|
info!("Applying embedding updates in a transaction...");
|
||||||
let mut transaction_query = String::from("BEGIN TRANSACTION;");
|
let mut transaction_query = String::from("BEGIN TRANSACTION;");
|
||||||
@@ -394,12 +416,12 @@ impl TextChunk {
|
|||||||
);
|
);
|
||||||
write!(
|
write!(
|
||||||
&mut transaction_query,
|
&mut transaction_query,
|
||||||
"UPSERT type::thing('text_chunk_embedding', '{id}') SET \
|
"CREATE type::thing('text_chunk_embedding', '{id}') SET \
|
||||||
chunk_id = type::thing('text_chunk', '{id}'), \
|
chunk_id = type::thing('text_chunk', '{id}'), \
|
||||||
source_id = '{source_id}', \
|
source_id = '{source_id}', \
|
||||||
embedding = {embedding}, \
|
embedding = {embedding}, \
|
||||||
user_id = '{user_id}', \
|
user_id = '{user_id}', \
|
||||||
created_at = IF created_at != NONE THEN created_at ELSE time::now() END, \
|
created_at = time::now(), \
|
||||||
updated_at = time::now();",
|
updated_at = time::now();",
|
||||||
id = id,
|
id = id,
|
||||||
embedding = embedding_str,
|
embedding = embedding_str,
|
||||||
@@ -418,7 +440,12 @@ impl TextChunk {
|
|||||||
|
|
||||||
transaction_query.push_str("COMMIT TRANSACTION;");
|
transaction_query.push_str("COMMIT TRANSACTION;");
|
||||||
|
|
||||||
db.query(transaction_query).await?;
|
db.client
|
||||||
|
.query(transaction_query)
|
||||||
|
.await
|
||||||
|
.map_err(AppError::Database)?
|
||||||
|
.check()
|
||||||
|
.map_err(AppError::Database)?;
|
||||||
|
|
||||||
info!("Re-embedding process for text chunks completed successfully.");
|
info!("Re-embedding process for text chunks completed successfully.");
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
Reference in New Issue
Block a user