Remove the Redis and Neo4j integrations

This commit is contained in:
Per Stark
2024-10-16 09:23:42 +02:00
parent 2ceb721329
commit 15dc0477bf
15 changed files with 53 additions and 633 deletions

View File

@@ -1,7 +1,5 @@
pub mod models;
pub mod neo4j;
pub mod rabbitmq;
pub mod redis;
pub mod routes;
pub mod surrealdb;
pub mod utils;

View File

@@ -14,10 +14,11 @@ use thiserror::Error;
use tracing::{debug, info};
use uuid::Uuid;
use crate::{redis::client::{RedisClient, RedisClientTrait}, surrealdb::SurrealDbClient};
use crate::surrealdb::SurrealDbClient;
#[derive(Debug, Deserialize)]
#[derive(Debug,Deserialize)]
struct Record {
#[allow(dead_code)]
id: RecordId,
}
@@ -48,9 +49,6 @@ pub enum FileError {
#[error("SurrealDB error: {0}")]
SurrealError(#[from] surrealdb::Error),
#[error("Redis error: {0}")]
RedisError(#[from] crate::redis::client::RedisError),
#[error("File not found for UUID: {0}")]
FileNotFound(String),
@@ -85,7 +83,6 @@ impl IntoResponse for FileError {
FileError::Utf8(_) => (StatusCode::BAD_REQUEST, "Invalid UTF-8 data"),
FileError::MimeDetection(_) => (StatusCode::BAD_REQUEST, "MIME type detection failed"),
FileError::UnsupportedMime(_) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported MIME type"),
FileError::RedisError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Redis error"),
FileError::FileNotFound(_) => (StatusCode::NOT_FOUND, "File not found"),
FileError::DuplicateFile(_) => (StatusCode::CONFLICT, "Duplicate file detected"),
FileError::HashCollision => (StatusCode::INTERNAL_SERVER_ERROR, "Hash collision detected"),
@@ -122,9 +119,10 @@ impl FileInfo {
info!("SHA256: {:?}", sha);
// Check if SHA exists in SurrealDB
if let Some(existing_file_info) = Self::get_by_sha(&sha, db_client).await? {
info!("Duplicate file detected with SHA256: {}", sha);
return Ok(existing_file_info);
if let Ok(file) = Self::get_by_sha(&sha, db_client).await {
info!("File already exists in database with SHA256: {}", sha);
// SHA exists: return FileInfo
return Ok(file);
}
// Generate a new UUID
@@ -176,9 +174,10 @@ impl FileInfo {
let new_sha = Self::get_sha(&new_file).await?;
// Check if the new SHA already exists
if let Some(existing_file_info) = Self::get_by_sha(&new_sha, &db_client).await? {
info!("Duplicate file detected with SHA256: {}", new_sha);
return Ok(existing_file_info);
if let Ok(file) = Self::get_by_sha(&new_sha, db_client).await {
info!("File already exists in database with SHA256: {}", new_sha);
// SHA exists: return FileInfo
return Ok(file);
}
// Sanitize new file name
@@ -191,8 +190,8 @@ impl FileInfo {
let new_mime_type = Self::guess_mime_type(&new_persisted_path);
// Get the existing item and remove it
let old_record = Self::get_by_uuid(uuid, &db_client).await?;
Self::delete_record(&old_record.sha256, &db_client).await?;
let old_record = Self::get_by_uuid(uuid, db_client).await?;
Self::delete_record(&old_record.sha256, db_client).await?;
// Update FileInfo
let updated_file_info = FileInfo {
@@ -203,7 +202,7 @@ impl FileInfo {
};
// Save the new item
Self::create_record(&updated_file_info,&db_client).await?;
Self::create_record(&updated_file_info,db_client).await?;
// Optionally, delete the old file from the filesystem if it's no longer referenced
// This requires reference counting or checking if other FileInfo entries point to the same SHA
@@ -222,7 +221,7 @@ impl FileInfo {
/// * `Result<(), FileError>` - Empty result or an error.
pub async fn delete(uuid: Uuid, db_client: &SurrealDbClient) -> Result<(), FileError> {
// Retrieve FileInfo to get SHA256 and path
let file_info = Self::get_by_uuid(uuid, &db_client).await?;
let file_info = Self::get_by_uuid(uuid, db_client).await?;
// Delete the file from the filesystem
let file_path = Path::new(&file_info.path);
@@ -234,7 +233,7 @@ impl FileInfo {
}
// Delete the FileInfo from database
Self::delete_record(&file_info.sha256, &db_client).await?;
Self::delete_record(&file_info.sha256, db_client).await?;
// Remove the UUID directory if empty
let uuid_dir = file_path.parent().ok_or(FileError::FileNotFound(uuid.to_string()))?;
@@ -324,8 +323,6 @@ impl FileInfo {
/// # Returns
/// * `Result<(), FileError>` - Empty result or an error.
async fn create_record(file_info: &FileInfo, db_client: &SurrealDbClient) -> Result<(), FileError> {
// Define the table and primary key
// Create the record
let _created: Option<Record> = db_client
.client
@@ -352,7 +349,7 @@ impl FileInfo {
let query = format!("SELECT * FROM file WHERE uuid = '{}'", uuid);
let response: Vec<FileInfo> = db_client.client.query(query).await?.take(0)?;
Ok(response.into_iter().next().ok_or(FileError::FileNotFound(uuid.to_string()))?)
response.into_iter().next().ok_or(FileError::FileNotFound(uuid.to_string()))
}
/// Retrieves a `FileInfo` by SHA256.
@@ -363,14 +360,13 @@ impl FileInfo {
///
/// # Returns
/// * `Result<Option<FileInfo>, FileError>` - The `FileInfo` or `None` if not found.
async fn get_by_sha(sha256: &str, db_client: &SurrealDbClient) -> Result<Option<FileInfo>, FileError> {
let query = format!("SELECT * FROM file WHERE sha256 = '{}'", sha256);
async fn get_by_sha(sha256: &str, db_client: &SurrealDbClient) -> Result<FileInfo, FileError> {
let query = format!("SELECT * FROM file WHERE sha256 = '{}'", &sha256);
let response: Vec<FileInfo> = db_client.client.query(query).await?.take(0)?;
debug!("{:?}", response);
Ok(response.into_iter().next())
}
response.into_iter().next().ok_or(FileError::FileNotFound(sha256.to_string())) }
/// Deletes a `FileInfo` record by SHA256.
///

View File

@@ -3,11 +3,9 @@ use thiserror::Error;
use tracing::info;
use url::Url;
use uuid::Uuid;
use crate::{redis::client::RedisClient, surrealdb::SurrealDbClient};
use crate::surrealdb::SurrealDbClient;
use super::{file_info::FileInfo, ingress_object::IngressObject };
/// Struct defining the expected body when ingressing content.
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressInput {
@@ -83,8 +81,8 @@ pub async fn create_ingress_objects(
if let Some(file_uuids) = input.files {
for uuid_str in file_uuids {
let uuid = Uuid::parse_str(&uuid_str)?;
match FileInfo::get_by_uuid(uuid, &db_client).await {
Ok(Some(file_info)) => {
match FileInfo::get_by_uuid(uuid, db_client).await {
Ok(file_info) => {
object_list.push(IngressObject::File {
file_info,
instructions: input.instructions.clone(),

View File

@@ -1,7 +1,7 @@
use serde::{Deserialize, Serialize};
use tracing::info;
use uuid::Uuid;
use crate::{models::file_info::FileInfo, neo4j::client::Neo4jClient, utils::llm::create_json_ld};
use crate::{models::file_info::FileInfo, utils::llm::create_json_ld};
use thiserror::Error;
/// Represents a single piece of text content extracted from various sources.
@@ -60,21 +60,21 @@ pub enum ProcessingError {
impl TextContent {
/// Processes the `TextContent` by sending it to an LLM, storing in a graph DB, and vector DB.
pub async fn process(&self) -> Result<(), ProcessingError> {
let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo4j").await.expect("Failed to create Neo4j client");
// let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo4j").await.expect("Failed to create Neo4j client");
// Step 1: Send to LLM for analysis
let analysis = create_json_ld(&self.category, &self.instructions, &self.text).await?;
info!("{:?}", &analysis);
// Step 2: Store analysis results in Graph DB
client.store_knowledge_source(&analysis.knowledge_source).await?;
// client.store_knowledge_source(&analysis.knowledge_source).await?;
// Step 3: Store relationships in Graph DB
for relationship in analysis.knowledge_source.relationships.iter() {
client
.store_relationship(&analysis.knowledge_source.id, relationship)
.await?;
}
// for relationship in analysis.knowledge_source.relationships.iter() {
// client
// .store_relationship(&analysis.knowledge_source.id, relationship)
// .await?;
// }
// Step 3: Split text and store in Vector DB

View File

@@ -1,183 +0,0 @@
use neo4rs::*;
use tracing::info;
use std::error::Error;
use crate::models::text_content::{KnowledgeSource, ProcessingError, Relationship};
/// A client for interacting with the Neo4j graph database.
pub struct Neo4jClient {
    /// The Neo4j graph connection handle.
    graph: Graph,
}

impl Neo4jClient {
    /// Creates a new `Neo4jClient` instance by connecting to the Neo4j database.
    ///
    /// # Arguments
    ///
    /// * `uri` - The URI of the Neo4j database (e.g., "127.0.0.1:7687").
    /// * `user` - The username for authentication.
    /// * `pass` - The password for authentication.
    ///
    /// # Errors
    ///
    /// Returns an `Err` variant if the connection to the database fails.
    pub async fn new(uri: &str, user: &str, pass: &str) -> Result<Self, Box<dyn Error>> {
        // Initialize the Neo4j graph connection.
        let graph = Graph::new(uri, user, pass).await?;
        Ok(Neo4jClient { graph })
    }

    /// Stores a knowledge source as a `KnowledgeSource` node in the Neo4j database.
    ///
    /// # Arguments
    ///
    /// * `source` - A reference to the `KnowledgeSource` to be stored.
    ///
    /// # Errors
    ///
    /// Returns `ProcessingError::GraphDBError` if the database operation fails.
    /// (The previous implementation bound the error with `let _ = …`, which
    /// discarded it and always returned `Ok(())`; failures now propagate.)
    pub async fn store_knowledge_source(
        &self,
        source: &KnowledgeSource,
    ) -> Result<(), ProcessingError> {
        // Cypher query to create a knowledge source node with properties.
        // NOTE(review): `$type` is referenced here but never bound below — Neo4j
        // rejects queries with missing parameters. Bind the knowledge source's
        // type field (its name is not visible from this module) or drop the
        // `type` property; confirm against `models::text_content::KnowledgeSource`.
        let cypher_query = "
            CREATE (ks:KnowledgeSource {
                id: $id,
                type: $type,
                title: $title,
                description: $description
            })
            RETURN ks
        ";
        // Execute the query with parameters; propagate failures instead of
        // silently dropping them.
        self.graph
            .run(
                query(cypher_query)
                    .param("id", source.id.to_string())
                    .param("title", source.title.to_string())
                    .param("description", source.description.to_string()),
            )
            .await
            .map_err(|e| ProcessingError::GraphDBError(e.to_string()))?;
        info!("Stored knowledge source");
        Ok(())
    }

    /// Stores a relationship between two knowledge source nodes in Neo4j.
    ///
    /// # Arguments
    ///
    /// * `source_id` - The ID of the source knowledge source.
    /// * `relationship` - A reference to the `Relationship` defining the connection.
    ///
    /// # Errors
    ///
    /// Returns `ProcessingError::GraphDBError` if the database operation fails.
    pub async fn store_relationship(
        &self,
        source_id: &str,
        relationship: &Relationship,
    ) -> Result<(), ProcessingError> {
        // Cypher query to create a relationship between two knowledge source
        // nodes. The relationship type is interpolated into the query text
        // because Cypher does not allow relationship types to be parameterized.
        let cypher_query = format!(
            "
            MATCH (a:KnowledgeSource {{id: $source_id}})
            MATCH (b:KnowledgeSource {{id: $target_id}})
            CREATE (a)-[:{}]->(b)
            RETURN a, b
            ",
            relationship.type_
        );
        // Execute the query. The parameter is now bound as `target_id` to match
        // the `$target_id` placeholder above — it was previously bound as
        // `target`, so the MATCH on `b` could never succeed. Errors propagate
        // instead of being discarded.
        self.graph
            .run(
                query(&cypher_query)
                    .param("source_id", source_id)
                    .param("target_id", relationship.target.to_string()),
            )
            .await
            .map_err(|e| ProcessingError::GraphDBError(e.to_string()))?;
        info!("Stored knowledge relationship");
        Ok(())
    }
}
// #[cfg(test)]
// mod tests {
// use super::*;
// use uuid::Uuid;
// /// Tests the `store_knowledge_source` and `store_relationship` functions.
// ///
// /// # Note
// ///
// /// Ensure that a Neo4j database is running and accessible at the specified URI
// /// before running these tests.
// #[tokio::test]
// async fn test_store_functions() {
// // Initialize the Neo4j client.
// let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo")
// .await
// .expect("Failed to create Neo4j client");
// // Create a first knowledge source.
// let source1 = KnowledgeSource {
// id: Uuid::new_v4().to_string(),
// source_type: "Document".into(),
// title: "Understanding Neural Networks".into(),
// description:
// "An in-depth analysis of neural networks and their applications in machine learning."
// .into(),
// };
// // Store the first knowledge source.
// client
// .store_knowledge_source(&source1)
// .await
// .expect("Failed to store knowledge source 1");
// // Create a second knowledge source.
// let source2 = KnowledgeSource {
// id: Uuid::new_v4().to_string(),
// source_type: "Document".into(),
// title: "Machine Learning Basics".into(),
// description: "A foundational text on machine learning principles and techniques."
// .into(),
// };
// // Store the second knowledge source.
// client
// .store_knowledge_source(&source2)
// .await
// .expect("Failed to store knowledge source 2");
// // Define a relationship between the two sources.
// let relationship = Relationship {
// relationship_type: "RelatedTo".into(),
// target_id: source2.id.clone(),
// };
// // Store the relationship from source1 to source2.
// client
// .store_relationship(&source1.id, &relationship)
// .await
// .expect("Failed to store relationship");
// // Clean up by closing the client.
// client.close().await.expect("Failed to close Neo4j client");
// }
// }

View File

@@ -1 +0,0 @@
pub mod client;

View File

@@ -1,337 +0,0 @@
use axum::async_trait;
use redis::AsyncCommands;
use thiserror::Error;
use uuid::Uuid;
use crate::models::file_info::FileInfo;
/// Represents errors that can occur during Redis operations.
#[derive(Error, Debug)]
pub enum RedisError {
    /// Failure to open or establish a connection to the Redis server.
    #[error("Redis connection error: {0}")]
    ConnectionError(String),
    /// Failure while executing a Redis command (also used for JSON
    /// serialization/deserialization failures of the stored values).
    #[error("Redis command error: {0}")]
    CommandError(String),
    // Add more error variants as needed.
}
/// Defines the behavior for Redis client operations.
///
/// Abstracted as a trait so the concrete client can be replaced with a
/// `mockall`-generated mock in tests (see the attribute below).
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait RedisClientTrait: Send + Sync {
    /// Establishes a new multiplexed asynchronous connection to Redis.
    async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection, RedisError>;
    /// Stores `FileInfo` in Redis using SHA256 as the key.
    async fn set_file_info(&self, sha256: &str, file_info: &FileInfo) -> Result<(), RedisError>;
    /// Retrieves `FileInfo` from Redis using SHA256 as the key; `None` if absent.
    async fn get_file_info_by_sha(&self, sha256: &str) -> Result<Option<FileInfo>, RedisError>;
    /// Deletes `FileInfo` from Redis using SHA256 as the key.
    async fn delete_file_info(&self, sha256: &str) -> Result<(), RedisError>;
    /// Sets a mapping from UUID to SHA256.
    async fn set_sha_uuid_mapping(&self, uuid: &Uuid, sha256: &str) -> Result<(), RedisError>;
    /// Retrieves the SHA256 hash associated with a given UUID; `None` if absent.
    async fn get_sha_by_uuid(&self, uuid: &Uuid) -> Result<Option<String>, RedisError>;
    /// Deletes the UUID to SHA256 mapping from Redis.
    async fn delete_sha_uuid_mapping(&self, uuid: &Uuid) -> Result<(), RedisError>;
}
/// Redis-backed persistence helper for `FileInfo` records.
pub struct RedisClient {
    /// Connection URL of the Redis server this client talks to.
    redis_url: String,
}

impl RedisClient {
    /// Builds a `RedisClient` pointing at the given Redis server.
    ///
    /// # Arguments
    ///
    /// * `redis_url` - The Redis server URL (e.g., "redis://127.0.0.1/").
    ///
    /// # Returns
    ///
    /// * `Self` - A new `RedisClient` instance.
    pub fn new(redis_url: &str) -> Self {
        let redis_url = redis_url.to_owned();
        RedisClient { redis_url }
    }
}
#[async_trait]
impl RedisClientTrait for RedisClient {
    /// Establishes a new multiplexed asynchronous connection to Redis.
    ///
    /// Note: a fresh `redis::Client` is opened on every call — connections are
    /// not pooled or cached by this implementation.
    ///
    /// # Returns
    /// * `MultiplexedConnection` - The established connection.
    async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection, RedisError> {
        let client = redis::Client::open(self.redis_url.clone())
            .map_err(|e| RedisError::ConnectionError(e.to_string()))?;
        let conn = client
            .get_multiplexed_async_connection()
            .await
            .map_err(|e| RedisError::ConnectionError(e.to_string()))?;
        Ok(conn)
    }

    /// Stores `FileInfo` in Redis using SHA256 as the key.
    ///
    /// The record is serialized to JSON and stored under `file_info:<sha256>`.
    ///
    /// # Arguments
    /// * `sha256` - The SHA256 hash of the file.
    /// * `file_info` - The `FileInfo` object to store.
    ///
    /// # Returns
    /// * `Result<(), RedisError>` - Empty result or an error.
    async fn set_file_info(&self, sha256: &str, file_info: &FileInfo) -> Result<(), RedisError> {
        let mut conn = self.get_connection().await?;
        // Key layout: "file_info:<sha256>".
        let key = format!("file_info:{}", sha256);
        let value = serde_json::to_string(file_info)
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        conn.set(key, value).await
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        Ok(())
    }

    /// Retrieves `FileInfo` from Redis using SHA256 as the key.
    ///
    /// # Arguments
    /// * `sha256` - The SHA256 hash of the file.
    ///
    /// # Returns
    /// * `Result<Option<FileInfo>, RedisError>` - The `FileInfo` if found, otherwise `None`, or an error.
    async fn get_file_info_by_sha(&self, sha256: &str) -> Result<Option<FileInfo>, RedisError> {
        let mut conn = self.get_connection().await?;
        let key = format!("file_info:{}", sha256);
        // A missing key comes back as `None` rather than an error.
        let value: Option<String> = conn.get(key).await
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        if let Some(json_str) = value {
            // Deserialize the stored JSON back into a `FileInfo`.
            let file_info: FileInfo = serde_json::from_str(&json_str)
                .map_err(|e| RedisError::CommandError(e.to_string()))?;
            Ok(Some(file_info))
        } else {
            Ok(None)
        }
    }

    /// Deletes `FileInfo` from Redis using SHA256 as the key.
    ///
    /// Deleting a non-existent key is not treated as an error by Redis.
    ///
    /// # Arguments
    /// * `sha256` - The SHA256 hash of the file.
    ///
    /// # Returns
    /// * `Result<(), RedisError>` - Empty result or an error.
    async fn delete_file_info(&self, sha256: &str) -> Result<(), RedisError> {
        let mut conn = self.get_connection().await?;
        let key = format!("file_info:{}", sha256);
        conn.del(key).await
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        Ok(())
    }

    /// Sets a mapping from UUID to SHA256 under the key `uuid_sha:<uuid>`.
    ///
    /// # Arguments
    /// * `uuid` - The UUID of the file.
    /// * `sha256` - The SHA256 hash of the file.
    ///
    /// # Returns
    /// * `Result<(), RedisError>` - Empty result or an error.
    async fn set_sha_uuid_mapping(&self, uuid: &Uuid, sha256: &str) -> Result<(), RedisError> {
        let mut conn = self.get_connection().await?;
        // Key layout: "uuid_sha:<uuid>".
        let key = format!("uuid_sha:{}", uuid);
        conn.set(key, sha256).await
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        Ok(())
    }

    /// Retrieves the SHA256 hash associated with a given UUID.
    ///
    /// # Arguments
    /// * `uuid` - The UUID of the file.
    ///
    /// # Returns
    /// * `Result<Option<String>, RedisError>` - The SHA256 hash if found, otherwise `None`, or an error.
    async fn get_sha_by_uuid(&self, uuid: &Uuid) -> Result<Option<String>, RedisError> {
        let mut conn = self.get_connection().await?;
        let key = format!("uuid_sha:{}", uuid);
        let sha: Option<String> = conn.get(key).await
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        Ok(sha)
    }

    /// Deletes the UUID to SHA256 mapping from Redis.
    ///
    /// # Arguments
    ///
    /// * `uuid` - The UUID of the file.
    ///
    /// # Returns
    ///
    /// * `Result<(), RedisError>` - Empty result or an error.
    async fn delete_sha_uuid_mapping(&self, uuid: &Uuid) -> Result<(), RedisError> {
        let mut conn = self.get_connection().await?;
        let key = format!("uuid_sha:{}", uuid);
        conn.del(key).await
            .map_err(|e| RedisError::CommandError(e.to_string()))?;
        Ok(())
    }
}
#[cfg(test)]
mod tests {
use super::*;
use mockall::predicate::*;
use uuid::Uuid;
#[tokio::test]
async fn test_set_file_info() {
// Initialize the mock.
let mut mock_redis = MockRedisClientTrait::new();
let test_sha = "dummysha256hash".to_string();
let test_file_info = FileInfo {
uuid: Uuid::new_v4().to_string(),
sha256: test_sha.clone(),
path: "/path/to/file".to_string(),
mime_type: "text/plain".to_string(),
};
// Setup expectation for `set_file_info`.
mock_redis
.expect_set_file_info()
.with(eq(test_sha.clone()), eq(test_file_info.clone()))
.times(1)
.returning(|_, _| Ok(()) );
// Call `set_file_info` on the mock.
let set_result = mock_redis.set_file_info(&test_sha, &test_file_info).await;
assert!(set_result.is_ok());
}
#[tokio::test]
async fn test_get_file_info_by_sha() {
// Initialize the mock.
let mut mock_redis = MockRedisClientTrait::new();
let test_sha = "dummysha256hash".to_string();
let test_file_info = FileInfo {
uuid: Uuid::new_v4().to_string(),
sha256: test_sha.clone(),
path: "/path/to/file".to_string(),
mime_type: "text/plain".to_string(),
};
// Clone the FileInfo to use inside the closure.
let fi_clone = test_file_info.clone();
// Setup expectation for `get_file_info_by_sha`.
mock_redis
.expect_get_file_info_by_sha()
.with(eq(test_sha.clone()))
.times(1)
.returning(move |_: &str| {
// Return the cloned FileInfo.
let fi_inner = fi_clone.clone();
Ok(Some(fi_inner))
});
// Call `get_file_info_by_sha` on the mock.
let get_result = mock_redis.get_file_info_by_sha(&test_sha).await;
assert!(get_result.is_ok());
assert_eq!(get_result.unwrap(), Some(test_file_info));
}
#[tokio::test]
async fn test_delete_file_info() {
// Initialize the mock.
let mut mock_redis = MockRedisClientTrait::new();
let test_sha = "dummysha256hash".to_string();
// Setup expectation for `delete_file_info`.
mock_redis
.expect_delete_file_info()
.with(eq(test_sha.clone()))
.times(1)
.returning(|_: &str| Ok(()) );
// Call `delete_file_info` on the mock.
let delete_result = mock_redis.delete_file_info(&test_sha).await;
assert!(delete_result.is_ok());
}
#[tokio::test]
async fn test_set_sha_uuid_mapping() {
// Initialize the mock.
let mut mock_redis = MockRedisClientTrait::new();
let test_uuid = Uuid::new_v4();
let test_sha = "dummysha256hash".to_string();
// Setup expectation for `set_sha_uuid_mapping`.
mock_redis
.expect_set_sha_uuid_mapping()
.with(eq(test_uuid.clone()), eq(test_sha.clone()))
.times(1)
.returning(|_, _| Ok(()) );
// Call `set_sha_uuid_mapping` on the mock.
let set_result = mock_redis.set_sha_uuid_mapping(&test_uuid, &test_sha).await;
assert!(set_result.is_ok());
}
#[tokio::test]
async fn test_get_sha_by_uuid() {
// Initialize the mock.
let mut mock_redis = MockRedisClientTrait::new();
let test_uuid = Uuid::new_v4();
let test_sha = "dummysha256hash".to_string();
// Clone the SHA to use inside the closure.
let sha_clone = test_sha.clone();
// Setup expectation for `get_sha_by_uuid`.
mock_redis
.expect_get_sha_by_uuid()
.with(eq(test_uuid.clone()))
.times(1)
.returning(move |_: &Uuid| {
let sha_inner = sha_clone.clone();
Ok(Some(sha_inner))
});
// Call `get_sha_by_uuid` on the mock.
let get_result = mock_redis.get_sha_by_uuid(&test_uuid).await;
assert!(get_result.is_ok());
assert_eq!(get_result.unwrap(), Some(test_sha));
}
#[tokio::test]
async fn test_delete_sha_uuid_mapping() {
// Initialize the mock.
let mut mock_redis = MockRedisClientTrait::new();
let test_uuid = Uuid::new_v4();
// Setup expectation for `delete_sha_uuid_mapping`.
mock_redis
.expect_delete_sha_uuid_mapping()
.with(eq(test_uuid.clone()))
.times(1)
.returning(|_: &Uuid| Ok(()) );
// Call `delete_sha_uuid_mapping` on the mock.
let delete_result = mock_redis.delete_sha_uuid_mapping(&test_uuid).await;
assert!(delete_result.is_ok());
}
}

View File

@@ -1 +0,0 @@
pub mod client;

View File

@@ -1,8 +1,7 @@
use std::sync::Arc;
use axum::{
extract::Path,
response::IntoResponse,
Json,
extract::Path, response::IntoResponse, Extension, Json
};
use axum_typed_multipart::{TypedMultipart, FieldData, TryFromMultipart};
use serde_json::json;
@@ -12,7 +11,7 @@ use uuid::Uuid;
use crate::{
models::file_info::{FileError, FileInfo},
redis::client::RedisClient, surrealdb::{document::set_file_info, SurrealDbClient},
surrealdb::SurrealDbClient,
};
#[derive(Debug, TryFromMultipart)]
@@ -25,16 +24,13 @@ pub struct FileUploadRequest {
///
/// Route: POST /file
pub async fn upload_handler(
Extension(db_client): Extension<Arc<SurrealDbClient>>,
TypedMultipart(input): TypedMultipart<FileUploadRequest>,
) -> Result<impl IntoResponse, FileError> {
info!("Received an upload request");
// Initialize a new RedisClient instance
let redis_client = RedisClient::new("redis://127.0.0.1/");
let database = SurrealDbClient::new().await.map_err(|e| FileError::PersistError(e.to_string())).unwrap();
// Process the file upload
let file_info = FileInfo::new(input.file, &database).await?;
let file_info = FileInfo::new(input.file, &db_client).await?;
// Prepare the response JSON
let response = json!({
@@ -55,14 +51,12 @@ pub async fn upload_handler(
///
/// Route: GET /file/:uuid
pub async fn get_file_handler(
Extension(db_client): Extension<Arc<SurrealDbClient>>,
Path(uuid_str): Path<String>,
) -> Result<impl IntoResponse, FileError> {
// Parse UUID
let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?;
// Initialize the database client
let db_client = SurrealDbClient::new().await.map_err(|e| FileError::PersistError(e.to_string())).unwrap();
// Retrieve FileInfo
let file_info = FileInfo::get_by_uuid(uuid, &db_client).await?;
@@ -84,17 +78,15 @@ pub async fn get_file_handler(
///
/// Route: PUT /file/:uuid
pub async fn update_file_handler(
Extension(db_client): Extension<Arc<SurrealDbClient>>,
Path(uuid_str): Path<String>,
TypedMultipart(input): TypedMultipart<FileUploadRequest>,
) -> Result<impl IntoResponse, FileError> {
// Parse UUID
let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?;
// Initialize RedisClient
let redis_client = RedisClient::new("redis://127.0.0.1/");
// Update the file
let updated_file_info = FileInfo::update(uuid, input.file, &redis_client).await?;
let updated_file_info = FileInfo::update(uuid, input.file, &db_client).await?;
// Prepare the response JSON
let response = json!({
@@ -114,16 +106,14 @@ pub async fn update_file_handler(
///
/// Route: DELETE /file/:uuid
pub async fn delete_file_handler(
Extension(db_client): Extension<Arc<SurrealDbClient>>,
Path(uuid_str): Path<String>,
) -> Result<impl IntoResponse, FileError> {
// Parse UUID
let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?;
// Initialize RedisClient
let redis_client = RedisClient::new("redis://127.0.0.1/");
// Delete the file
FileInfo::delete(uuid, &redis_client).await?;
FileInfo::delete(uuid, &db_client).await?;
info!("Deleted file with UUID: {}", uuid);

View File

@@ -1,17 +1,15 @@
use std::sync::Arc;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use tracing::{error, info};
use crate::{models::{file_info::FileError, ingress_content::{create_ingress_objects, IngressInput}}, rabbitmq::publisher::RabbitMQProducer, redis::client::RedisClient, surrealdb::SurrealDbClient};
use crate::{models::ingress_content::{create_ingress_objects, IngressInput}, rabbitmq::publisher::RabbitMQProducer, surrealdb::SurrealDbClient};
pub async fn ingress_handler(
Extension(producer): Extension<Arc<RabbitMQProducer>>,
Extension(db_client): Extension<Arc<SurrealDbClient>>,
Json(input): Json<IngressInput>,
) -> impl IntoResponse {
info!("Received input: {:?}", input);
let db_client = SurrealDbClient::new().await.map_err(|e| FileError::PersistError(e.to_string())).unwrap();
let redis_client = RedisClient::new("redis://127.0.0.1/");
match create_ingress_objects(input, &db_client).await {
Ok(objects) => {
for object in objects {

View File

@@ -1,7 +1,6 @@
use axum::{
extract::DefaultBodyLimit, routing::{delete, get, post, put}, Extension, Router
};
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::{rabbitmq::{publisher::RabbitMQProducer, RabbitMQConfig}, routes::{file::{delete_file_handler, get_file_handler, update_file_handler, upload_handler}, ingress::ingress_handler, queue_length::queue_length_handler}, surrealdb::SurrealDbClient};
use std::sync::Arc;
@@ -23,12 +22,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
routing_key: "my_key".to_string(),
};
// Set up producer
let producer = Arc::new(RabbitMQProducer::new(&config).await?);
// let database = SurrealDbClient::new().await?;
// Set up database client
let db_client = Arc::new(SurrealDbClient::new().await?);
// database.client.health().await?;
// info!("Passed health check");
// Create Axum router
let app = Router::new()
@@ -39,7 +38,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024))
.route("/file/:uuid", get(get_file_handler))
.route("/file/:uuid", put(update_file_handler))
.route("/file/:uuid", delete(delete_file_handler));
.route("/file/:uuid", delete(delete_file_handler))
.layer(Extension(db_client));
tracing::info!("Listening on 0.0.0.0:3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;

View File

@@ -1,33 +0,0 @@
use serde::{Deserialize, Serialize};
use surrealdb::{engine::remote::ws::Client, RecordId, Surreal};
use tracing::info;
use crate::models::file_info::FileInfo;
/// Minimal deserialization target for the record id returned by SurrealDB writes.
#[derive(Debug, Deserialize)]
struct Record {
    // SurrealDB record id (table + key); only read for debug logging here.
    id: RecordId,
}
use super::SurrealError;
/// Persists a `FileInfo` record in SurrealDB, keyed by the file's SHA-256 hash.
///
/// # Arguments
///
/// * `client` - Connected SurrealDB client (consumed by this call).
/// * `sha256` - SHA-256 hash of the file, used as the record key in the `file` table.
/// * `file_info` - The metadata record to store as the record content.
///
/// # Errors
///
/// Returns a `SurrealError` if the create operation fails — including when a
/// record with this key already exists, since `create` does not overwrite.
pub async fn set_file_info(client: Surreal<Client>, sha256: &str, file_info: FileInfo) -> Result<(), SurrealError> {
    info!("Creating in surrealdb");
    info!("{:?}, {:?}", sha256, file_info);
    // `create` inserts a new record and errors on an existing key; switch to
    // `upsert` if create-or-update semantics are ever required.
    let created: Option<Record> = client
        .create(("file", sha256))
        .content(file_info)
        .await?;
    info!("{:?}", created);
    Ok(())
}

View File

@@ -1 +0,0 @@

View File

@@ -1,9 +1,7 @@
use surrealdb::{engine::remote::ws::{Client, Ws}, opt::auth::Root, Surreal};
use thiserror::Error;
pub mod document;
pub mod graph;
#[derive(Clone)]
pub struct SurrealDbClient {
pub client: Surreal<Client>,
}

View File

@@ -1,7 +1,7 @@
use async_openai::types::ChatCompletionRequestSystemMessage;
use async_openai::types::ChatCompletionRequestUserMessage;
use async_openai::types::CreateChatCompletionRequestArgs;
use tracing::info;
use tracing::debug;
use crate::models::text_content::ProcessingError;
use serde_json::json;
use crate::models::text_content::AnalysisResult;
@@ -52,9 +52,7 @@ pub async fn create_json_ld(category: &str, instructions: &str, text: &str) -> R
};
// Construct the system and user messages
let system_message = format!(
"You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a moderately short description of the document, how it relates to the submitted category and any relevant instructions. You shall also include related objects. The goal is to insert your output into a graph database."
);
let system_message = "You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a moderately short description of the document, how it relates to the submitted category and any relevant instructions. You shall also include related objects. The goal is to insert your output into a graph database.".to_string();
let user_message = format!(
"Category: {}\nInstructions: {}\nContent:\n{}",
@@ -77,7 +75,7 @@ pub async fn create_json_ld(category: &str, instructions: &str, text: &str) -> R
ProcessingError::LLMError(format!("OpenAI API request failed: {}", e))
})?;
info!("{:?}", response);
debug!("{:?}", response);
// Extract and parse the response
for choice in response.choices {