From f66660719805a9bd4c748204123993f513116aef Mon Sep 17 00:00:00 2001 From: Per Stark Date: Tue, 14 Jan 2025 13:41:36 +0100 Subject: [PATCH] memory optimization and queue fix --- src/bin/worker.rs | 4 +--- src/ingress/content_processor.rs | 6 +++--- src/ingress/jobqueue.rs | 11 ++++++++--- src/retrieval/mod.rs | 11 ++--------- src/retrieval/vector.rs | 2 +- 5 files changed, 15 insertions(+), 19 deletions(-) diff --git a/src/bin/worker.rs b/src/bin/worker.rs index c2c6f38..f648626 100644 --- a/src/bin/worker.rs +++ b/src/bin/worker.rs @@ -48,9 +48,7 @@ async fn main() -> Result<(), Box> { info!("Found {} unfinished jobs", unfinished_jobs.len()); for job in unfinished_jobs { - if let Err(e) = job_queue.process_job(job.clone(), &content_processor).await { - error!("Error processing job {}: {}", job.id, e); - } + job_queue.process_job(job, &content_processor).await?; } } diff --git a/src/ingress/content_processor.rs b/src/ingress/content_processor.rs index 963611a..e6a5fd6 100644 --- a/src/ingress/content_processor.rs +++ b/src/ingress/content_processor.rs @@ -40,9 +40,6 @@ impl ContentProcessor { } pub async fn process(&self, content: &TextContent) -> Result<(), AppError> { - // Store original content - store_item(&self.db_client, content.clone()).await?; - let now = Instant::now(); // Perform analyis, this step also includes retrieval @@ -65,6 +62,9 @@ impl ContentProcessor { self.store_vector_chunks(content), )?; + // Store original content + store_item(&self.db_client, content.to_owned()).await?; + self.db_client.rebuild_indexes().await?; Ok(()) } diff --git a/src/ingress/jobqueue.rs b/src/ingress/jobqueue.rs index cc3379e..df8c1c8 100644 --- a/src/ingress/jobqueue.rs +++ b/src/ingress/jobqueue.rs @@ -1,6 +1,9 @@ use chrono::Utc; use futures::Stream; -use std::sync::Arc; +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; use surrealdb::{opt::PatchOp, Error, Notification}; use tracing::{error, info}; @@ -75,7 +78,10 @@ impl JobQueue { .db .update((Job::table_name(), id)) .patch(PatchOp::replace("/status", status)) - .patch(PatchOp::replace("/updated_at", Utc::now())) + .patch(PatchOp::replace( + "/updated_at", + surrealdb::sql::Datetime::default(), + )) .await?; Ok(job) @@ -90,7 +96,6 @@ impl JobQueue { /// Get unfinished jobs, ie newly created and in progress up two times pub async fn get_unfinished_jobs(&self) -> Result, AppError> { - info!("Getting unfinished jobs"); let jobs: Vec = self .db .query( diff --git a/src/retrieval/mod.rs b/src/retrieval/mod.rs index ca0f620..f14e447 100644 --- a/src/retrieval/mod.rs +++ b/src/retrieval/mod.rs @@ -49,18 +49,11 @@ pub async fn combined_knowledge_entity_retrieval( 10, query, db_client, - "knowledge_entity".to_string(), - openai_client, - user_id, - ), - find_items_by_vector_similarity( - 5, - query, - db_client, - "text_chunk".to_string(), + "knowledge_entity", openai_client, user_id, ), + find_items_by_vector_similarity(5, query, db_client, "text_chunk", openai_client, user_id), ) .await?; diff --git a/src/retrieval/vector.rs b/src/retrieval/vector.rs index fd4efde..8b4ea90 100644 --- a/src/retrieval/vector.rs +++ b/src/retrieval/vector.rs @@ -27,7 +27,7 @@ pub async fn find_items_by_vector_similarity( take: u8, input_text: &str, db_client: &Surreal, - table: String, + table: &str, openai_client: &async_openai::Client, user_id: &str, ) -> Result, AppError>