From e7d01e90506ca4a6ff14a485a85dddaa156ebca5 Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Thu, 19 Feb 2026 13:53:06 -0800 Subject: [PATCH] Fix OAuth plugin transport errors and ad-hoc response persistence --- crates/yaak/src/send.rs | 161 +++++++++--------- plugins/auth-oauth2/src/fetchAccessToken.ts | 4 + .../src/getOrRefreshAccessToken.ts | 4 + 3 files changed, 93 insertions(+), 76 deletions(-) diff --git a/crates/yaak/src/send.rs b/crates/yaak/src/send.rs index 56a566e9..9082c2c5 100644 --- a/crates/yaak/src/send.rs +++ b/crates/yaak/src/send.rs @@ -24,7 +24,7 @@ use yaak_models::models::{ HttpResponseEvent, HttpResponseHeader, HttpResponseState, ProxySetting, ProxySettingAuth, }; use yaak_models::query_manager::QueryManager; -use yaak_models::util::UpdateSource; +use yaak_models::util::{UpdateSource, generate_prefixed_id}; use yaak_plugins::events::{ CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose, }; @@ -488,11 +488,16 @@ pub async fn send_http_request( response.elapsed = 0; response.elapsed_headers = 0; response.elapsed_dns = 0; - response = params - .query_manager - .connect() - .upsert_http_response(&response, ¶ms.update_source, params.blob_manager) - .map_err(SendHttpRequestError::PersistResponse)?; + let persist_response = !response.request_id.is_empty(); + if persist_response { + response = params + .query_manager + .connect() + .upsert_http_response(&response, ¶ms.update_source, params.blob_manager) + .map_err(SendHttpRequestError::PersistResponse)?; + } else if response.id.is_empty() { + response.id = generate_prefixed_id("rs"); + } let (event_tx, mut event_rx) = mpsc::channel::(HTTP_EVENT_CHANNEL_CAPACITY); @@ -503,16 +508,18 @@ pub async fn send_http_request( let emit_events_to = params.emit_events_to.clone(); let event_handle = tokio::spawn(async move { while let Some(event) = event_rx.recv().await { - let db_event = HttpResponseEvent::new( - &event_response_id, - &event_workspace_id, - event.clone().into(), - ); - if let Err(err) = event_query_manager - .connect() - .upsert_http_response_event(&db_event, &event_update_source) - { - warn!("Failed to persist HTTP response event: {}", err); + if persist_response { + let db_event = HttpResponseEvent::new( + &event_response_id, + &event_workspace_id, + event.clone().into(), + ); + if let Err(err) = event_query_manager + .connect() + .upsert_http_response_event(&db_event, &event_update_source) + { + warn!("Failed to persist HTTP response event: {}", err); + } } if let Some(tx) = emit_events_to.as_ref() { @@ -531,15 +538,17 @@ pub async fn send_http_request( Ok(response) => response, Err(err) => { persist_cookie_jar(params.query_manager, cookie_jar.as_mut(), cookie_store.as_ref())?; - let _ = persist_response_error( - params.query_manager, - params.blob_manager, - ¶ms.update_source, - &response, - started_at, - err.to_string(), - request_started_url, - ); + if persist_response { + let _ = persist_response_error( + params.query_manager, + params.blob_manager, + ¶ms.update_source, + &response, + started_at, + err.to_string(), + request_started_url, + ); + } if let Err(join_err) = event_handle.await { warn!("Failed to join response event task: {}", join_err); } @@ -548,40 +557,39 @@ pub async fn send_http_request( }; let headers_elapsed = duration_to_i32(started_at.elapsed()); - response = params - .query_manager - .connect() - .upsert_http_response( - &HttpResponse { - state: HttpResponseState::Connected, - elapsed_headers: headers_elapsed, - status: i32::from(http_response.status), - status_reason: http_response.status_reason.clone(), - url: http_response.url.clone(), - remote_addr: http_response.remote_addr.clone(), - version: http_response.version.clone(), - headers: http_response - .headers - .iter() - .map(|(name, value)| HttpResponseHeader { - name: name.clone(), - value: value.clone(), - }) - .collect(), - request_headers: http_response - .request_headers - .iter() - .map(|(name, value)| HttpResponseHeader { - name: name.clone(), - value: value.clone(), - }) - .collect(), - ..response - }, - ¶ms.update_source, - params.blob_manager, - ) - .map_err(SendHttpRequestError::PersistResponse)?; + let connected_response = HttpResponse { + state: HttpResponseState::Connected, + elapsed_headers: headers_elapsed, + status: i32::from(http_response.status), + status_reason: http_response.status_reason.clone(), + url: http_response.url.clone(), + remote_addr: http_response.remote_addr.clone(), + version: http_response.version.clone(), + headers: http_response + .headers + .iter() + .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() }) + .collect(), + request_headers: http_response + .request_headers + .iter() + .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() }) + .collect(), + ..response + }; + if persist_response { + response = params + .query_manager + .connect() + .upsert_http_response( + &connected_response, + ¶ms.update_source, + params.blob_manager, + ) + .map_err(SendHttpRequestError::PersistResponse)?; + } else { + response = connected_response; + } let (response_body, body_stats) = http_response.bytes().await.map_err(SendHttpRequestError::ReadResponseBody)?; @@ -598,23 +606,24 @@ pub async fn send_http_request( SendHttpRequestError::WriteResponseBody { path: body_path.clone(), source } })?; - response = params - .query_manager - .connect() - .upsert_http_response( - &HttpResponse { - body_path: Some(body_path.to_string_lossy().to_string()), - content_length: Some(usize_to_i32(response_body.len())), - content_length_compressed: Some(u64_to_i32(body_stats.size_compressed)), - elapsed: duration_to_i32(started_at.elapsed()), - elapsed_headers: headers_elapsed, - state: HttpResponseState::Closed, - ..response - }, - ¶ms.update_source, - params.blob_manager, - ) - .map_err(SendHttpRequestError::PersistResponse)?; + let final_response = HttpResponse { + body_path: Some(body_path.to_string_lossy().to_string()), + content_length: Some(usize_to_i32(response_body.len())), + content_length_compressed: Some(u64_to_i32(body_stats.size_compressed)), + elapsed: duration_to_i32(started_at.elapsed()), + elapsed_headers: headers_elapsed, + state: HttpResponseState::Closed, + ..response + }; + if persist_response { + response = params + .query_manager + .connect() + .upsert_http_response(&final_response, ¶ms.update_source, params.blob_manager) + .map_err(SendHttpRequestError::PersistResponse)?; + } else { + response = final_response; + } if let Err(join_err) = event_handle.await { warn!("Failed to join response event task: {}", join_err); diff --git a/plugins/auth-oauth2/src/fetchAccessToken.ts b/plugins/auth-oauth2/src/fetchAccessToken.ts index 56e0335f..d0c28c8d 100644 --- a/plugins/auth-oauth2/src/fetchAccessToken.ts +++ b/plugins/auth-oauth2/src/fetchAccessToken.ts @@ -61,6 +61,10 @@ export async function fetchAccessToken( console.log('[oauth2] Got access token response', resp.status); + if (resp.error) { + throw new Error(`Failed to fetch access token: ${resp.error}`); + } + const body = resp.bodyPath ? readFileSync(resp.bodyPath, 'utf8') : ''; if (resp.status < 200 || resp.status >= 300) { diff --git a/plugins/auth-oauth2/src/getOrRefreshAccessToken.ts b/plugins/auth-oauth2/src/getOrRefreshAccessToken.ts index 07fc4ea0..fcd47148 100644 --- a/plugins/auth-oauth2/src/getOrRefreshAccessToken.ts +++ b/plugins/auth-oauth2/src/getOrRefreshAccessToken.ts @@ -71,6 +71,10 @@ export async function getOrRefreshAccessToken( httpRequest.authenticationType = 'none'; // Don't inherit workspace auth const resp = await ctx.httpRequest.send({ httpRequest }); + if (resp.error) { + throw new Error(`Failed to refresh access token: ${resp.error}`); + } + if (resp.status >= 400 && resp.status < 500) { // Client errors (4xx) indicate the refresh token is invalid, expired, or revoked // Delete the token and return null to trigger a fresh authorization flow