diff --git a/common/src/storage/db.rs b/common/src/storage/db.rs index f6ea172..6475ae2 100644 --- a/common/src/storage/db.rs +++ b/common/src/storage/db.rs @@ -130,6 +130,19 @@ impl SurrealDbClient { .await } + /// Operation to upsert an object in SurrealDB, replacing any existing record + /// with the same ID. Useful for idempotent ingestion flows. + pub async fn upsert_item(&self, item: T) -> Result, Error> + where + T: StoredObject + Send + Sync + 'static, + { + let id = item.get_id().to_string(); + self.client + .upsert((T::table_name(), id)) + .content(item) + .await + } + /// Operation to retrieve all objects from a certain table, requires the struct to implement StoredObject /// /// # Returns @@ -268,6 +281,56 @@ mod tests { assert!(fetch_post.is_none()); } + #[tokio::test] + async fn upsert_item_overwrites_existing_records() { + let namespace = "test_ns"; + let database = &Uuid::new_v4().to_string(); + let db = SurrealDbClient::memory(namespace, database) + .await + .expect("Failed to start in-memory surrealdb"); + + db.apply_migrations() + .await + .expect("Failed to initialize schema"); + + let mut dummy = Dummy { + id: "abc".to_string(), + name: "first".to_string(), + created_at: Utc::now(), + updated_at: Utc::now(), + }; + + db.store_item(dummy.clone()) + .await + .expect("Failed to store initial record"); + + dummy.name = "updated".to_string(); + let upserted = db + .upsert_item(dummy.clone()) + .await + .expect("Failed to upsert record"); + assert!(upserted.is_some()); + + let fetched: Option = db.get_item(&dummy.id).await.expect("fetch after upsert"); + assert_eq!(fetched.unwrap().name, "updated"); + + let new_record = Dummy { + id: "def".to_string(), + name: "brand-new".to_string(), + created_at: Utc::now(), + updated_at: Utc::now(), + }; + db.upsert_item(new_record.clone()) + .await + .expect("Failed to upsert new record"); + + let fetched_new: Option = db + .get_item(&new_record.id) + .await + .expect("fetch inserted via upsert"); + assert_eq!(fetched_new, Some(new_record)); + } + #[tokio::test] async fn test_applying_migrations() { let namespace = "test_ns"; diff --git a/common/src/storage/types/knowledge_relationship.rs b/common/src/storage/types/knowledge_relationship.rs index 6463dbf..673bc71 100644 --- a/common/src/storage/types/knowledge_relationship.rs +++ b/common/src/storage/types/knowledge_relationship.rs @@ -41,17 +41,18 @@ impl KnowledgeRelationship { } pub async fn store_relationship(&self, db_client: &SurrealDbClient) -> Result<(), AppError> { let query = format!( - r#"RELATE knowledge_entity:`{}`->relates_to:`{}`->knowledge_entity:`{}` + r#"DELETE relates_to:`{rel_id}`; + RELATE knowledge_entity:`{in_id}`->relates_to:`{rel_id}`->knowledge_entity:`{out_id}` SET - metadata.user_id = '{}', - metadata.source_id = '{}', - metadata.relationship_type = '{}'"#, - self.in_, - self.id, - self.out, - self.metadata.user_id, - self.metadata.source_id, - self.metadata.relationship_type + metadata.user_id = '{user_id}', + metadata.source_id = '{source_id}', + metadata.relationship_type = '{relationship_type}'"#, + rel_id = self.id, + in_id = self.in_, + out_id = self.out, + user_id = self.metadata.user_id.as_str(), + source_id = self.metadata.source_id.as_str(), + relationship_type = self.metadata.relationship_type.as_str() ); db_client.query(query).await?;