LLM analysis WIP

This commit is contained in:
Per Stark
2024-09-30 20:51:42 +02:00
parent f3ad3e1893
commit dcb82ca454
10 changed files with 555 additions and 155 deletions

View File

@@ -3,7 +3,7 @@ use lapin::{
};
use futures_lite::stream::StreamExt;
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject};
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject, text_content::TextContent};
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
@@ -108,7 +108,9 @@ impl RabbitMQConsumer {
match self.consume().await {
Ok((ingress, delivery)) => {
info!("Received IngressObject: {:?}", ingress);
let text_content = ingress.to_text_content().await.unwrap();
text_content.process().await.unwrap();
self.ack_delivery(delivery).await?;
// Process the IngressContent
// match self.handle_ingress_content(&ingress).await {
@@ -143,109 +145,9 @@ impl RabbitMQConsumer {
Ok(())
}
// /// Processes messages in a loop
// pub async fn process_messages(&self) -> Result<(), RabbitMQError> {
// loop {
// match self.consume().await {
// Ok((ingress, delivery)) => {
// // info!("Received ingress object: {:?}", ingress);
// // Process the ingress object
// self.handle_ingress_content(&ingress).await;
// info!("Processing done, acknowledging message");
// self.ack_delivery(delivery).await?;
// }
// Err(RabbitMQError::ConsumeError(e)) => {
// error!("Error consuming message: {}", e);
// // Optionally add a delay before trying again
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// }
// Err(e) => {
// error!("Unexpected error: {}", e);
// break;
// }
// }
// }
// Ok(())
// }
pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> {
pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> {
info!("Processing IngressContent: {:?}", ingress);
// Convert IngressContent to individual TextContent instances
// let text_contents = ingress.to_text_contents().await?;
// info!("Generated {} TextContent instances", text_contents.len());
// // Limit concurrent processing (e.g., 10 at a time)
// let semaphore = Arc::new(Semaphore::new(10));
// let mut processing_futures = FuturesUnordered::new();
// for text_content in text_contents {
// let semaphore_clone = semaphore.clone();
// processing_futures.push(tokio::spawn(async move {
// let _permit = semaphore_clone.acquire().await;
// match text_content.process().await {
// Ok(_) => {
// info!("Successfully processed TextContent");
// Ok(())
// }
// Err(e) => {
// error!("Error processing TextContent: {:?}", e);
// Err(e)
// }
// }
// }));
// }
// // Await all processing tasks
// while let Some(result) = processing_futures.next().await {
// match result {
// Ok(Ok(_)) => {
// // Successfully processed
// }
// Ok(Err(e)) => {
// // Processing failed, already logged
// }
// Err(e) => {
// // Task join error
// error!("Task join error: {:?}", e);
// }
// }
// }
// Ok(())
unimplemented!()
}
}
}
// /// Handles the IngressContent based on its type
// async fn handle_ingress_content(&self, ingress: &IngressContent) {
// info!("Processing content: {:?}", ingress);
// // There are three different situations:
// // 1. There is no ingress.content but there are one or more files
// // - We should process the files and act based upon the mime type
// // - All different kinds of content return text
// // 2. There is ingress.content but there are no files
// // - We process ingress.content differently if its a URL or Text enum
// // - Return text
// // 3. There is ingress.content and files
// // - We do both
// //
// // At the end of processing we have one or several text objects with some associated
// // metadata, such as the FileInfo metadata, or the Url associated to the text
// //
// // There will always be ingress.instructions and ingress.category
// //
// // When we have all the text objects and metadata, we can begin the next processing
// // Here we will:
// // 1. Send the text content and metadata to a LLM for analyzing
// // - We want several things, JSON_LD metadata, possibly actions
// // 2. Store the JSON_LD in a graph database
// // 3. Split up the text intelligently and store it in a vector database
// //
// // We return the function if all succeeds.
// }