llm analysis wip

This commit is contained in:
Per Stark
2024-09-30 20:51:42 +02:00
parent c1ad819164
commit a348950234
10 changed files with 555 additions and 155 deletions

View File

@@ -1,7 +1,7 @@
use tokio;
use tracing::{info, error};
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError};
use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig };
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {

View File

@@ -45,6 +45,7 @@ pub async fn create_ingress_objects(
input: IngressInput,
redis_client: &RedisClient,
) -> Result<Vec<IngressObject>, IngressContentError> {
// Initialize list
let mut object_list = Vec::new();
if let Some(input_content) = input.content {

View File

@@ -54,7 +54,7 @@ impl IngressObject {
}
/// Fetches and extracts text from a URL.
async fn fetch_text_from_url(url: &str) -> Result<String, IngressContentError> {
async fn fetch_text_from_url(_url: &str) -> Result<String, IngressContentError> {
unimplemented!()
}
@@ -74,6 +74,10 @@ impl IngressObject {
// TODO: Implement OCR on image using a crate like `tesseract`
Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone()))
}
"application/octet-stream" => {
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
// Handle other MIME types as needed
_ => Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())),
}

View File

@@ -1,4 +1,7 @@
use async_openai::types::{ ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, CreateChatCompletionRequestArgs};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::info;
use crate::models::file_info::FileInfo;
use thiserror::Error;
@@ -11,6 +14,31 @@ pub struct TextContent {
pub category: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct LLMAnalysis {
pub json_ld: serde_json::Value,
pub description: String,
pub related_category: String,
pub instructions: String,
}
/// Error types for processing `TextContent`.
#[derive(Error, Debug)]
pub enum ProcessingError {
#[error("LLM processing error: {0}")]
LLMError(String),
#[error("Graph DB storage error: {0}")]
GraphDBError(String),
#[error("Vector DB storage error: {0}")]
VectorDBError(String),
#[error("Unknown processing error")]
Unknown,
}
impl TextContent {
/// Creates a new `TextContent` instance.
pub fn new(text: String, file_info: Option<FileInfo>, instructions: String, category: String) -> Self {
@@ -26,42 +54,101 @@ impl TextContent {
pub async fn process(&self) -> Result<(), ProcessingError> {
// Step 1: Send to LLM for analysis
let analysis = self.send_to_llm().await?;
info!("{:?}", analysis);
// Step 2: Store analysis results in Graph DB
self.store_in_graph_db(&analysis).await?;
// self.store_in_graph_db(&analysis).await?;
// Step 3: Split text and store in Vector DB
self.store_in_vector_db().await?;
// self.store_in_vector_db().await?;
Ok(())
}
/// Sends text to an LLM for analysis.
async fn send_to_llm(&self) -> Result<LLMAnalysis, ProcessingError> {
// TODO: Implement interaction with your specific LLM API.
// Example using reqwest:
/*
let client = reqwest::Client::new();
let response = client.post("http://llm-api/analyze")
.json(&serde_json::json!({ "text": self.text }))
.send()
.await
.map_err(|e| ProcessingError::LLMError(e.to_string()))?;
if !response.status().is_success() {
return Err(ProcessingError::LLMError(format!("LLM API returned status: {}", response.status())));
let client = async_openai::Client::new();
// Define the JSON Schema for the expected response
let schema = json!({
"type": "object",
"properties": {
"json_ld": {
"type": "object",
"properties": {
"@context": { "type": "string" },
"@type": { "type": "string" },
"name": { "type": "string" }
// Define only the essential properties
},
"required": ["@context", "@type", "name"],
"additionalProperties": false
},
"description": { "type": "string" },
"related_category": { "type": "string" },
"instructions": { "type": "string" }
},
"required": ["json_ld", "description", "related_category", "instructions"],
"additionalProperties": false
});
let response_format = async_openai::types::ResponseFormat::JsonSchema {
json_schema: async_openai::types::ResponseFormatJsonSchema {
description: Some("Structured analysis of the submitted content".into()),
name: "content_analysis".into(),
schema: Some(schema),
strict: Some(true),
},
};
// Construct the system and user messages
let system_message = format!(
"You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a short description of the document, how it relates to the submitted category, and any relevant instructions."
);
let user_message = format!(
"Category: {}\nInstructions: {}\nContent:\n{}",
self.category, self.instructions, self.text
);
// Build the chat completion request
let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.max_tokens(1024u32)
.messages([
ChatCompletionRequestSystemMessage::from(system_message).into(),
ChatCompletionRequestUserMessage::from(user_message).into(),
])
.response_format(response_format)
.build().map_err(|e| ProcessingError::LLMError(e.to_string()))?;
// Send the request to OpenAI
let response = client.chat().create(request).await.map_err(|e| {
ProcessingError::LLMError(format!("OpenAI API request failed: {}", e.to_string()))
})?;
info!("{:?}", response);
// Extract and parse the response
for choice in response.choices {
if let Some(content) = choice.message.content {
let analysis: LLMAnalysis = serde_json::from_str(&content).map_err(|e| {
ProcessingError::LLMError(format!(
"Failed to parse LLM response into LLMAnalysis: {}",
e.to_string()
))
})?;
return Ok(analysis);
}
}
let analysis: LLMAnalysis = response.json().await
.map_err(|e| ProcessingError::LLMError(e.to_string()))?;
Ok(analysis)
*/
unimplemented!()
Err(ProcessingError::LLMError(
"No content found in LLM response".into(),
))
}
/// Stores analysis results in a graph database.
async fn store_in_graph_db(&self, analysis: &LLMAnalysis) -> Result<(), ProcessingError> {
async fn store_in_graph_db(&self, _analysis: &LLMAnalysis) -> Result<(), ProcessingError> {
// TODO: Implement storage logic for your specific graph database.
// Example:
/*
@@ -85,28 +172,3 @@ impl TextContent {
unimplemented!()
}
}
/// Represents the analysis results from the LLM.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMAnalysis {
pub entities: Vec<String>,
pub summary: String,
// Add other fields based on your LLM's output.
}
/// Error types for processing `TextContent`.
#[derive(Error, Debug)]
pub enum ProcessingError {
#[error("LLM processing error: {0}")]
LLMError(String),
#[error("Graph DB storage error: {0}")]
GraphDBError(String),
#[error("Vector DB storage error: {0}")]
VectorDBError(String),
#[error("Unknown processing error")]
Unknown,
}

View File

@@ -3,7 +3,7 @@ use lapin::{
};
use futures_lite::stream::StreamExt;
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject};
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject, text_content::TextContent};
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
@@ -108,7 +108,9 @@ impl RabbitMQConsumer {
match self.consume().await {
Ok((ingress, delivery)) => {
info!("Received IngressObject: {:?}", ingress);
let text_content = ingress.to_text_content().await.unwrap();
text_content.process().await.unwrap();
self.ack_delivery(delivery).await?;
// Process the IngressContent
// match self.handle_ingress_content(&ingress).await {
@@ -143,109 +145,9 @@ impl RabbitMQConsumer {
Ok(())
}
// /// Processes messages in a loop
// pub async fn process_messages(&self) -> Result<(), RabbitMQError> {
// loop {
// match self.consume().await {
// Ok((ingress, delivery)) => {
// // info!("Received ingress object: {:?}", ingress);
// // Process the ingress object
// self.handle_ingress_content(&ingress).await;
// info!("Processing done, acknowledging message");
// self.ack_delivery(delivery).await?;
// }
// Err(RabbitMQError::ConsumeError(e)) => {
// error!("Error consuming message: {}", e);
// // Optionally add a delay before trying again
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// }
// Err(e) => {
// error!("Unexpected error: {}", e);
// break;
// }
// }
// }
// Ok(())
// }
pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> {
pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> {
info!("Processing IngressContent: {:?}", ingress);
// Convert IngressContent to individual TextContent instances
// let text_contents = ingress.to_text_contents().await?;
// info!("Generated {} TextContent instances", text_contents.len());
// // Limit concurrent processing (e.g., 10 at a time)
// let semaphore = Arc::new(Semaphore::new(10));
// let mut processing_futures = FuturesUnordered::new();
// for text_content in text_contents {
// let semaphore_clone = semaphore.clone();
// processing_futures.push(tokio::spawn(async move {
// let _permit = semaphore_clone.acquire().await;
// match text_content.process().await {
// Ok(_) => {
// info!("Successfully processed TextContent");
// Ok(())
// }
// Err(e) => {
// error!("Error processing TextContent: {:?}", e);
// Err(e)
// }
// }
// }));
// }
// // Await all processing tasks
// while let Some(result) = processing_futures.next().await {
// match result {
// Ok(Ok(_)) => {
// // Successfully processed
// }
// Ok(Err(e)) => {
// // Processing failed, already logged
// }
// Err(e) => {
// // Task join error
// error!("Task join error: {:?}", e);
// }
// }
// }
// Ok(())
unimplemented!()
}
}
}
// /// Handles the IngressContent based on its type
// async fn handle_ingress_content(&self, ingress: &IngressContent) {
// info!("Processing content: {:?}", ingress);
// // There are three different situations:
// // 1. There is no ingress.content but there are one or more files
// // - We should process the files and act based upon the mime type
// // - All different kinds of content return text
// // 2. There is ingress.content but there are no files
// // - We process ingress.content differently if its a URL or Text enum
// // - Return text
// // 3. There is ingress.content and files
// // - We do both
// //
// // At the end of processing we have one or several text objects with some associated
// // metadata, such as the FileInfo metadata, or the Url associated to the text
// //
// // There will always be ingress.instructions and ingress.category
// //
// // When we have all the text objects and metadata, we can begin the next processing
// // Here we will:
// // 1. Send the text content and metadata to a LLM for analyzing
// // - We want several things, JSON_LD metadata, possibly actions
// // 2. Store the JSON_LD in a graph database
// // 3. Split up the text intelligently and store it in a vector database
// //
// // We return the function if all succeeds.
// }

View File

@@ -1 +1,2 @@
// pub mod mime;
// pub mod llm;