memory optimization and queue fix

This commit is contained in:
Per Stark
2025-01-14 13:41:36 +01:00
parent 2d96f06478
commit 972919a515
5 changed files with 15 additions and 19 deletions

View File

@@ -48,9 +48,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Found {} unfinished jobs", unfinished_jobs.len()); info!("Found {} unfinished jobs", unfinished_jobs.len());
for job in unfinished_jobs { for job in unfinished_jobs {
if let Err(e) = job_queue.process_job(job.clone(), &content_processor).await { job_queue.process_job(job, &content_processor).await?;
error!("Error processing job {}: {}", job.id, e);
}
} }
} }

View File

@@ -40,9 +40,6 @@ impl ContentProcessor {
} }
pub async fn process(&self, content: &TextContent) -> Result<(), AppError> { pub async fn process(&self, content: &TextContent) -> Result<(), AppError> {
// Store original content
store_item(&self.db_client, content.clone()).await?;
let now = Instant::now(); let now = Instant::now();
// Perform analyis, this step also includes retrieval // Perform analyis, this step also includes retrieval
@@ -65,6 +62,9 @@ impl ContentProcessor {
self.store_vector_chunks(content), self.store_vector_chunks(content),
)?; )?;
// Store original content
store_item(&self.db_client, content.to_owned()).await?;
self.db_client.rebuild_indexes().await?; self.db_client.rebuild_indexes().await?;
Ok(()) Ok(())
} }

View File

@@ -1,6 +1,9 @@
use chrono::Utc; use chrono::Utc;
use futures::Stream; use futures::Stream;
use std::sync::Arc; use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use surrealdb::{opt::PatchOp, Error, Notification}; use surrealdb::{opt::PatchOp, Error, Notification};
use tracing::{error, info}; use tracing::{error, info};
@@ -75,7 +78,10 @@ impl JobQueue {
.db .db
.update((Job::table_name(), id)) .update((Job::table_name(), id))
.patch(PatchOp::replace("/status", status)) .patch(PatchOp::replace("/status", status))
.patch(PatchOp::replace("/updated_at", Utc::now())) .patch(PatchOp::replace(
"/updated_at",
surrealdb::sql::Datetime::default(),
))
.await?; .await?;
Ok(job) Ok(job)
@@ -90,7 +96,6 @@ impl JobQueue {
/// Get unfinished jobs, ie newly created and in progress up two times /// Get unfinished jobs, ie newly created and in progress up two times
pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> { pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> {
info!("Getting unfinished jobs");
let jobs: Vec<Job> = self let jobs: Vec<Job> = self
.db .db
.query( .query(

View File

@@ -49,18 +49,11 @@ pub async fn combined_knowledge_entity_retrieval(
10, 10,
query, query,
db_client, db_client,
"knowledge_entity".to_string(), "knowledge_entity",
openai_client,
user_id,
),
find_items_by_vector_similarity(
5,
query,
db_client,
"text_chunk".to_string(),
openai_client, openai_client,
user_id, user_id,
), ),
find_items_by_vector_similarity(5, query, db_client, "text_chunk", openai_client, user_id),
) )
.await?; .await?;

View File

@@ -27,7 +27,7 @@ pub async fn find_items_by_vector_similarity<T>(
take: u8, take: u8,
input_text: &str, input_text: &str,
db_client: &Surreal<Any>, db_client: &Surreal<Any>,
table: String, table: &str,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>, openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
user_id: &str, user_id: &str,
) -> Result<Vec<T>, AppError> ) -> Result<Vec<T>, AppError>