ingestion-pipeline crated init, begun moving

This commit is contained in:
Per Stark
2025-03-06 15:29:13 +01:00
parent d156f9e48e
commit 812bce27d1
21 changed files with 648 additions and 601 deletions

View File

@@ -6,7 +6,7 @@ use axum::{
Router,
};
use middleware_api_auth::api_auth;
use routes::ingress::ingress_data;
use routes::ingress::ingest_data;
pub mod api_state;
mod middleware_api_auth;
@@ -19,7 +19,7 @@ where
ApiState: FromRef<S>,
{
Router::new()
.route("/ingress", post(ingress_data))
.route("/ingress", post(ingest_data))
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024))
.route_layer(from_fn_with_state(app_state.clone(), api_auth))
}

View File

@@ -2,17 +2,19 @@ use axum::{extract::State, http::StatusCode, response::IntoResponse, Extension};
use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart};
use common::{
error::{ApiError, AppError},
ingress::ingress_object::IngressObject,
storage::types::{file_info::FileInfo, job::Job, user::User},
storage::types::{
file_info::FileInfo, ingestion_payload::IngestionPayload, ingestion_task::IngestionTask,
user::User,
},
};
use futures::{future::try_join_all, TryFutureExt};
use tempfile::NamedTempFile;
use tracing::{debug, info};
use tracing::info;
use crate::api_state::ApiState;
#[derive(Debug, TryFromMultipart)]
pub struct IngressParams {
pub struct IngestParams {
pub content: Option<String>,
pub instructions: String,
pub category: String,
@@ -21,10 +23,10 @@ pub struct IngressParams {
pub files: Vec<FieldData<NamedTempFile>>,
}
pub async fn ingress_data(
pub async fn ingest_data(
State(state): State<ApiState>,
Extension(user): Extension<User>,
TypedMultipart(input): TypedMultipart<IngressParams>,
TypedMultipart(input): TypedMultipart<IngestParams>,
) -> Result<impl IntoResponse, ApiError> {
info!("Received input: {:?}", input);
@@ -36,20 +38,19 @@ pub async fn ingress_data(
)
.await?;
debug!("Got file infos");
let ingress_objects = IngressObject::create_ingress_objects(
let payloads = IngestionPayload::create_ingestion_payload(
input.content,
input.instructions,
input.category,
file_infos,
user.id.as_str(),
)?;
debug!("Got ingress objects");
let futures: Vec<_> = ingress_objects
let futures: Vec<_> = payloads
.into_iter()
.map(|object| Job::create_and_add_to_db(object.clone(), user.id.clone(), &state.db))
.map(|object| {
IngestionTask::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)
})
.collect();
try_join_all(futures).await.map_err(AppError::from)?;