mirror of
https://github.com/perstarkse/minne.git
synced 2026-04-25 10:18:38 +02:00
refactor: better separation of dependencies to crates
node stuff to html crate only
This commit is contained in:
25
ingestion-pipeline/Cargo.toml
Normal file
25
ingestion-pipeline/Cargo.toml
Normal file
@@ -0,0 +1,25 @@
|
||||
[package]
|
||||
name = "ingestion-pipeline"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
# Workspace dependencies
|
||||
tokio = { workspace = true }
|
||||
serde = { workspace = true }
|
||||
axum = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
async-openai = { workspace = true }
|
||||
surrealdb = { workspace = true }
|
||||
|
||||
tiktoken-rs = "0.6.0"
|
||||
reqwest = {version = "0.12.12", features = ["charset", "json"]}
|
||||
scraper = "0.22.0"
|
||||
chrono = { version = "0.4.39", features = ["serde"] }
|
||||
text-splitter = "0.18.1"
|
||||
uuid = { version = "1.10.0", features = ["v4", "serde"] }
|
||||
|
||||
common = { path = "../common" }
|
||||
composite-retrieval = { path = "../composite-retrieval" }
|
||||
143
ingestion-pipeline/src/enricher.rs
Normal file
143
ingestion-pipeline/src/enricher.rs
Normal file
@@ -0,0 +1,143 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_openai::types::{
|
||||
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage,
|
||||
CreateChatCompletionRequest, CreateChatCompletionRequestArgs, ResponseFormat,
|
||||
ResponseFormatJsonSchema,
|
||||
};
|
||||
use common::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::SurrealDbClient,
|
||||
types::{knowledge_entity::KnowledgeEntity, system_settings::SystemSettings},
|
||||
},
|
||||
};
|
||||
use composite_retrieval::retrieve_entities;
|
||||
use serde_json::json;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::{
|
||||
types::llm_enrichment_result::LLMEnrichmentResult,
|
||||
utils::llm_instructions::{get_ingress_analysis_schema, INGRESS_ANALYSIS_SYSTEM_MESSAGE},
|
||||
};
|
||||
|
||||
/// Enriches ingested content by combining retrieval of similar existing
/// knowledge entities with an LLM-based structured analysis of the new
/// content.
pub struct IngestionEnricher {
    // Database handle used for settings lookup and similar-entity retrieval.
    db_client: Arc<SurrealDbClient>,
    // OpenAI client used for retrieval embeddings and chat completions.
    openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
}
|
||||
|
||||
impl IngestionEnricher {
|
||||
pub fn new(
|
||||
db_client: Arc<SurrealDbClient>,
|
||||
openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
db_client,
|
||||
openai_client,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn analyze_content(
|
||||
&self,
|
||||
category: &str,
|
||||
instructions: &str,
|
||||
text: &str,
|
||||
user_id: &str,
|
||||
) -> Result<LLMEnrichmentResult, AppError> {
|
||||
info!("getting similar entitities");
|
||||
let similar_entities = self
|
||||
.find_similar_entities(category, instructions, text, user_id)
|
||||
.await?;
|
||||
info!("got similar entitities");
|
||||
let llm_request = self
|
||||
.prepare_llm_request(category, instructions, text, &similar_entities)
|
||||
.await?;
|
||||
self.perform_analysis(llm_request).await
|
||||
}
|
||||
|
||||
async fn find_similar_entities(
|
||||
&self,
|
||||
category: &str,
|
||||
instructions: &str,
|
||||
text: &str,
|
||||
user_id: &str,
|
||||
) -> Result<Vec<KnowledgeEntity>, AppError> {
|
||||
let input_text = format!(
|
||||
"content: {}, category: {}, user_instructions: {}",
|
||||
text, category, instructions
|
||||
);
|
||||
|
||||
retrieve_entities(&self.db_client, &self.openai_client, &input_text, user_id).await
|
||||
}
|
||||
|
||||
async fn prepare_llm_request(
|
||||
&self,
|
||||
category: &str,
|
||||
instructions: &str,
|
||||
text: &str,
|
||||
similar_entities: &[KnowledgeEntity],
|
||||
) -> Result<CreateChatCompletionRequest, AppError> {
|
||||
let settings = SystemSettings::get_current(&self.db_client).await?;
|
||||
|
||||
let entities_json = json!(similar_entities
|
||||
.iter()
|
||||
.map(|entity| {
|
||||
json!({
|
||||
"KnowledgeEntity": {
|
||||
"id": entity.id,
|
||||
"name": entity.name,
|
||||
"description": entity.description
|
||||
}
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>());
|
||||
|
||||
let user_message = format!(
|
||||
"Category:\n{}\nInstructions:\n{}\nContent:\n{}\nExisting KnowledgeEntities in database:\n{}",
|
||||
category, instructions, text, entities_json
|
||||
);
|
||||
|
||||
debug!("Prepared LLM request message: {}", user_message);
|
||||
|
||||
let response_format = ResponseFormat::JsonSchema {
|
||||
json_schema: ResponseFormatJsonSchema {
|
||||
description: Some("Structured analysis of the submitted content".into()),
|
||||
name: "content_analysis".into(),
|
||||
schema: Some(get_ingress_analysis_schema()),
|
||||
strict: Some(true),
|
||||
},
|
||||
};
|
||||
|
||||
let request = CreateChatCompletionRequestArgs::default()
|
||||
.model(&settings.processing_model)
|
||||
.temperature(0.2)
|
||||
.max_tokens(3048u32)
|
||||
.messages([
|
||||
ChatCompletionRequestSystemMessage::from(INGRESS_ANALYSIS_SYSTEM_MESSAGE).into(),
|
||||
ChatCompletionRequestUserMessage::from(user_message).into(),
|
||||
])
|
||||
.response_format(response_format)
|
||||
.build()?;
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
|
||||
async fn perform_analysis(
|
||||
&self,
|
||||
request: CreateChatCompletionRequest,
|
||||
) -> Result<LLMEnrichmentResult, AppError> {
|
||||
let response = self.openai_client.chat().create(request).await?;
|
||||
|
||||
let content = response
|
||||
.choices
|
||||
.first()
|
||||
.and_then(|choice| choice.message.content.as_ref())
|
||||
.ok_or(AppError::LLMParsing(
|
||||
"No content found in LLM response".into(),
|
||||
))?;
|
||||
|
||||
serde_json::from_str::<LLMEnrichmentResult>(content).map_err(|e| {
|
||||
AppError::LLMParsing(format!("Failed to parse LLM response into analysis: {}", e))
|
||||
})
|
||||
}
|
||||
}
|
||||
103
ingestion-pipeline/src/lib.rs
Normal file
103
ingestion-pipeline/src/lib.rs
Normal file
@@ -0,0 +1,103 @@
|
||||
pub mod enricher;
|
||||
pub mod pipeline;
|
||||
pub mod types;
|
||||
pub mod utils;
|
||||
|
||||
use common::storage::{
|
||||
db::SurrealDbClient,
|
||||
types::ingestion_task::{IngestionTask, IngestionTaskStatus},
|
||||
};
|
||||
use futures::StreamExt;
|
||||
use pipeline::IngestionPipeline;
|
||||
use std::sync::Arc;
|
||||
use surrealdb::Action;
|
||||
use tracing::{error, info};
|
||||
|
||||
/// Runs the ingestion worker: drains any unfinished tasks, then blocks on a
/// live-query stream of task notifications, dispatching each to the
/// pipeline. Reconnects (after a 5s pause) if the stream ends.
///
/// Never returns `Ok` in practice — only an `Err` from the initial drain or
/// stream setup breaks the outer loop.
pub async fn run_worker_loop(
    db: Arc<SurrealDbClient>,
    ingestion_pipeline: Arc<IngestionPipeline>,
) -> Result<(), Box<dyn std::error::Error>> {
    loop {
        // First, check for any unfinished tasks
        let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db).await?;
        if !unfinished_tasks.is_empty() {
            info!("Found {} unfinished jobs", unfinished_tasks.len());
            for task in unfinished_tasks {
                ingestion_pipeline.process_task(task).await?;
            }
        }

        // If no unfinished jobs, start listening for new ones
        info!("Listening for new jobs...");
        let mut job_stream = IngestionTask::listen_for_tasks(&db).await?;
        while let Some(notification) = job_stream.next().await {
            match notification {
                Ok(notification) => {
                    info!("Received notification: {:?}", notification);
                    match notification.action {
                        // A brand-new task: process it immediately.
                        Action::Create => {
                            if let Err(e) = ingestion_pipeline.process_task(notification.data).await
                            {
                                error!("Error processing task: {}", e);
                            }
                        }
                        // Updates require care: the pipeline itself emits
                        // status updates, which echo back through this stream.
                        Action::Update => {
                            match notification.data.status {
                                // Terminal states: nothing left to do.
                                IngestionTaskStatus::Completed
                                | IngestionTaskStatus::Error(_)
                                | IngestionTaskStatus::Cancelled => {
                                    info!(
                                        "Skipping already completed/error/cancelled task: {}",
                                        notification.data.id
                                    );
                                    continue;
                                }
                                IngestionTaskStatus::InProgress { attempts, .. } => {
                                    // Only process if this is a retry after an error, not our own update.
                                    // Re-read the task: the notification may be stale relative to the
                                    // stored state.
                                    if let Ok(Some(current_task)) =
                                        db.get_item::<IngestionTask>(&notification.data.id).await
                                    {
                                        match current_task.status {
                                            IngestionTaskStatus::Error(_)
                                                if attempts
                                                    < common::storage::types::ingestion_task::MAX_ATTEMPTS =>
                                            {
                                                // This is a retry after an error
                                                if let Err(e) =
                                                    ingestion_pipeline.process_task(current_task).await
                                                {
                                                    error!("Error processing task retry: {}", e);
                                                }
                                            }
                                            _ => {
                                                info!(
                                                    "Skipping in-progress update for task: {}",
                                                    notification.data.id
                                                );
                                                continue;
                                            }
                                        }
                                    }
                                }
                                IngestionTaskStatus::Created => {
                                    // Shouldn't happen with Update action, but process if it does
                                    if let Err(e) =
                                        ingestion_pipeline.process_task(notification.data).await
                                    {
                                        error!("Error processing task: {}", e);
                                    }
                                }
                            }
                        }
                        _ => {} // Ignore other actions
                    }
                }
                Err(e) => error!("Error in job notification: {}", e),
            }
        }

        // If we reach here, the stream has ended (connection lost?)
        error!("Database stream ended unexpectedly, reconnecting...");
        tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
    }
}
|
||||
164
ingestion-pipeline/src/pipeline.rs
Normal file
164
ingestion-pipeline/src/pipeline.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
use std::{sync::Arc, time::Instant};
|
||||
|
||||
use chrono::Utc;
|
||||
use text_splitter::TextSplitter;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use common::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::SurrealDbClient,
|
||||
types::{
|
||||
ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS},
|
||||
knowledge_entity::KnowledgeEntity,
|
||||
knowledge_relationship::KnowledgeRelationship,
|
||||
text_chunk::TextChunk,
|
||||
text_content::TextContent,
|
||||
},
|
||||
},
|
||||
utils::embedding::generate_embedding,
|
||||
};
|
||||
|
||||
use crate::{
|
||||
enricher::IngestionEnricher,
|
||||
types::{llm_enrichment_result::LLMEnrichmentResult, to_text_content},
|
||||
};
|
||||
|
||||
/// End-to-end ingestion pipeline: converts a task's payload into text,
/// enriches it into a knowledge graph via the LLM, and persists entities,
/// relationships, vector chunks, and the original content.
pub struct IngestionPipeline {
    // Storage backend for tasks, entities, relationships, and chunks.
    db: Arc<SurrealDbClient>,
    // OpenAI client for enrichment and embedding calls.
    openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
}
|
||||
|
||||
impl IngestionPipeline {
    /// Creates a new pipeline over shared database and OpenAI clients.
    // NOTE(review): async + Result-returning for interface stability even
    // though construction currently cannot fail and does not await.
    pub async fn new(
        db: Arc<SurrealDbClient>,
        openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    ) -> Result<Self, AppError> {
        Ok(Self { db, openai_client })
    }
    /// Runs the full ingestion for one task, tracking the attempt count and
    /// transitioning its status (InProgress -> Completed, or Error once the
    /// retry budget is exhausted).
    pub async fn process_task(&self, task: IngestionTask) -> Result<(), AppError> {
        // Increment the attempt counter when this is a retry.
        let current_attempts = match task.status {
            IngestionTaskStatus::InProgress { attempts, .. } => attempts + 1,
            _ => 1,
        };

        // Update status to InProgress with attempt count
        IngestionTask::update_status(
            &task.id,
            IngestionTaskStatus::InProgress {
                attempts: current_attempts,
                last_attempt: Utc::now(),
            },
            &self.db,
        )
        .await?;

        let text_content = to_text_content(task.content, &self.openai_client, &self.db).await?;

        match self.process(&text_content).await {
            Ok(_) => {
                IngestionTask::update_status(&task.id, IngestionTaskStatus::Completed, &self.db)
                    .await?;
                Ok(())
            }
            Err(e) => {
                // Only mark the task permanently failed once the retry
                // budget is spent; below the cap it stays InProgress so the
                // worker loop's retry logic can pick it up again.
                if current_attempts >= MAX_ATTEMPTS {
                    IngestionTask::update_status(
                        &task.id,
                        IngestionTaskStatus::Error(format!("Max attempts reached: {}", e)),
                        &self.db,
                    )
                    .await?;
                }
                Err(AppError::Processing(e.to_string()))
            }
        }
    }

    /// Processes text content: semantic analysis, concurrent storage of the
    /// graph and vector chunks, then original-content storage and an index
    /// rebuild.
    pub async fn process(&self, content: &TextContent) -> Result<(), AppError> {
        let now = Instant::now();

        // Perform analysis, this step also includes retrieval
        let analysis = self.perform_semantic_analysis(content).await?;

        let end = now.elapsed();
        info!(
            "{:?} time elapsed during creation of entities and relationships",
            end
        );

        // Convert analysis to application objects
        let (entities, relationships) = analysis
            .to_database_entities(&content.id, &content.user_id, &self.openai_client)
            .await?;

        // Store everything
        tokio::try_join!(
            self.store_graph_entities(entities, relationships),
            self.store_vector_chunks(content),
        )?;

        // Store original content
        self.db.store_item(content.to_owned()).await?;

        self.db.rebuild_indexes().await?;
        Ok(())
    }

    /// Delegates the LLM analysis step to a fresh `IngestionEnricher`.
    async fn perform_semantic_analysis(
        &self,
        content: &TextContent,
    ) -> Result<LLMEnrichmentResult, AppError> {
        let analyser = IngestionEnricher::new(self.db.clone(), self.openai_client.clone());
        analyser
            .analyze_content(
                &content.category,
                &content.instructions,
                &content.text,
                &content.user_id,
            )
            .await
    }

    /// Persists the graph: entities first, then the relationships that
    /// reference them.
    async fn store_graph_entities(
        &self,
        entities: Vec<KnowledgeEntity>,
        relationships: Vec<KnowledgeRelationship>,
    ) -> Result<(), AppError> {
        for entity in &entities {
            debug!("Storing entity: {:?}", entity);
            self.db.store_item(entity.clone()).await?;
        }

        for relationship in &relationships {
            debug!("Storing relationship: {:?}", relationship);
            relationship.store_relationship(&self.db).await?;
        }

        info!(
            "Stored {} entities and {} relationships",
            entities.len(),
            relationships.len()
        );
        Ok(())
    }

    /// Splits the content into 500-2000 character chunks, embeds each, and
    /// stores them for vector search.
    async fn store_vector_chunks(&self, content: &TextContent) -> Result<(), AppError> {
        let splitter = TextSplitter::new(500..2000);
        let chunks = splitter.chunks(&content.text);

        // Could potentially process chunks in parallel with a bounded concurrent limit
        for chunk in chunks {
            let embedding = generate_embedding(&self.openai_client, chunk).await?;
            let text_chunk = TextChunk::new(
                content.id.to_string(),
                chunk.to_string(),
                embedding,
                content.user_id.to_string(),
            );
            self.db.store_item(text_chunk).await?;
        }

        Ok(())
    }
}
|
||||
182
ingestion-pipeline/src/types/llm_enrichment_result.rs
Normal file
182
ingestion-pipeline/src/types/llm_enrichment_result.rs
Normal file
@@ -0,0 +1,182 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::task;
|
||||
|
||||
use common::{
|
||||
error::AppError,
|
||||
storage::types::{
|
||||
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
|
||||
knowledge_relationship::KnowledgeRelationship,
|
||||
},
|
||||
utils::embedding::generate_embedding,
|
||||
};
|
||||
use futures::future::try_join_all;
|
||||
|
||||
use crate::utils::GraphMapper;
|
||||
|
||||
/// Represents a single knowledge entity proposed by the LLM, keyed by a
/// temporary identifier until a real UUID is assigned.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMKnowledgeEntity {
    pub key: String, // Temporary identifier
    pub name: String,
    pub description: String,
    pub entity_type: String, // Should match KnowledgeEntityType variants
}
|
||||
|
||||
/// Represents a single relationship from the LLM, linking two entities by
/// their temporary keys (resolved to UUIDs via `GraphMapper`).
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMRelationship {
    #[serde(rename = "type")]
    pub type_: String, // e.g., RelatedTo, RelevantTo
    pub source: String, // Key of the source entity
    pub target: String, // Key of the target entity
}
|
||||
|
||||
/// Represents the entire graph analysis result from the LLM: the proposed
/// entities plus the relationships between them.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMEnrichmentResult {
    pub knowledge_entities: Vec<LLMKnowledgeEntity>,
    pub relationships: Vec<LLMRelationship>,
}
|
||||
|
||||
impl LLMEnrichmentResult {
    /// Converts the LLM graph analysis result into database entities and relationships.
    ///
    /// # Arguments
    ///
    /// * `source_id` - A UUID representing the source identifier.
    /// * `user_id` - Owner of the created entities and relationships.
    /// * `openai_client` - OpenAI client for LLM calls.
    ///
    /// # Returns
    ///
    /// * `Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), AppError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`.
    pub async fn to_database_entities(
        &self,
        source_id: &str,
        user_id: &str,
        openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
    ) -> Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), AppError> {
        // Create mapper and pre-assign IDs so relationships can resolve
        // entity keys even before entities finish processing.
        let mapper = Arc::new(Mutex::new(self.create_mapper()?));

        // Process entities
        let entities = self
            .process_entities(source_id, user_id, Arc::clone(&mapper), openai_client)
            .await?;

        // Process relationships
        let relationships = self.process_relationships(source_id, user_id, Arc::clone(&mapper))?;

        Ok((entities, relationships))
    }

    /// Builds a `GraphMapper` with a fresh UUID pre-assigned to every
    /// entity key.
    // NOTE(review): infallible today; Result kept for interface stability.
    fn create_mapper(&self) -> Result<GraphMapper, AppError> {
        let mut mapper = GraphMapper::new();

        // Pre-assign all IDs
        for entity in &self.knowledge_entities {
            mapper.assign_id(&entity.key);
        }

        Ok(mapper)
    }

    /// Converts each LLM entity into a `KnowledgeEntity` concurrently: each
    /// conversion is spawned as its own tokio task (it performs an
    /// embedding call) and `try_join_all` fails fast on the first error.
    async fn process_entities(
        &self,
        source_id: &str,
        user_id: &str,
        mapper: Arc<Mutex<GraphMapper>>,
        openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
    ) -> Result<Vec<KnowledgeEntity>, AppError> {
        let futures: Vec<_> = self
            .knowledge_entities
            .iter()
            .map(|entity| {
                // Clone everything the spawned task needs: tokio::spawn
                // requires 'static ownership.
                let mapper = Arc::clone(&mapper);
                let openai_client = openai_client.clone();
                let source_id = source_id.to_string();
                let user_id = user_id.to_string();
                let entity = entity.clone();

                task::spawn(async move {
                    create_single_entity(&entity, &source_id, &user_id, mapper, &openai_client)
                        .await
                })
            })
            .collect();

        // First `?` surfaces join (panic/cancel) errors; the inner collect
        // surfaces the first AppError from any task.
        let results = try_join_all(futures)
            .await?
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;

        Ok(results)
    }

    /// Resolves every relationship's source/target key to its UUID and
    /// builds the corresponding `KnowledgeRelationship` records.
    fn process_relationships(
        &self,
        source_id: &str,
        user_id: &str,
        mapper: Arc<Mutex<GraphMapper>>,
    ) -> Result<Vec<KnowledgeRelationship>, AppError> {
        // Single lock held for the whole (synchronous) resolution pass.
        let mut mapper_guard = mapper
            .lock()
            .map_err(|_| AppError::GraphMapper("Failed to lock mapper".into()))?;
        self.relationships
            .iter()
            .map(|rel| {
                let source_db_id = mapper_guard.get_or_parse_id(&rel.source);
                let target_db_id = mapper_guard.get_or_parse_id(&rel.target);

                Ok(KnowledgeRelationship::new(
                    source_db_id.to_string(),
                    target_db_id.to_string(),
                    user_id.to_string(),
                    source_id.to_string(),
                    rel.type_.clone(),
                ))
            })
            .collect()
    }
}
|
||||
async fn create_single_entity(
|
||||
llm_entity: &LLMKnowledgeEntity,
|
||||
source_id: &str,
|
||||
user_id: &str,
|
||||
mapper: Arc<Mutex<GraphMapper>>,
|
||||
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
|
||||
) -> Result<KnowledgeEntity, AppError> {
|
||||
let assigned_id = {
|
||||
let mapper = mapper
|
||||
.lock()
|
||||
.map_err(|_| AppError::GraphMapper("Failed to lock mapper".into()))?;
|
||||
mapper
|
||||
.get_id(&llm_entity.key)
|
||||
.ok_or_else(|| {
|
||||
AppError::GraphMapper(format!("ID not found for key: {}", llm_entity.key))
|
||||
})?
|
||||
.to_string()
|
||||
};
|
||||
|
||||
let embedding_input = format!(
|
||||
"name: {}, description: {}, type: {}",
|
||||
llm_entity.name, llm_entity.description, llm_entity.entity_type
|
||||
);
|
||||
|
||||
let embedding = generate_embedding(openai_client, &embedding_input).await?;
|
||||
|
||||
let now = Utc::now();
|
||||
Ok(KnowledgeEntity {
|
||||
id: assigned_id,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
name: llm_entity.name.to_string(),
|
||||
description: llm_entity.description.to_string(),
|
||||
entity_type: KnowledgeEntityType::from(llm_entity.entity_type.to_string()),
|
||||
source_id: source_id.to_string(),
|
||||
metadata: None,
|
||||
embedding,
|
||||
user_id: user_id.into(),
|
||||
})
|
||||
}
|
||||
258
ingestion-pipeline/src/types/mod.rs
Normal file
258
ingestion-pipeline/src/types/mod.rs
Normal file
@@ -0,0 +1,258 @@
|
||||
pub mod llm_enrichment_result;
|
||||
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use async_openai::types::{
|
||||
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage,
|
||||
CreateChatCompletionRequestArgs,
|
||||
};
|
||||
use common::storage::db::SurrealDbClient;
|
||||
use common::{
|
||||
error::AppError,
|
||||
storage::types::{
|
||||
file_info::FileInfo, ingestion_payload::IngestionPayload, system_settings::SystemSettings,
|
||||
text_content::TextContent,
|
||||
},
|
||||
};
|
||||
use reqwest;
|
||||
use scraper::{Html, Selector};
|
||||
use std::fmt::Write;
|
||||
use tiktoken_rs::{o200k_base, CoreBPE};
|
||||
|
||||
pub async fn to_text_content(
|
||||
ingestion_payload: IngestionPayload,
|
||||
openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
db_client: &Arc<SurrealDbClient>,
|
||||
) -> Result<TextContent, AppError> {
|
||||
match ingestion_payload {
|
||||
IngestionPayload::Url {
|
||||
url,
|
||||
instructions,
|
||||
category,
|
||||
user_id,
|
||||
} => {
|
||||
let text = fetch_text_from_url(&url, openai_client, db_client).await?;
|
||||
Ok(TextContent::new(
|
||||
text,
|
||||
instructions,
|
||||
category,
|
||||
None,
|
||||
Some(url),
|
||||
user_id,
|
||||
))
|
||||
}
|
||||
IngestionPayload::Text {
|
||||
text,
|
||||
instructions,
|
||||
category,
|
||||
user_id,
|
||||
} => Ok(TextContent::new(
|
||||
text,
|
||||
instructions,
|
||||
category,
|
||||
None,
|
||||
None,
|
||||
user_id,
|
||||
)),
|
||||
IngestionPayload::File {
|
||||
file_info,
|
||||
instructions,
|
||||
category,
|
||||
user_id,
|
||||
} => {
|
||||
let text = extract_text_from_file(&file_info).await?;
|
||||
Ok(TextContent::new(
|
||||
text,
|
||||
instructions,
|
||||
category,
|
||||
Some(file_info),
|
||||
None,
|
||||
user_id,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get text from url, will return it as a markdown formatted string
///
/// Fetches the page, extracts headings and paragraphs from the main
/// content region, wraps them in lightweight tags, and hands the result to
/// the LLM-based cleaner (`process_web_content`) for markdown conversion.
async fn fetch_text_from_url(
    url: &str,
    openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    db_client: &Arc<SurrealDbClient>,
) -> Result<String, AppError> {
    // Use a client with timeouts and reuse
    let client = reqwest::ClientBuilder::new()
        .timeout(Duration::from_secs(30))
        .build()?;
    let response = client.get(url).send().await?.text().await?;

    // Preallocate string with capacity
    let mut structured_content = String::with_capacity(response.len() / 2);

    let document = Html::parse_document(&response);
    // Common "main content" containers, checked before falling back to <body>.
    // SAFETY of unwrap: selector strings are static and known-valid.
    let main_selectors = Selector::parse(
        "article, main, .article-content, .post-content, .entry-content, [role='main']",
    )
    .unwrap();

    let content_element = document
        .select(&main_selectors)
        .next()
        .or_else(|| document.select(&Selector::parse("body").unwrap()).next())
        .ok_or(AppError::NotFound("No content found".into()))?;

    // Compile selectors once
    let heading_selector = Selector::parse("h1, h2, h3").unwrap();
    let paragraph_selector = Selector::parse("p").unwrap();

    // Process content in one pass
    // Note: headings are emitted before all paragraphs, so original
    // document interleaving is not preserved — the LLM cleaner reorders.
    for element in content_element.select(&heading_selector) {
        let _ = writeln!(
            structured_content,
            "<heading>{}</heading>",
            element.text().collect::<String>().trim()
        );
    }
    for element in content_element.select(&paragraph_selector) {
        let _ = writeln!(
            structured_content,
            "<paragraph>{}</paragraph>",
            element.text().collect::<String>().trim()
        );
    }

    // NOTE(review): both replace() arguments render as a plain space here,
    // which would make the second call a no-op — likely a non-breaking
    // space (U+00A0) was mangled in transit; confirm against the original.
    let content = structured_content
        .replace(|c: char| c.is_control(), " ")
        .replace(" ", " ");

    process_web_content(content, openai_client, db_client).await
}
|
||||
|
||||
pub async fn process_web_content(
|
||||
content: String,
|
||||
openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
db_client: &Arc<SurrealDbClient>,
|
||||
) -> Result<String, AppError> {
|
||||
const MAX_TOKENS: usize = 122000;
|
||||
const SYSTEM_PROMPT: &str = r#"
|
||||
You are a precise content extractor for web pages. Your task:
|
||||
|
||||
1. Extract ONLY the main article/content from the provided text
|
||||
2. Maintain the original content - do not summarize or modify the core information
|
||||
3. Ignore peripheral content such as:
|
||||
- Navigation elements
|
||||
- Error messages (e.g., "JavaScript required")
|
||||
- Related articles sections
|
||||
- Comments
|
||||
- Social media links
|
||||
- Advertisement text
|
||||
|
||||
FORMAT:
|
||||
- Convert <heading> tags to markdown headings (#, ##, ###)
|
||||
- Convert <paragraph> tags to markdown paragraphs
|
||||
- Preserve quotes and important formatting
|
||||
- Remove duplicate content
|
||||
- Remove any metadata or technical artifacts
|
||||
|
||||
OUTPUT RULES:
|
||||
- Output ONLY the cleaned content in markdown
|
||||
- Do not add any explanations or meta-commentary
|
||||
- Do not add summaries or conclusions
|
||||
- Do not use any XML/HTML tags in the output
|
||||
"#;
|
||||
|
||||
let bpe = o200k_base()?;
|
||||
let settings = SystemSettings::get_current(db_client).await?;
|
||||
|
||||
// Process content in chunks if needed
|
||||
let truncated_content = if bpe.encode_with_special_tokens(&content).len() > MAX_TOKENS {
|
||||
truncate_content(&content, MAX_TOKENS, &bpe)?
|
||||
} else {
|
||||
content
|
||||
};
|
||||
|
||||
let request = CreateChatCompletionRequestArgs::default()
|
||||
.model(&settings.processing_model)
|
||||
.temperature(0.0)
|
||||
.max_tokens(16200u32)
|
||||
.messages([
|
||||
ChatCompletionRequestSystemMessage::from(SYSTEM_PROMPT).into(),
|
||||
ChatCompletionRequestUserMessage::from(truncated_content).into(),
|
||||
])
|
||||
.build()?;
|
||||
|
||||
let response = openai_client.chat().create(request).await?;
|
||||
|
||||
// Extract and return the content
|
||||
response
|
||||
.choices
|
||||
.first()
|
||||
.and_then(|choice| choice.message.content.clone())
|
||||
.ok_or(AppError::LLMParsing(
|
||||
"No content found in LLM response".into(),
|
||||
))
|
||||
}
|
||||
|
||||
fn truncate_content(
|
||||
content: &str,
|
||||
max_tokens: usize,
|
||||
tokenizer: &CoreBPE,
|
||||
) -> Result<String, AppError> {
|
||||
// Pre-allocate with estimated size
|
||||
let mut result = String::with_capacity(content.len() / 2);
|
||||
let mut current_tokens = 0;
|
||||
|
||||
// Process content by paragraph to maintain context
|
||||
for paragraph in content.split("\n\n") {
|
||||
let tokens = tokenizer.encode_with_special_tokens(paragraph).len();
|
||||
|
||||
// Check if adding paragraph exceeds limit
|
||||
if current_tokens + tokens > max_tokens {
|
||||
break;
|
||||
}
|
||||
|
||||
result.push_str(paragraph);
|
||||
result.push_str("\n\n");
|
||||
current_tokens += tokens;
|
||||
}
|
||||
|
||||
// Ensure we return valid content
|
||||
if result.is_empty() {
|
||||
return Err(AppError::Processing("Content exceeds token limit".into()));
|
||||
}
|
||||
|
||||
Ok(result.trim_end().to_string())
|
||||
}
|
||||
|
||||
/// Extracts text from a file based on its MIME type.
|
||||
async fn extract_text_from_file(file_info: &FileInfo) -> Result<String, AppError> {
|
||||
match file_info.mime_type.as_str() {
|
||||
"text/plain" => {
|
||||
// Read the file and return its content
|
||||
let content = tokio::fs::read_to_string(&file_info.path).await?;
|
||||
Ok(content)
|
||||
}
|
||||
"text/markdown" => {
|
||||
// Read the file and return its content
|
||||
let content = tokio::fs::read_to_string(&file_info.path).await?;
|
||||
Ok(content)
|
||||
}
|
||||
"application/pdf" => {
|
||||
// TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf`
|
||||
Err(AppError::NotFound(file_info.mime_type.clone()))
|
||||
}
|
||||
"image/png" | "image/jpeg" => {
|
||||
// TODO: Implement OCR on image using a crate like `tesseract`
|
||||
Err(AppError::NotFound(file_info.mime_type.clone()))
|
||||
}
|
||||
"application/octet-stream" => {
|
||||
let content = tokio::fs::read_to_string(&file_info.path).await?;
|
||||
Ok(content)
|
||||
}
|
||||
"text/x-rust" => {
|
||||
let content = tokio::fs::read_to_string(&file_info.path).await?;
|
||||
Ok(content)
|
||||
}
|
||||
// Handle other MIME types as needed
|
||||
_ => Err(AppError::NotFound(file_info.mime_type.clone())),
|
||||
}
|
||||
}
|
||||
41
ingestion-pipeline/src/utils/llm_instructions.rs
Normal file
41
ingestion-pipeline/src/utils/llm_instructions.rs
Normal file
@@ -0,0 +1,41 @@
|
||||
use common::storage::types::system_prompts::DEFAULT_INGRESS_ANALYSIS_SYSTEM_PROMPT;
|
||||
use serde_json::json;
|
||||
|
||||
/// System prompt used for the ingress analysis chat completion.
pub static INGRESS_ANALYSIS_SYSTEM_MESSAGE: &str = DEFAULT_INGRESS_ANALYSIS_SYSTEM_PROMPT;

/// Returns the strict JSON schema for the structured ingress analysis
/// response: `knowledge_entities` (key/name/description/entity_type) and
/// `relationships` (type/source/target).
///
/// Must stay in sync with `LLMEnrichmentResult` and its nested structs —
/// `additionalProperties: false` plus full `required` lists are needed for
/// OpenAI strict structured output.
pub fn get_ingress_analysis_schema() -> serde_json::Value {
    json!({
        "type": "object",
        "properties": {
            "knowledge_entities": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "key": { "type": "string" },
                        "name": { "type": "string" },
                        "description": { "type": "string" },
                        "entity_type": { "type": "string" }
                    },
                    "required": ["key", "name", "description", "entity_type"],
                    "additionalProperties": false
                }
            },
            "relationships": {
                "type": "array",
                "items": {
                    "type": "object",
                    "properties": {
                        "type": { "type": "string" },
                        "source": { "type": "string" },
                        "target": { "type": "string" }
                    },
                    "required": ["type", "source", "target"],
                    "additionalProperties": false
                }
            }
        },
        "required": ["knowledge_entities", "relationships"],
        "additionalProperties": false
    })
}
|
||||
44
ingestion-pipeline/src/utils/mod.rs
Normal file
44
ingestion-pipeline/src/utils/mod.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
pub mod llm_instructions;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Intermediate struct to hold mapping between LLM keys and generated IDs.
///
/// The LLM refers to new entities by temporary string keys; this mapper
/// assigns each key a real UUID so relationships can be resolved.
#[derive(Clone)]
pub struct GraphMapper {
    // Temporary LLM key -> assigned UUID.
    pub key_to_id: HashMap<String, Uuid>,
}
|
||||
|
||||
impl Default for GraphMapper {
    // Delegates to `new` so `Default` and `new` can never diverge.
    fn default() -> Self {
        GraphMapper::new()
    }
}
|
||||
|
||||
impl GraphMapper {
|
||||
pub fn new() -> Self {
|
||||
GraphMapper {
|
||||
key_to_id: HashMap::new(),
|
||||
}
|
||||
}
|
||||
/// Get ID, tries to parse UUID
|
||||
pub fn get_or_parse_id(&mut self, key: &str) -> Uuid {
|
||||
if let Ok(parsed_uuid) = Uuid::parse_str(key) {
|
||||
parsed_uuid
|
||||
} else {
|
||||
*self.key_to_id.get(key).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
/// Assigns a new UUID for a given key.
|
||||
pub fn assign_id(&mut self, key: &str) -> Uuid {
|
||||
let id = Uuid::new_v4();
|
||||
self.key_to_id.insert(key.to_string(), id);
|
||||
id
|
||||
}
|
||||
|
||||
/// Retrieves the UUID for a given key.
|
||||
pub fn get_id(&self, key: &str) -> Option<&Uuid> {
|
||||
self.key_to_id.get(key)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user