From 15dc0477bfd149806d75cbe347e20c32fa2c8e05 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Wed, 16 Oct 2024 09:23:42 +0200 Subject: [PATCH] del redis & neo4j --- src/lib.rs | 2 - src/models/file_info.rs | 44 ++--- src/models/ingress_content.rs | 8 +- src/models/text_content.rs | 16 +- src/neo4j/client.rs | 183 ------------------ src/neo4j/mod.rs | 1 - src/redis/client.rs | 337 ---------------------------------- src/redis/mod.rs | 1 - src/routes/file.rs | 32 ++-- src/routes/ingress.rs | 6 +- src/server.rs | 10 +- src/surrealdb/document.rs | 33 ---- src/surrealdb/graph.rs | 1 - src/surrealdb/mod.rs | 4 +- src/utils/llm.rs | 8 +- 15 files changed, 53 insertions(+), 633 deletions(-) delete mode 100644 src/neo4j/client.rs delete mode 100644 src/neo4j/mod.rs delete mode 100644 src/redis/client.rs delete mode 100644 src/redis/mod.rs delete mode 100644 src/surrealdb/document.rs delete mode 100644 src/surrealdb/graph.rs diff --git a/src/lib.rs b/src/lib.rs index 3541a23..9235438 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,5 @@ pub mod models; -pub mod neo4j; pub mod rabbitmq; -pub mod redis; pub mod routes; pub mod surrealdb; pub mod utils; diff --git a/src/models/file_info.rs b/src/models/file_info.rs index a1803b0..3b28811 100644 --- a/src/models/file_info.rs +++ b/src/models/file_info.rs @@ -14,10 +14,11 @@ use thiserror::Error; use tracing::{debug, info}; use uuid::Uuid; -use crate::{redis::client::{RedisClient, RedisClientTrait}, surrealdb::SurrealDbClient}; +use crate::surrealdb::SurrealDbClient; -#[derive(Debug, Deserialize)] +#[derive(Debug,Deserialize)] struct Record { + #[allow(dead_code)] id: RecordId, } @@ -48,9 +49,6 @@ pub enum FileError { #[error("SurrealDB error: {0}")] SurrealError(#[from] surrealdb::Error), - #[error("Redis error: {0}")] - RedisError(#[from] crate::redis::client::RedisError), - #[error("File not found for UUID: {0}")] FileNotFound(String), @@ -85,7 +83,6 @@ impl IntoResponse for FileError { FileError::Utf8(_) => (StatusCode::BAD_REQUEST, "Invalid UTF-8 data"), FileError::MimeDetection(_) => (StatusCode::BAD_REQUEST, "MIME type detection failed"), FileError::UnsupportedMime(_) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported MIME type"), - FileError::RedisError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Redis error"), FileError::FileNotFound(_) => (StatusCode::NOT_FOUND, "File not found"), FileError::DuplicateFile(_) => (StatusCode::CONFLICT, "Duplicate file detected"), FileError::HashCollision => (StatusCode::INTERNAL_SERVER_ERROR, "Hash collision detected"), @@ -122,9 +119,10 @@ impl FileInfo { info!("SHA256: {:?}", sha); // Check if SHA exists in SurrealDB - if let Some(existing_file_info) = Self::get_by_sha(&sha, db_client).await? { - info!("Duplicate file detected with SHA256: {}", sha); - return Ok(existing_file_info); + if let Ok(file) = Self::get_by_sha(&sha, db_client).await { + info!("File already exists in database with SHA256: {}", sha); + // SHA exists: return FileInfo + return Ok(file); } // Generate a new UUID @@ -176,9 +174,10 @@ impl FileInfo { let new_sha = Self::get_sha(&new_file).await?; // Check if the new SHA already exists - if let Some(existing_file_info) = Self::get_by_sha(&new_sha, &db_client).await? { - info!("Duplicate file detected with SHA256: {}", new_sha); - return Ok(existing_file_info); + if let Ok(file) = Self::get_by_sha(&new_sha, db_client).await { + info!("File already exists in database with SHA256: {}", new_sha); + // SHA exists: return FileInfo + return Ok(file); } // Sanitize new file name @@ -191,8 +190,8 @@ impl FileInfo { let new_mime_type = Self::guess_mime_type(&new_persisted_path); // Get the existing item and remove it - let old_record = Self::get_by_uuid(uuid, &db_client).await?; - Self::delete_record(&old_record.sha256, &db_client).await?; + let old_record = Self::get_by_uuid(uuid, db_client).await?; + Self::delete_record(&old_record.sha256, db_client).await?; // Update FileInfo let updated_file_info = FileInfo { @@ -203,7 +202,7 @@ impl FileInfo { }; // Save the new item - Self::create_record(&updated_file_info,&db_client).await?; + Self::create_record(&updated_file_info,db_client).await?; // Optionally, delete the old file from the filesystem if it's no longer referenced // This requires reference counting or checking if other FileInfo entries point to the same SHA @@ -222,7 +221,7 @@ impl FileInfo { /// * `Result<(), FileError>` - Empty result or an error. pub async fn delete(uuid: Uuid, db_client: &SurrealDbClient) -> Result<(), FileError> { // Retrieve FileInfo to get SHA256 and path - let file_info = Self::get_by_uuid(uuid, &db_client).await?; + let file_info = Self::get_by_uuid(uuid, db_client).await?; // Delete the file from the filesystem let file_path = Path::new(&file_info.path); @@ -234,7 +233,7 @@ impl FileInfo { } // Delete the FileInfo from database - Self::delete_record(&file_info.sha256, &db_client).await?; + Self::delete_record(&file_info.sha256, db_client).await?; // Remove the UUID directory if empty let uuid_dir = file_path.parent().ok_or(FileError::FileNotFound(uuid.to_string()))?; @@ -324,8 +323,6 @@ impl FileInfo { /// # Returns /// * `Result<(), FileError>` - Empty result or an error. async fn create_record(file_info: &FileInfo, db_client: &SurrealDbClient) -> Result<(), FileError> { - // Define the table and primary key - // Create the record let _created: Option = db_client .client @@ -352,7 +349,7 @@ impl FileInfo { let query = format!("SELECT * FROM file WHERE uuid = '{}'", uuid); let response: Vec = db_client.client.query(query).await?.take(0)?; - Ok(response.into_iter().next().ok_or(FileError::FileNotFound(uuid.to_string()))?) + response.into_iter().next().ok_or(FileError::FileNotFound(uuid.to_string())) } /// Retrieves a `FileInfo` by SHA256. @@ -363,14 +360,13 @@ impl FileInfo { /// /// # Returns /// * `Result, FileError>` - The `FileInfo` or `None` if not found. - async fn get_by_sha(sha256: &str, db_client: &SurrealDbClient) -> Result, FileError> { - let query = format!("SELECT * FROM file WHERE sha256 = '{}'", sha256); + async fn get_by_sha(sha256: &str, db_client: &SurrealDbClient) -> Result { + let query = format!("SELECT * FROM file WHERE sha256 = '{}'", &sha256); let response: Vec = db_client.client.query(query).await?.take(0)?; debug!("{:?}", response); - Ok(response.into_iter().next()) - } + response.into_iter().next().ok_or(FileError::FileNotFound(sha256.to_string())) } /// Deletes a `FileInfo` record by SHA256. /// diff --git a/src/models/ingress_content.rs b/src/models/ingress_content.rs index a410ecf..ed37371 100644 --- a/src/models/ingress_content.rs +++ b/src/models/ingress_content.rs @@ -3,11 +3,9 @@ use thiserror::Error; use tracing::info; use url::Url; use uuid::Uuid; -use crate::{redis::client::RedisClient, surrealdb::SurrealDbClient}; - +use crate::surrealdb::SurrealDbClient; use super::{file_info::FileInfo, ingress_object::IngressObject }; - /// Struct defining the expected body when ingressing content. #[derive(Serialize, Deserialize, Debug)] pub struct IngressInput { @@ -83,8 +81,8 @@ pub async fn create_ingress_objects( if let Some(file_uuids) = input.files { for uuid_str in file_uuids { let uuid = Uuid::parse_str(&uuid_str)?; - match FileInfo::get_by_uuid(uuid, &db_client).await { - Ok(Some(file_info)) => { + match FileInfo::get_by_uuid(uuid, db_client).await { + Ok(file_info) => { object_list.push(IngressObject::File { file_info, instructions: input.instructions.clone(), diff --git a/src/models/text_content.rs b/src/models/text_content.rs index 98b80e6..f145107 100644 --- a/src/models/text_content.rs +++ b/src/models/text_content.rs @@ -1,7 +1,7 @@ use serde::{Deserialize, Serialize}; use tracing::info; use uuid::Uuid; -use crate::{models::file_info::FileInfo, neo4j::client::Neo4jClient, utils::llm::create_json_ld}; +use crate::{models::file_info::FileInfo, utils::llm::create_json_ld}; use thiserror::Error; /// Represents a single piece of text content extracted from various sources. @@ -60,21 +60,21 @@ pub enum ProcessingError { impl TextContent { /// Processes the `TextContent` by sending it to an LLM, storing in a graph DB, and vector DB. pub async fn process(&self) -> Result<(), ProcessingError> { - let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo4j").await.expect("Failed to create Neo4j client"); + // let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo4j").await.expect("Failed to create Neo4j client"); // Step 1: Send to LLM for analysis let analysis = create_json_ld(&self.category, &self.instructions, &self.text).await?; info!("{:?}", &analysis); // Step 2: Store analysis results in Graph DB - client.store_knowledge_source(&analysis.knowledge_source).await?; + // client.store_knowledge_source(&analysis.knowledge_source).await?; // Step 3: Store relationships in Graph DB - for relationship in analysis.knowledge_source.relationships.iter() { - client - .store_relationship(&analysis.knowledge_source.id, relationship) - .await?; - } + // for relationship in analysis.knowledge_source.relationships.iter() { + // client + // .store_relationship(&analysis.knowledge_source.id, relationship) + // .await?; + // } // Step 3: Split text and store in Vector DB diff --git a/src/neo4j/client.rs b/src/neo4j/client.rs deleted file mode 100644 index 7ebbb26..0000000 --- a/src/neo4j/client.rs +++ /dev/null @@ -1,183 +0,0 @@ -use neo4rs::*; -use tracing::info; -use std::error::Error; - -use crate::models::text_content::{KnowledgeSource, ProcessingError, Relationship}; - -/// A client for interacting with the Neo4j graph database. -pub struct Neo4jClient { - /// The Neo4j graph instance. - graph: Graph, -} - -impl Neo4jClient { - /// Creates a new `Neo4jClient` instance by connecting to the Neo4j database. - /// - /// # Arguments - /// - /// * `uri` - The URI of the Neo4j database (e.g., "127.0.0.1:7687"). - /// * `user` - The username for authentication. - /// * `pass` - The password for authentication. - /// - /// # Errors - /// - /// Returns an `Err` variant if the connection to the database fails. - pub async fn new(uri: &str, user: &str, pass: &str) -> Result> { - // Initialize the Neo4j graph connection. - let graph = Graph::new(uri, user, pass).await?; - Ok(Neo4jClient { graph }) - } - - /// Stores a knowledge source in the Neo4j database. - /// - /// # Arguments - /// - /// * `source` - A reference to the `KnowledgeSource` to be stored. - /// - /// # Errors - /// - /// Returns an `Err` variant if the database operation fails. - pub async fn store_knowledge_source( - &self, - source: &KnowledgeSource, - ) -> Result<(), ProcessingError> { - // Cypher query to create a knowledge source node with properties. - let cypher_query = " - CREATE (ks:KnowledgeSource { - id: $id, - type: $type, - title: $title, - description: $description - }) - RETURN ks - "; - - // Execute the query with parameters. - let _ = self.graph - .run( - query(cypher_query) - .param("id", source.id.to_string()) - .param("title", source.title.to_string()) - .param("description", source.description.to_string()), - ) - .await.map_err(|e| ProcessingError::GraphDBError(e.to_string())); - - info!("Stored knowledge source"); - - Ok(()) - } - - /// Stores a relationship between two knowledge sources in the Neo4j database. - /// - /// # Arguments - /// - /// * `source_id` - The ID of the source knowledge source. - /// * `relationship` - A reference to the `Relationship` defining the connection. - /// - /// # Errors - /// - /// Returns an `Err` variant if the database operation fails. - pub async fn store_relationship( - &self, - source_id: &str, - relationship: &Relationship, - ) -> Result<(), ProcessingError> { - // Cypher query to create a relationship between two knowledge source nodes. - let cypher_query = format!( - " - MATCH (a:KnowledgeSource {{id: $source_id}}) - MATCH (b:KnowledgeSource {{id: $target_id}}) - CREATE (a)-[:{}]->(b) - RETURN a, b - ", - relationship.type_ - ); - - // Execute the query with parameters. - let _ = self.graph - .run( - query(&cypher_query) - .param("source_id", source_id) - .param("target", relationship.target.to_string()), - ) - .await.map_err(|e| ProcessingError::GraphDBError(e.to_string())); - - info!("Stored knowledge relationship"); - - Ok(()) - } - - //// Closes the connection to the Neo4j database. - //// - //// This function ensures that all pending operations are completed before shutting down. - // pub async fn close(self) -> Result<(), Box> { - // self.graph.close().await?; - // Ok(()) - // } -} - -// #[cfg(test)] -// mod tests { -// use super::*; -// use uuid::Uuid; - -// /// Tests the `store_knowledge_source` and `store_relationship` functions. -// /// -// /// # Note -// /// -// /// Ensure that a Neo4j database is running and accessible at the specified URI -// /// before running these tests. -// #[tokio::test] -// async fn test_store_functions() { -// // Initialize the Neo4j client. -// let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo") -// .await -// .expect("Failed to create Neo4j client"); - -// // Create a first knowledge source. -// let source1 = KnowledgeSource { -// id: Uuid::new_v4().to_string(), -// source_type: "Document".into(), -// title: "Understanding Neural Networks".into(), -// description: -// "An in-depth analysis of neural networks and their applications in machine learning." -// .into(), -// }; - -// // Store the first knowledge source. -// client -// .store_knowledge_source(&source1) -// .await -// .expect("Failed to store knowledge source 1"); - -// // Create a second knowledge source. -// let source2 = KnowledgeSource { -// id: Uuid::new_v4().to_string(), -// source_type: "Document".into(), -// title: "Machine Learning Basics".into(), -// description: "A foundational text on machine learning principles and techniques." -// .into(), -// }; - -// // Store the second knowledge source. -// client -// .store_knowledge_source(&source2) -// .await -// .expect("Failed to store knowledge source 2"); - -// // Define a relationship between the two sources. -// let relationship = Relationship { -// relationship_type: "RelatedTo".into(), -// target_id: source2.id.clone(), -// }; - -// // Store the relationship from source1 to source2. -// client -// .store_relationship(&source1.id, &relationship) -// .await -// .expect("Failed to store relationship"); - -// // Clean up by closing the client. -// client.close().await.expect("Failed to close Neo4j client"); -// } -// } diff --git a/src/neo4j/mod.rs b/src/neo4j/mod.rs deleted file mode 100644 index b9babe5..0000000 --- a/src/neo4j/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod client; diff --git a/src/redis/client.rs b/src/redis/client.rs deleted file mode 100644 index 1f629aa..0000000 --- a/src/redis/client.rs +++ /dev/null @@ -1,337 +0,0 @@ -use axum::async_trait; -use redis::AsyncCommands; -use thiserror::Error; -use uuid::Uuid; - -use crate::models::file_info::FileInfo; - -/// Represents errors that can occur during Redis operations. -#[derive(Error, Debug)] -pub enum RedisError { - #[error("Redis connection error: {0}")] - ConnectionError(String), - - #[error("Redis command error: {0}")] - CommandError(String), - - // Add more error variants as needed. -} - -/// Defines the behavior for Redis client operations. -#[cfg_attr(test, mockall::automock)] -#[async_trait] -pub trait RedisClientTrait: Send + Sync { - /// Establishes a new multiplexed asynchronous connection to Redis. - async fn get_connection(&self) -> Result; - - /// Stores `FileInfo` in Redis using SHA256 as the key. - async fn set_file_info(&self, sha256: &str, file_info: &FileInfo) -> Result<(), RedisError>; - - /// Retrieves `FileInfo` from Redis using SHA256 as the key. - async fn get_file_info_by_sha(&self, sha256: &str) -> Result, RedisError>; - - /// Deletes `FileInfo` from Redis using SHA256 as the key. - async fn delete_file_info(&self, sha256: &str) -> Result<(), RedisError>; - - /// Sets a mapping from UUID to SHA256. - async fn set_sha_uuid_mapping(&self, uuid: &Uuid, sha256: &str) -> Result<(), RedisError>; - - /// Retrieves the SHA256 hash associated with a given UUID. - async fn get_sha_by_uuid(&self, uuid: &Uuid) -> Result, RedisError>; - - /// Deletes the UUID to SHA256 mapping from Redis. - async fn delete_sha_uuid_mapping(&self, uuid: &Uuid) -> Result<(), RedisError>; -} - -/// Provides Redis-related operations for `FileInfo`. -pub struct RedisClient { - redis_url: String, -} - -impl RedisClient { - /// Creates a new instance of `RedisClient`. - /// - /// # Arguments - /// - /// * `redis_url` - The Redis server URL (e.g., "redis://127.0.0.1/"). - /// - /// # Returns - /// - /// * `Self` - A new `RedisClient` instance. - pub fn new(redis_url: &str) -> Self { - Self { - redis_url: redis_url.to_string(), - } - } -} - -#[async_trait] -impl RedisClientTrait for RedisClient { - /// Establishes a new multiplexed asynchronous connection to Redis. - /// - /// # Returns - /// * `MultiplexedConnection` - The established connection. - async fn get_connection(&self) -> Result { - let client = redis::Client::open(self.redis_url.clone()) - .map_err(|e| RedisError::ConnectionError(e.to_string()))?; - let conn = client - .get_multiplexed_async_connection() - .await - .map_err(|e| RedisError::ConnectionError(e.to_string()))?; - Ok(conn) - } - - /// Stores `FileInfo` in Redis using SHA256 as the key. - /// - /// # Arguments - /// * `sha256` - The SHA256 hash of the file. - /// * `file_info` - The `FileInfo` object to store. - /// - /// # Returns - /// * `Result<(), RedisError>` - Empty result or an error. - async fn set_file_info(&self, sha256: &str, file_info: &FileInfo) -> Result<(), RedisError> { - let mut conn = self.get_connection().await?; - let key = format!("file_info:{}", sha256); - let value = serde_json::to_string(file_info) - .map_err(|e| RedisError::CommandError(e.to_string()))?; - conn.set(key, value).await - .map_err(|e| RedisError::CommandError(e.to_string()))?; - Ok(()) - } - - /// Retrieves `FileInfo` from Redis using SHA256 as the key. - /// - /// # Arguments - /// * `sha256` - The SHA256 hash of the file. - /// - /// # Returns - /// * `Result, RedisError>` - The `FileInfo` if found, otherwise `None`, or an error. - async fn get_file_info_by_sha(&self, sha256: &str) -> Result, RedisError> { - let mut conn = self.get_connection().await?; - let key = format!("file_info:{}", sha256); - let value: Option = conn.get(key).await - .map_err(|e| RedisError::CommandError(e.to_string()))?; - if let Some(json_str) = value { - let file_info: FileInfo = serde_json::from_str(&json_str) - .map_err(|e| RedisError::CommandError(e.to_string()))?; - Ok(Some(file_info)) - } else { - Ok(None) - } - } - - /// Deletes `FileInfo` from Redis using SHA256 as the key. - /// - /// # Arguments - /// * `sha256` - The SHA256 hash of the file. - /// - /// # Returns - /// * `Result<(), RedisError>` - Empty result or an error. - async fn delete_file_info(&self, sha256: &str) -> Result<(), RedisError> { - let mut conn = self.get_connection().await?; - let key = format!("file_info:{}", sha256); - conn.del(key).await - .map_err(|e| RedisError::CommandError(e.to_string()))?; - Ok(()) - } - - /// Sets a mapping from UUID to SHA256. - /// - /// # Arguments - /// * `uuid` - The UUID of the file. - /// * `sha256` - The SHA256 hash of the file. - /// - /// # Returns - /// * `Result<(), RedisError>` - Empty result or an error. - async fn set_sha_uuid_mapping(&self, uuid: &Uuid, sha256: &str) -> Result<(), RedisError> { - let mut conn = self.get_connection().await?; - let key = format!("uuid_sha:{}", uuid); - conn.set(key, sha256).await - .map_err(|e| RedisError::CommandError(e.to_string()))?; - Ok(()) - } - - /// Retrieves the SHA256 hash associated with a given UUID. - /// - /// # Arguments - /// * `uuid` - The UUID of the file. - /// - /// # Returns - /// * `Result, RedisError>` - The SHA256 hash if found, otherwise `None`, or an error. - async fn get_sha_by_uuid(&self, uuid: &Uuid) -> Result, RedisError> { - let mut conn = self.get_connection().await?; - let key = format!("uuid_sha:{}", uuid); - let sha: Option = conn.get(key).await - .map_err(|e| RedisError::CommandError(e.to_string()))?; - Ok(sha) - } - - /// Deletes the UUID to SHA256 mapping from Redis. - /// - /// # Arguments - /// - /// * `uuid` - The UUID of the file. - /// - /// # Returns - /// - /// * `Result<(), RedisError>` - Empty result or an error. - async fn delete_sha_uuid_mapping(&self, uuid: &Uuid) -> Result<(), RedisError> { - let mut conn = self.get_connection().await?; - let key = format!("uuid_sha:{}", uuid); - conn.del(key).await - .map_err(|e| RedisError::CommandError(e.to_string()))?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use mockall::predicate::*; - use uuid::Uuid; - - #[tokio::test] - async fn test_set_file_info() { - // Initialize the mock. - let mut mock_redis = MockRedisClientTrait::new(); - - let test_sha = "dummysha256hash".to_string(); - let test_file_info = FileInfo { - uuid: Uuid::new_v4().to_string(), - sha256: test_sha.clone(), - path: "/path/to/file".to_string(), - mime_type: "text/plain".to_string(), - }; - - // Setup expectation for `set_file_info`. - mock_redis - .expect_set_file_info() - .with(eq(test_sha.clone()), eq(test_file_info.clone())) - .times(1) - .returning(|_, _| Ok(()) ); - - // Call `set_file_info` on the mock. - let set_result = mock_redis.set_file_info(&test_sha, &test_file_info).await; - assert!(set_result.is_ok()); - } - - #[tokio::test] - async fn test_get_file_info_by_sha() { - // Initialize the mock. - let mut mock_redis = MockRedisClientTrait::new(); - - let test_sha = "dummysha256hash".to_string(); - let test_file_info = FileInfo { - uuid: Uuid::new_v4().to_string(), - sha256: test_sha.clone(), - path: "/path/to/file".to_string(), - mime_type: "text/plain".to_string(), - }; - - // Clone the FileInfo to use inside the closure. - let fi_clone = test_file_info.clone(); - - // Setup expectation for `get_file_info_by_sha`. - mock_redis - .expect_get_file_info_by_sha() - .with(eq(test_sha.clone())) - .times(1) - .returning(move |_: &str| { - // Return the cloned FileInfo. - let fi_inner = fi_clone.clone(); - Ok(Some(fi_inner)) - }); - - // Call `get_file_info_by_sha` on the mock. - let get_result = mock_redis.get_file_info_by_sha(&test_sha).await; - assert!(get_result.is_ok()); - assert_eq!(get_result.unwrap(), Some(test_file_info)); - } - - #[tokio::test] - async fn test_delete_file_info() { - // Initialize the mock. - let mut mock_redis = MockRedisClientTrait::new(); - - let test_sha = "dummysha256hash".to_string(); - - // Setup expectation for `delete_file_info`. - mock_redis - .expect_delete_file_info() - .with(eq(test_sha.clone())) - .times(1) - .returning(|_: &str| Ok(()) ); - - // Call `delete_file_info` on the mock. - let delete_result = mock_redis.delete_file_info(&test_sha).await; - assert!(delete_result.is_ok()); - } - - #[tokio::test] - async fn test_set_sha_uuid_mapping() { - // Initialize the mock. - let mut mock_redis = MockRedisClientTrait::new(); - - let test_uuid = Uuid::new_v4(); - let test_sha = "dummysha256hash".to_string(); - - // Setup expectation for `set_sha_uuid_mapping`. - mock_redis - .expect_set_sha_uuid_mapping() - .with(eq(test_uuid.clone()), eq(test_sha.clone())) - .times(1) - .returning(|_, _| Ok(()) ); - - // Call `set_sha_uuid_mapping` on the mock. - let set_result = mock_redis.set_sha_uuid_mapping(&test_uuid, &test_sha).await; - assert!(set_result.is_ok()); - } - - #[tokio::test] - async fn test_get_sha_by_uuid() { - // Initialize the mock. - let mut mock_redis = MockRedisClientTrait::new(); - - let test_uuid = Uuid::new_v4(); - let test_sha = "dummysha256hash".to_string(); - - // Clone the SHA to use inside the closure. - let sha_clone = test_sha.clone(); - - // Setup expectation for `get_sha_by_uuid`. - mock_redis - .expect_get_sha_by_uuid() - .with(eq(test_uuid.clone())) - .times(1) - .returning(move |_: &Uuid| { - let sha_inner = sha_clone.clone(); - Ok(Some(sha_inner)) - }); - - // Call `get_sha_by_uuid` on the mock. - let get_result = mock_redis.get_sha_by_uuid(&test_uuid).await; - assert!(get_result.is_ok()); - assert_eq!(get_result.unwrap(), Some(test_sha)); - } - - - #[tokio::test] - async fn test_delete_sha_uuid_mapping() { - // Initialize the mock. - let mut mock_redis = MockRedisClientTrait::new(); - - let test_uuid = Uuid::new_v4(); - - // Setup expectation for `delete_sha_uuid_mapping`. - mock_redis - .expect_delete_sha_uuid_mapping() - .with(eq(test_uuid.clone())) - .times(1) - .returning(|_: &Uuid| Ok(()) ); - - // Call `delete_sha_uuid_mapping` on the mock. - let delete_result = mock_redis.delete_sha_uuid_mapping(&test_uuid).await; - assert!(delete_result.is_ok()); - } -} - diff --git a/src/redis/mod.rs b/src/redis/mod.rs deleted file mode 100644 index b9babe5..0000000 --- a/src/redis/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod client; diff --git a/src/routes/file.rs b/src/routes/file.rs index 50c48b1..8da870a 100644 --- a/src/routes/file.rs +++ b/src/routes/file.rs @@ -1,8 +1,7 @@ +use std::sync::Arc; + use axum::{ - extract::Path, - response::IntoResponse, - Json, - + extract::Path, response::IntoResponse, Extension, Json }; use axum_typed_multipart::{TypedMultipart, FieldData, TryFromMultipart}; use serde_json::json; @@ -12,7 +11,7 @@ use uuid::Uuid; use crate::{ models::file_info::{FileError, FileInfo}, - redis::client::RedisClient, surrealdb::{document::set_file_info, SurrealDbClient}, + surrealdb::SurrealDbClient, }; #[derive(Debug, TryFromMultipart)] @@ -25,16 +24,13 @@ pub struct FileUploadRequest { /// /// Route: POST /file pub async fn upload_handler( + Extension(db_client): Extension>, TypedMultipart(input): TypedMultipart, ) -> Result { info!("Received an upload request"); - // Initialize a new RedisClient instance - let redis_client = RedisClient::new("redis://127.0.0.1/"); - - let database = SurrealDbClient::new().await.map_err(|e| FileError::PersistError(e.to_string())).unwrap(); // Process the file upload - let file_info = FileInfo::new(input.file, &database).await?; + let file_info = FileInfo::new(input.file, &db_client).await?; // Prepare the response JSON let response = json!({ @@ -55,14 +51,12 @@ pub async fn upload_handler( /// /// Route: GET /file/:uuid pub async fn get_file_handler( + Extension(db_client): Extension>, Path(uuid_str): Path, ) -> Result { // Parse UUID let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?; - // Initialize the database client - let db_client = SurrealDbClient::new().await.map_err(|e| FileError::PersistError(e.to_string())).unwrap(); - // Retrieve FileInfo let file_info = FileInfo::get_by_uuid(uuid, &db_client).await?; @@ -84,17 +78,15 @@ pub async fn get_file_handler( /// /// Route: PUT /file/:uuid pub async fn update_file_handler( + Extension(db_client): Extension>, Path(uuid_str): Path, TypedMultipart(input): TypedMultipart, ) -> Result { // Parse UUID let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?; - // Initialize RedisClient - let redis_client = RedisClient::new("redis://127.0.0.1/"); - // Update the file - let updated_file_info = FileInfo::update(uuid, input.file, &redis_client).await?; + let updated_file_info = FileInfo::update(uuid, input.file, &db_client).await?; // Prepare the response JSON let response = json!({ @@ -114,16 +106,14 @@ pub async fn update_file_handler( /// /// Route: DELETE /file/:uuid pub async fn delete_file_handler( + Extension(db_client): Extension>, Path(uuid_str): Path, ) -> Result { // Parse UUID let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?; - // Initialize RedisClient - let redis_client = RedisClient::new("redis://127.0.0.1/"); - // Delete the file - FileInfo::delete(uuid, &redis_client).await?; + FileInfo::delete(uuid, &db_client).await?; info!("Deleted file with UUID: {}", uuid); diff --git a/src/routes/ingress.rs b/src/routes/ingress.rs index a89c5dd..2bb9d6b 100644 --- a/src/routes/ingress.rs +++ b/src/routes/ingress.rs @@ -1,17 +1,15 @@ use std::sync::Arc; use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use tracing::{error, info}; -use crate::{models::{file_info::FileError, ingress_content::{create_ingress_objects, IngressInput}}, rabbitmq::publisher::RabbitMQProducer, redis::client::RedisClient, surrealdb::SurrealDbClient}; +use crate::{models::ingress_content::{create_ingress_objects, IngressInput}, rabbitmq::publisher::RabbitMQProducer, surrealdb::SurrealDbClient}; pub async fn ingress_handler( Extension(producer): Extension>, + Extension(db_client): Extension>, Json(input): Json, ) -> impl IntoResponse { info!("Received input: {:?}", input); - let db_client = SurrealDbClient::new().await.map_err(|e| FileError::PersistError(e.to_string())).unwrap(); - let redis_client = RedisClient::new("redis://127.0.0.1/"); - match create_ingress_objects(input, &db_client).await { Ok(objects) => { for object in objects { diff --git a/src/server.rs b/src/server.rs index 7d08e31..47d07e8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,6 @@ use axum::{ extract::DefaultBodyLimit, routing::{delete, get, post, put}, Extension, Router }; -use tracing::info; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use zettle_db::{rabbitmq::{publisher::RabbitMQProducer, RabbitMQConfig}, routes::{file::{delete_file_handler, get_file_handler, update_file_handler, upload_handler}, ingress::ingress_handler, queue_length::queue_length_handler}, surrealdb::SurrealDbClient}; use std::sync::Arc; @@ -23,12 +22,12 @@ async fn main() -> Result<(), Box> { routing_key: "my_key".to_string(), }; + // Set up producer let producer = Arc::new(RabbitMQProducer::new(&config).await?); - // let database = SurrealDbClient::new().await?; + // Set up database client + let db_client = Arc::new(SurrealDbClient::new().await?); - // database.client.health().await?; - // info!("Passed health check"); // Create Axum router let app = Router::new() @@ -39,7 +38,8 @@ async fn main() -> Result<(), Box> { .layer(DefaultBodyLimit::max(1024 * 1024 * 1024)) .route("/file/:uuid", get(get_file_handler)) .route("/file/:uuid", put(update_file_handler)) - .route("/file/:uuid", delete(delete_file_handler)); + .route("/file/:uuid", delete(delete_file_handler)) + .layer(Extension(db_client)); tracing::info!("Listening on 0.0.0.0:3000"); let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; diff --git a/src/surrealdb/document.rs b/src/surrealdb/document.rs deleted file mode 100644 index 60f1261..0000000 --- a/src/surrealdb/document.rs +++ /dev/null @@ -1,33 +0,0 @@ -use serde::{Deserialize, Serialize}; -use surrealdb::{engine::remote::ws::Client, RecordId, Surreal}; -use tracing::info; - -use crate::models::file_info::FileInfo; - -#[derive(Debug, Deserialize)] -struct Record { - id: RecordId, -} - -use super::SurrealError; - -pub async fn set_file_info(client: Surreal, sha256: &str, file_info: FileInfo) -> Result<(), SurrealError> { - info!("Creating in surrealdb"); - info!("{:?}, {:?}", sha256, file_info); - - // Use create instead of upsert if you're sure the record doesn't exist - let created: Option = client - .create(("file", sha256)) - .content(file_info) - .await?; - - // If you want to update or create, use upsert instead - // let created: Option = client - // .upsert(("file", sha256)) - // .content(file_info) - // .await?; - - info!("{:?}", created); - - Ok(()) -} diff --git a/src/surrealdb/graph.rs b/src/surrealdb/graph.rs deleted file mode 100644 index 8b13789..0000000 --- a/src/surrealdb/graph.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/src/surrealdb/mod.rs b/src/surrealdb/mod.rs index d2cd941..5f47b5f 100644 --- a/src/surrealdb/mod.rs +++ b/src/surrealdb/mod.rs @@ -1,9 +1,7 @@ use surrealdb::{engine::remote::ws::{Client, Ws}, opt::auth::Root, Surreal}; use thiserror::Error; -pub mod document; -pub mod graph; - +#[derive(Clone)] pub struct SurrealDbClient { pub client: Surreal, } diff --git a/src/utils/llm.rs b/src/utils/llm.rs index 934866e..a3c9b78 100644 --- a/src/utils/llm.rs +++ b/src/utils/llm.rs @@ -1,7 +1,7 @@ use async_openai::types::ChatCompletionRequestSystemMessage; use async_openai::types::ChatCompletionRequestUserMessage; use async_openai::types::CreateChatCompletionRequestArgs; -use tracing::info; +use tracing::debug; use crate::models::text_content::ProcessingError; use serde_json::json; use crate::models::text_content::AnalysisResult; @@ -52,9 +52,7 @@ pub async fn create_json_ld(category: &str, instructions: &str, text: &str) -> R }; // Construct the system and user messages - let system_message = format!( - "You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a moderately short description of the document, how it relates to the submitted category and any relevant instructions. You shall also include related objects. The goal is to insert your output into a graph database." - ); + let system_message = "You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a moderately short description of the document, how it relates to the submitted category and any relevant instructions. You shall also include related objects. The goal is to insert your output into a graph database.".to_string(); let user_message = format!( "Category: {}\nInstructions: {}\nContent:\n{}", @@ -77,7 +75,7 @@ pub async fn create_json_ld(category: &str, instructions: &str, text: &str) -> R ProcessingError::LLMError(format!("OpenAI API request failed: {}", e)) })?; - info!("{:?}", response); + debug!("{:?}", response); // Extract and parse the response for choice in response.choices {