mirror of
https://github.com/perstarkse/minne.git
synced 2026-03-28 20:31:53 +01:00
wip datetime impl
This commit is contained in:
@@ -40,7 +40,14 @@ use zettle_db::{
|
||||
},
|
||||
AppState,
|
||||
},
|
||||
storage::{db::SurrealDbClient, types::user::User},
|
||||
storage::{
|
||||
db::SurrealDbClient,
|
||||
types::{
|
||||
file_info::FileInfo, job::Job, knowledge_entity::KnowledgeEntity,
|
||||
knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk,
|
||||
text_content::TextContent, user::User,
|
||||
},
|
||||
},
|
||||
utils::{config::get_config, mailer::Mailer},
|
||||
};
|
||||
|
||||
@@ -90,8 +97,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
job_queue: Arc::new(JobQueue::new(surreal_db_client)),
|
||||
};
|
||||
|
||||
// setup_auth(&app_state.surreal_db_client).await?;
|
||||
|
||||
let session_config = SessionConfig::default()
|
||||
.with_table_name("test_session_table")
|
||||
.with_secure(true);
|
||||
@@ -104,6 +109,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.await?;
|
||||
|
||||
app_state.surreal_db_client.build_indexes().await?;
|
||||
setup_auth(&app_state.surreal_db_client).await?;
|
||||
// app_state.surreal_db_client.drop_table::<FileInfo>().await?;
|
||||
// app_state.surreal_db_client.drop_table::<User>().await?;
|
||||
// app_state.surreal_db_client.drop_table::<Job>().await?;
|
||||
// app_state
|
||||
// .surreal_db_client
|
||||
// .drop_table::<KnowledgeEntity>()
|
||||
// .await?;
|
||||
// app_state
|
||||
// .surreal_db_client
|
||||
// .drop_table::<KnowledgeRelationship>()
|
||||
// .await?;
|
||||
// app_state
|
||||
// .surreal_db_client
|
||||
// .drop_table::<TextContent>()
|
||||
// .await?;
|
||||
// app_state
|
||||
// .surreal_db_client
|
||||
// .drop_table::<TextChunk>()
|
||||
// .await?;
|
||||
|
||||
// Create Axum router
|
||||
let app = Router::new()
|
||||
@@ -177,14 +202,14 @@ fn html_routes(
|
||||
.layer(SessionLayer::new(session_store))
|
||||
}
|
||||
|
||||
// async fn setup_auth(db: &SurrealDbClient) -> Result<(), Box<dyn std::error::Error>> {
|
||||
// db.query(
|
||||
// "DEFINE TABLE user SCHEMALESS;
|
||||
// DEFINE INDEX unique_name ON TABLE user FIELDS email UNIQUE;
|
||||
// DEFINE ACCESS account ON DATABASE TYPE RECORD
|
||||
// SIGNUP ( CREATE user SET email = $email, password = crypto::argon2::generate($password), anonymous = false, user_id = $user_id)
|
||||
// SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(password, $password) );",
|
||||
// )
|
||||
// .await?;
|
||||
// Ok(())
|
||||
// }
|
||||
async fn setup_auth(db: &SurrealDbClient) -> Result<(), Box<dyn std::error::Error>> {
|
||||
db.query(
|
||||
"DEFINE TABLE user SCHEMALESS;
|
||||
DEFINE INDEX unique_name ON TABLE user FIELDS email UNIQUE;
|
||||
DEFINE ACCESS account ON DATABASE TYPE RECORD
|
||||
SIGNUP ( CREATE user SET email = $email, password = crypto::argon2::generate($password), anonymous = false, user_id = $user_id)
|
||||
SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(password, $password) );",
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::task;
|
||||
|
||||
@@ -162,8 +163,11 @@ async fn create_single_entity(
|
||||
|
||||
let embedding = generate_embedding(openai_client, &embedding_input).await?;
|
||||
|
||||
let now = Utc::now();
|
||||
Ok(KnowledgeEntity {
|
||||
id: assigned_id,
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
name: llm_entity.name.to_string(),
|
||||
description: llm_entity.description.to_string(),
|
||||
entity_type: KnowledgeEntityType::from(llm_entity.entity_type.to_string()),
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use chrono::Utc;
|
||||
use futures::Stream;
|
||||
use std::{
|
||||
sync::Arc,
|
||||
@@ -33,6 +34,7 @@ impl JobQueue {
|
||||
/// Creates a new job and stores it in the database
|
||||
pub async fn enqueue(&self, content: IngressObject, user_id: String) -> Result<Job, AppError> {
|
||||
let job = Job::new(content, user_id).await;
|
||||
info!("{:?}", job);
|
||||
store_item(&self.db, job.clone()).await?;
|
||||
Ok(job)
|
||||
}
|
||||
@@ -46,6 +48,7 @@ impl JobQueue {
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
info!("{:?}", jobs);
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
@@ -95,8 +98,11 @@ impl JobQueue {
|
||||
}
|
||||
|
||||
/// Get unfinished jobs, ie newly created and in progress up two times
|
||||
pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> {
|
||||
let jobs: Vec<Job> = self
|
||||
// pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> {
|
||||
pub async fn get_unfinished_jobs(&self) -> Result<(), AppError> {
|
||||
info!("Getting unfinished jobs");
|
||||
// let jobs: Vec<Job> = self
|
||||
let jobs = self
|
||||
.db
|
||||
.query(
|
||||
"SELECT * FROM type::table($table)
|
||||
@@ -110,12 +116,14 @@ impl JobQueue {
|
||||
)
|
||||
.bind(("table", Job::table_name()))
|
||||
.bind(("max_attempts", MAX_ATTEMPTS))
|
||||
.await?
|
||||
.take(0)?;
|
||||
.await?;
|
||||
// .take(0)?;
|
||||
|
||||
println!("Unfinished jobs found: {}", jobs.len());
|
||||
info!("{:?}", jobs);
|
||||
// println!("Unfinished jobs found: {}", jobs.len());
|
||||
|
||||
Ok(jobs)
|
||||
Ok(())
|
||||
// Ok(jobs)
|
||||
}
|
||||
|
||||
// Method to process a single job
|
||||
@@ -124,12 +132,6 @@ impl JobQueue {
|
||||
job: Job,
|
||||
processor: &ContentProcessor,
|
||||
) -> Result<(), AppError> {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
.to_string();
|
||||
|
||||
let current_attempts = match job.status {
|
||||
JobStatus::InProgress { attempts, .. } => attempts + 1,
|
||||
_ => 1,
|
||||
@@ -140,7 +142,7 @@ impl JobQueue {
|
||||
&job.id,
|
||||
JobStatus::InProgress {
|
||||
attempts: current_attempts,
|
||||
last_attempt: now.clone(),
|
||||
last_attempt: Utc::now(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -8,7 +8,7 @@ use axum::{extract::State, http::StatusCode, response::IntoResponse, Extension};
|
||||
use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart};
|
||||
use futures::{future::try_join_all, TryFutureExt};
|
||||
use tempfile::NamedTempFile;
|
||||
use tracing::info;
|
||||
use tracing::{debug, info};
|
||||
|
||||
#[derive(Debug, TryFromMultipart)]
|
||||
pub struct IngressParams {
|
||||
@@ -32,6 +32,8 @@ pub async fn ingress_data(
|
||||
}))
|
||||
.await?;
|
||||
|
||||
debug!("Got file infos");
|
||||
|
||||
let ingress_objects = create_ingress_objects(
|
||||
IngressInput {
|
||||
content: input.content,
|
||||
@@ -41,6 +43,7 @@ pub async fn ingress_data(
|
||||
},
|
||||
user.id.as_str(),
|
||||
)?;
|
||||
debug!("Got ingress objects");
|
||||
|
||||
let futures: Vec<_> = ingress_objects
|
||||
.into_iter()
|
||||
|
||||
@@ -37,6 +37,10 @@ pub async fn index_handler(
|
||||
false => 0,
|
||||
};
|
||||
|
||||
// let latest_text_contents = match auth.current_user.is_some() {
|
||||
// true =>
|
||||
// }
|
||||
|
||||
// let knowledge_entities = User::get_knowledge_entities(
|
||||
// &auth.current_user.clone().unwrap().id,
|
||||
// &state.surreal_db_client,
|
||||
|
||||
@@ -9,6 +9,7 @@ use axum_session_auth::AuthSession;
|
||||
use axum_session_surreal::SessionSurrealPool;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use surrealdb::{engine::any::Any, Surreal};
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
error::{AppError, HtmlError},
|
||||
@@ -63,7 +64,8 @@ pub async fn process_signup_and_show_verification(
|
||||
) -> Result<impl IntoResponse, HtmlError> {
|
||||
let user = match User::create_new(form.email, form.password, &state.surreal_db_client).await {
|
||||
Ok(user) => user,
|
||||
Err(_) => {
|
||||
Err(e) => {
|
||||
info!("{:?}", e);
|
||||
return Ok(Html("<p>User already exists</p>").into_response());
|
||||
}
|
||||
};
|
||||
|
||||
@@ -71,9 +71,12 @@ impl FileInfo {
|
||||
let uuid = Uuid::new_v4();
|
||||
let sanitized_file_name = Self::sanitize_file_name(&file_name);
|
||||
|
||||
let now = Utc::now();
|
||||
// Create new FileInfo instance
|
||||
let file_info = Self {
|
||||
id: uuid.to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
sha256,
|
||||
path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id)
|
||||
.await?
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{ingress::types::ingress_object::IngressObject, stored_object};
|
||||
@@ -8,7 +7,7 @@ pub enum JobStatus {
|
||||
Created,
|
||||
InProgress {
|
||||
attempts: u32,
|
||||
last_attempt: String, // timestamp
|
||||
last_attempt: DateTime<Utc>,
|
||||
},
|
||||
Completed,
|
||||
Error(String),
|
||||
@@ -18,24 +17,18 @@ pub enum JobStatus {
|
||||
stored_object!(Job, "job", {
|
||||
content: IngressObject,
|
||||
status: JobStatus,
|
||||
created_at: String,
|
||||
updated_at: String,
|
||||
user_id: String
|
||||
});
|
||||
|
||||
impl Job {
|
||||
pub async fn new(content: IngressObject, user_id: String) -> Self {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
.to_string();
|
||||
let now = Utc::now();
|
||||
|
||||
Self {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
content,
|
||||
status: JobStatus::Created,
|
||||
created_at: now.clone(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
user_id,
|
||||
}
|
||||
|
||||
@@ -44,8 +44,11 @@ impl KnowledgeEntity {
|
||||
embedding: Vec<f32>,
|
||||
user_id: String,
|
||||
) -> Self {
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
source_id,
|
||||
name,
|
||||
description,
|
||||
|
||||
@@ -18,8 +18,11 @@ impl KnowledgeRelationship {
|
||||
relationship_type: String,
|
||||
metadata: Option<serde_json::Value>,
|
||||
) -> Self {
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
in_,
|
||||
out,
|
||||
relationship_type,
|
||||
|
||||
@@ -23,6 +23,7 @@ macro_rules! stored_object {
|
||||
use $crate::storage::types::StoredObject;
|
||||
use serde::de::{self, Visitor};
|
||||
use std::fmt;
|
||||
use chrono::{DateTime, Utc };
|
||||
|
||||
struct FlexibleIdVisitor;
|
||||
|
||||
@@ -64,10 +65,61 @@ macro_rules! stored_object {
|
||||
deserializer.deserialize_any(FlexibleIdVisitor)
|
||||
}
|
||||
|
||||
fn serialize_datetime<S>(date: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
Into::<surrealdb::sql::Datetime>::into(*date).serialize(serializer)
|
||||
}
|
||||
|
||||
// fn deserialize_datetime<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
|
||||
// where
|
||||
// D: serde::Deserializer<'de>,
|
||||
// {
|
||||
// let dt = surrealdb::sql::Datetime::deserialize(deserializer)?;
|
||||
// Ok(DateTime::<Utc>::from(dt))
|
||||
// }
|
||||
fn deserialize_datetime<'de, D>(deserializer: D) -> Result<DateTime<Utc>, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
use serde::de::Error;
|
||||
|
||||
// Accept various formats
|
||||
let value = serde_json::Value::deserialize(deserializer)?;
|
||||
|
||||
match value {
|
||||
// Handle string format
|
||||
serde_json::Value::String(s) => {
|
||||
if s.starts_with("d\"") && s.ends_with('\"') {
|
||||
let cleaned = &s[2..s.len()-1];
|
||||
DateTime::parse_from_rfc3339(cleaned)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.map_err(Error::custom)
|
||||
} else {
|
||||
DateTime::parse_from_rfc3339(&s)
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.map_err(Error::custom)
|
||||
}
|
||||
},
|
||||
// Handle object format (in case SurrealDB returns datetime as an object)
|
||||
serde_json::Value::Object(_) => {
|
||||
let dt = surrealdb::sql::Datetime::deserialize(value)
|
||||
.map_err(Error::custom)?;
|
||||
Ok(DateTime::<Utc>::from(dt))
|
||||
},
|
||||
_ => Err(Error::custom("unexpected datetime format")),
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct $name {
|
||||
#[serde(deserialize_with = "deserialize_flexible_id")]
|
||||
pub id: String,
|
||||
#[serde(serialize_with = "serialize_datetime", deserialize_with = "deserialize_datetime", default)]
|
||||
pub created_at: DateTime<Utc>,
|
||||
#[serde(serialize_with = "serialize_datetime", deserialize_with = "deserialize_datetime", default)]
|
||||
pub updated_at: DateTime<Utc>,
|
||||
$(pub $field: $ty),*
|
||||
}
|
||||
|
||||
|
||||
@@ -10,8 +10,11 @@ stored_object!(TextChunk, "text_chunk", {
|
||||
|
||||
impl TextChunk {
|
||||
pub fn new(source_id: String, chunk: String, embedding: Vec<f32>, user_id: String) -> Self {
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
source_id,
|
||||
chunk,
|
||||
embedding,
|
||||
|
||||
@@ -20,8 +20,11 @@ impl TextContent {
|
||||
file_info: Option<FileInfo>,
|
||||
user_id: String,
|
||||
) -> Self {
|
||||
let now = Utc::now();
|
||||
Self {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
text,
|
||||
file_info,
|
||||
instructions,
|
||||
|
||||
@@ -7,7 +7,7 @@ use axum_session_auth::Authentication;
|
||||
use surrealdb::{engine::any::Any, Surreal};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::knowledge_entity::KnowledgeEntity;
|
||||
use super::{knowledge_entity::KnowledgeEntity, text_content::TextContent};
|
||||
|
||||
stored_object!(User, "user", {
|
||||
email: String,
|
||||
@@ -47,6 +47,8 @@ impl User {
|
||||
return Err(AppError::Auth("User already exists".into()));
|
||||
}
|
||||
|
||||
let now = Utc::now();
|
||||
|
||||
let id = Uuid::new_v4().to_string();
|
||||
let user: Option<User> = db
|
||||
.client
|
||||
@@ -54,11 +56,15 @@ impl User {
|
||||
"CREATE type::thing('user', $id) SET
|
||||
email = $email,
|
||||
password = crypto::argon2::generate($password),
|
||||
anonymous = false",
|
||||
anonymous = false,
|
||||
created_at = $created_at,
|
||||
updated_at = $updated_at",
|
||||
)
|
||||
.bind(("id", id))
|
||||
.bind(("email", email))
|
||||
.bind(("password", password))
|
||||
.bind(("created_at", now))
|
||||
.bind(("updated_at", now))
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
@@ -169,4 +175,18 @@ impl User {
|
||||
|
||||
Ok(entities)
|
||||
}
|
||||
|
||||
pub async fn get_latest_text_contents(
|
||||
id: &str,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<Vec<TextContent>, AppError> {
|
||||
let items: Vec<TextContent> = db
|
||||
.client
|
||||
.query("SELECT * FROM text_content WHERE user_id = $user_id ORDER BY created_at DESC LIMIT 5")
|
||||
.bind(("user_id", id.to_owned()))
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
Ok(items)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user