feat: database migrations

This commit is contained in:
Per Stark
2025-05-04 21:16:09 +02:00
parent 43fcf6032d
commit 4d1d1eb22c
28 changed files with 880 additions and 218 deletions

View File

@@ -1,15 +1,18 @@
use crate::error::AppError;
use super::types::{analytics::Analytics, system_settings::SystemSettings, StoredObject};
use crate::error::AppError;
use axum_session::{SessionConfig, SessionError, SessionStore};
use axum_session_surreal::SessionSurrealPool;
use futures::Stream;
use include_dir::{include_dir, Dir};
use std::{ops::Deref, sync::Arc};
use surrealdb::{
engine::any::{connect, Any},
opt::auth::Root,
Error, Notification, Surreal,
};
use surrealdb_migrations::MigrationRunner;
static MIGRATIONS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/");
#[derive(Clone)]
pub struct SurrealDbClient {
@@ -56,44 +59,60 @@ impl SurrealDbClient {
.await
}
pub async fn ensure_initialized(&self) -> Result<(), AppError> {
Self::build_indexes(self).await?;
Self::setup_auth(self).await?;
Analytics::ensure_initialized(self).await?;
SystemSettings::ensure_initialized(self).await?;
/// Applies all pending database migrations found in the embedded MIGRATIONS_DIR.
///
/// This function should be called during application startup, after connecting to
/// the database and selecting the appropriate namespace and database, but before
/// the application starts performing operations that rely on the schema.
pub async fn apply_migrations(&self) -> Result<(), AppError> {
// Instantiate the runner, load embedded files, and run 'up'
MigrationRunner::new(&self.client)
.load_files(&MIGRATIONS_DIR)
.up()
.await
.map_err(|e| AppError::Processing(e.to_string()))?;
Ok(())
}
pub async fn setup_auth(&self) -> Result<(), Error> {
self.client.query(
"DEFINE TABLE user SCHEMALESS;
DEFINE INDEX unique_name ON TABLE user FIELDS email UNIQUE;
DEFINE ACCESS account ON DATABASE TYPE RECORD
SIGNUP ( CREATE user SET email = $email, password = crypto::argon2::generate($password), anonymous = false, user_id = $user_id)
SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(password, $password) );",
)
.await?;
Ok(())
}
// pub async fn ensure_initialized(&self) -> Result<(), AppError> {
// Self::build_indexes(self).await?;
// Self::setup_auth(self).await?;
pub async fn build_indexes(&self) -> Result<(), Error> {
self.client.query("DEFINE INDEX idx_embedding_chunks ON text_chunk FIELDS embedding HNSW DIMENSION 1536").await?;
self.client.query("DEFINE INDEX idx_embedding_entities ON knowledge_entity FIELDS embedding HNSW DIMENSION 1536").await?;
// Analytics::ensure_initialized(self).await?;
// SystemSettings::ensure_initialized(self).await?;
self.client
.query("DEFINE INDEX idx_job_status ON job FIELDS status")
.await?;
self.client
.query("DEFINE INDEX idx_job_user ON job FIELDS user_id")
.await?;
self.client
.query("DEFINE INDEX idx_job_created ON job FIELDS created_at")
.await?;
// Ok(())
// }
Ok(())
}
// pub async fn setup_auth(&self) -> Result<(), Error> {
// self.client.query(
// "DEFINE TABLE user SCHEMALESS;
// DEFINE INDEX unique_name ON TABLE user FIELDS email UNIQUE;
// DEFINE ACCESS account ON DATABASE TYPE RECORD
// SIGNUP ( CREATE user SET email = $email, password = crypto::argon2::generate($password), anonymous = false, user_id = $user_id)
// SIGNIN ( SELECT * FROM user WHERE email = $email AND crypto::argon2::compare(password, $password) );",
// )
// .await?;
// Ok(())
// }
// pub async fn build_indexes(&self) -> Result<(), Error> {
// self.client.query("DEFINE INDEX idx_embedding_chunks ON text_chunk FIELDS embedding HNSW DIMENSION 1536").await?;
// self.client.query("DEFINE INDEX idx_embedding_entities ON knowledge_entity FIELDS embedding HNSW DIMENSION 1536").await?;
// self.client
// .query("DEFINE INDEX idx_job_status ON job FIELDS status")
// .await?;
// self.client
// .query("DEFINE INDEX idx_job_user ON job FIELDS user_id")
// .await?;
// self.client
// .query("DEFINE INDEX idx_job_created ON job FIELDS created_at")
// .await?;
// Ok(())
// }
pub async fn rebuild_indexes(&self) -> Result<(), Error> {
self.client
@@ -222,7 +241,7 @@ mod tests {
.expect("Failed to start in-memory surrealdb");
// Call your initialization
db.ensure_initialized()
db.apply_migrations()
.await
.expect("Failed to initialize schema");
@@ -268,25 +287,15 @@ mod tests {
}
#[tokio::test]
async fn test_setup_auth() {
let namespace = "test_ns";
let database = &Uuid::new_v4().to_string(); // ensures isolation per test run
let db = SurrealDbClient::memory(namespace, database)
.await
.expect("Failed to start in-memory surrealdb");
// Should not panic or fail
db.setup_auth().await.expect("Failed to setup auth");
}
#[tokio::test]
async fn test_build_indexes() {
async fn test_applying_migrations() {
let namespace = "test_ns";
let database = &Uuid::new_v4().to_string();
let db = SurrealDbClient::memory(namespace, database)
.await
.expect("Failed to start in-memory surrealdb");
db.build_indexes().await.expect("Failed to build indexes");
db.apply_migrations()
.await
.expect("Failed to build indexes");
}
}

View File

@@ -18,7 +18,7 @@ pub enum IngestionTaskStatus {
Cancelled,
}
stored_object!(IngestionTask, "job", {
stored_object!(IngestionTask, "ingestion_task", {
content: IngestionPayload,
status: IngestionTaskStatus,
user_id: String

View File

@@ -199,7 +199,7 @@ impl User {
.client
.query(
"UPDATE type::thing('user', $id)
SET api_key = NULL
SET api_key = test_string_nullish
RETURN AFTER",
)
.bind(("id", id.to_owned()))
@@ -520,9 +520,9 @@ mod tests {
.await
.expect("Failed to start in-memory surrealdb");
db.ensure_initialized()
db.apply_migrations()
.await
.expect("Failed to setup the systemsettings");
.expect("Failed to setup the migrations");
db
}